Notable projects of interest

A highlight of notable projects I enjoyed working on
Published

September 6, 2023

Introduction

This is a high-level introduction to projects I have worked on that I’m proud of, and to things that excite me (or irritate me enough to want to change them).

=============================================================================

MerDB

MerDB is a dataframe library like pandas, but it:

  • provides a minimal relational API to query data (like SQL, but in Python)
  • has Unix-like pipes to compose operators using the | syntax
  • scales to multiple cores or a cluster (via Modin)
  • processes data too big to fit into memory (via Modin)
  • supports interactive and optimized processing (optimizations are on the roadmap)

A quick example:

import pandas as pd
from merdb.interactive import *

def is_senior(row) -> bool:
    return row['age'] > 35


def double_age(row) -> int:
    return row["age"] * 2

# Test Data
cols = ["name", "age"]
people_df = pd.DataFrame([
    ["Raj", 35],
    ["Sona", 20],
    ["Abby", 70],
    ["Abba", 90],
], columns=cols)

# One can compose operators without any source data, e.g. quadruple_age below
# map is a merdb function
quadruple_age = map(double_age, "age") | map(double_age, "age")

result = (t(people_df) # convert people_df to a merdb table
          | where(is_senior)
          | order_by("name", "asc")
          | quadruple_age # Unix like pipe syntax making it easy to refactor out intermediate processing
          | select("age")
          | rename({"age": "new_age"})
          )

# Convert to Pandas Dataframe and print
print(result.df())

# Output
   new_age
0      360
1      280

For more details: https://github.com/RAbraham/merdb

=============================================================================

Thampi

Thampi is a serverless ML serving system that runs on AWS Lambda.

Train and Save

For example, suppose you have a training script called train_and_save.py:

import numpy as np
from sklearn import datasets
from typing import Dict
import thampi
from sklearn.neighbors import KNeighborsClassifier
 
class ThampiWrapper(thampi.Model):
    def __init__(self, sklearn_model):
        self.sklearn_model = sklearn_model
        super().__init__()
 
 
    def predict(self, args: Dict, context) -> Dict:
        original_input = [args.get('input')]
        result = self.sklearn_model.predict(np.array(original_input))
        return dict(result=int(list(result)[0]))
 

def train_model():
    iris = datasets.load_iris()
    ...
    knn = KNeighborsClassifier()
    knn.fit(...)
    return ThampiWrapper(knn)
 
 
if __name__ == '__main__':
    model = train_model()
    thampi.save(model, 'iris-sklearn', './models')

On running:

python train_and_save.py

  • train_model trains the scikit-learn model (knn) and wraps it in a ThampiWrapper.
  • thampi.save stores the ThampiWrapper on the file system (for now).
  • When we upload this to AWS Lambda, the predict method in ThampiWrapper is called and inference takes place.
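
Before deploying, you can sanity-check the wrapper locally by calling predict directly, the same way Lambda will. This is my own sketch, not part of the original script; it assumes the elided training code above is filled in, and it passes None for the Lambda context:

# Hypothetical local check (my addition), e.g. appended to train_and_save.py
model = train_model()
prediction = model.predict({'input': [5.9, 3.2, 4.8, 1.8]}, None)
print(prediction)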

Serving the model

Now it’s time to deploy the model to AWS Lambda. All you have to provide is a requirements.txt file along with the ./models/iris-sklearn directory saved above.
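
The requirements.txt here is just a regular pip requirements file listing the model’s runtime dependencies; the exact contents below are my assumption for this example, not from the original post:

scikit-learn
numpy

Then deploy with: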

thampi serve staging --model_dir=./models/iris-sklearn --dependency_file=./requirements.txt

Find the endpoint with:

thampi info staging

You’ll see something similar to:

{'url': 'https://8i7a6qtlri.execute-api.us-east-1.amazonaws.com/staging/mymodel/predict'}

Predict

You can make a curl request, replacing a_url with the URL you received above.

a_url=https://.../mymodel/predict
curl -d '{"data": {"input": [5.9, 3.2, 4.8, 1.8]}}' -H "Content-Type: application/json" -X POST $a_url

You’ll see output like:

{
  "properties": {
    "instance_id": "9dbc56dd-936d-4dff-953c-8c22267ebe84",
    "served_time_utc": "2018-09-06T22:03:09.247038",
    "thampi_data_version": "0.1",
    "trained_time_utc": "2018-09-06T22:03:04.886644"
  },
  "result": {
    "result": 2
  }
}
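
If you prefer Python to curl, an equivalent request with the requests library might look like this (my own sketch; the URL is a placeholder for the one returned by thampi info):

import requests

# Placeholder URL (my assumption): use the one returned by `thampi info staging`
url = "https://<api-id>.execute-api.us-east-1.amazonaws.com/staging/mymodel/predict"

payload = {"data": {"input": [5.9, 3.2, 4.8, 1.8]}}
response = requests.post(url, json=payload)
print(response.json())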

For details, refer to: https://rabraham.github.io/site/posts/thampi-introduction.html

=============================================================================

Mercylog

Mercylog is Datalog, a logic programming language, in Python.

Suppose you want to find the ancestors of a given child. The data is in a table called family with two columns, parent and child.

The general logic is:

  • if X is the parent of Y, then X is an ancestor of Y
  • if X is the parent of Y and Y is an ancestor of Z, then X is an ancestor of Z too. Apply this recursively.

For comparison, here is the SQL code.

SQL Code

WITH RECURSIVE Ancestors AS (
  SELECT parent, child
  FROM family
  WHERE child = 'given_child' -- Replace 'given_child' with the specific child you are looking for.
  
  UNION ALL
  
  SELECT f.parent, f.child
  FROM family f
  INNER JOIN Ancestors a ON f.child = a.parent
)
SELECT parent AS ancestor FROM Ancestors WHERE parent IS NOT NULL;

Datalog


ancestor(X, Y) :- family(X, Y).
ancestor(X, Z) :- family(X, Y), ancestor(Y, Z).

The query will be

ancestor(A, 'given_child').

Mercylog

from mercylog import db, R, V, and_
from mercylog.df import row

X = V.X
Y = V.Y
Z = V.Z
family = R.family
ancestor = R.ancestor


rules = [
 # Make a family relation for a dataframe with columns 'parent' and 'child'
 family(X, Y) << row(parent=X, child=Y),

 # Actual rules
 # ancestor(X, Y) :- family(X, Y).
 # ancestor(X, Z) :- family(X, Y), ancestor(Y, Z). 

 ancestor(X, Y) << family(X, Y),
 ancestor(X, Z) << and_(family(X, Y), ancestor(Y, Z))
]

query = ancestor(X, "given_child") 

df = ... # some data 
d = db(df)
result = d(rules + [query])
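
As an illustration of the data the rules expect (my assumption, based on the row(parent=X, child=Y) mapping and the family table described earlier), the dataframe could be built like this:

import pandas as pd

# Hypothetical sample data (my addition): each row is a parent -> child edge
df = pd.DataFrame(
    [
        ["grandma", "mom"],
        ["mom", "given_child"],
    ],
    columns=["parent", "child"],
)

With this data, the query should bind X to both "mom" and "grandma".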

For details: https://github.com/RAbraham/mercylog

=============================================================================

Jaya

You want to build a pipeline where any file put in bucket1 triggers an AWS Lambda that copies it to bucket2:

flowchart LR
  A[bucket1] --> B(CopyLambda)
  B --> C[bucket2]

Jaya makes it easy to build such pipelines in Python, without YAML or JSON configuration.

NOTE: Code and commands are elided to show the central idea.

# copy_pipeline.py
from jaya import S3, Pipeline, AWSLambda

lambda_name = 'CopyLambda'

# trigger notifications on object creation
s1 = S3('bucket1',
        events=[S3.event(S3.ALL_CREATED_OBJECTS, service_name=lambda_name)])


# copy_handler must be defined before it is passed to AWSLambda below
def copy_handler(aws_config, jaya_context, event, context):
    # aws_config for creds
    # jaya_context to get access to the pipeline source and destination services
    # event and context are AWS Lambda parameters

    # Copy files in `event`
    pass


copy_lambda = AWSLambda(lambda_name,
                        copy_handler,
                        ...)

s2 = S3('bucket2')

# Like the diagram above
p = s1 >> copy_lambda >> s2

piper = Pipeline("my-copy-pipeline", [p])

The code piece p = s1 >> copy_lambda >> s2 will create bucket1 and bucket2 if they don’t exist, and it will create or update CopyLambda.
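
The body of copy_handler is elided above. As an illustration of what the copy step might look like, here is my own boto3 sketch (not Jaya’s API) for copying each newly created object into the destination bucket:

import boto3


def copy_new_objects(event, destination_bucket):
    # Hypothetical helper (my addition): copies each S3 object referenced in the
    # Lambda event into destination_bucket, keeping the same key
    s3 = boto3.client('s3')
    for record in event.get('Records', []):
        source_bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        s3.copy_object(
            CopySource={'Bucket': source_bucket, 'Key': key},
            Bucket=destination_bucket,
            Key=key,
        )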

Deploy the pipeline

$ jaya deploy --file=./copy_pipeline.py

For more details, see https://github.com/RAbraham/jaya

Closing

I hope you liked this! If you want to see more work like this, check out my blog or my fledgling YouTube channel.