```mermaid
flowchart LR
    A[bucket1] --> B(CopyLambda)
    B --> C[bucket2]
```
Introduction
This is a high-level introduction to the projects I have worked on that I'm proud of, and to things that excite me (or irritate me enough to want to change them).
=============================================================================
MerDB
MerDB is a dataframe library like pandas, but it:
- offers a minimal relational API to query data (like SQL, but in Python)
- uses Unix-like pipes to compose operators via the `|` syntax
- scales to multiple cores or a cluster (via Modin)
- processes data too big to fit into memory (via Modin)
- supports interactive and optimized processing (optimizations are on the roadmap)
```python
import pandas as pd
from merdb.interactive import *

def is_senior(row) -> bool:
    return row["age"] > 35

def double_age(row) -> int:
    return row["age"] * 2

# Test data
cols = ["name", "age"]
people_df = pd.DataFrame([
    ["Raj", 35],
    ["Sona", 20],
    ["Abby", 70],
    ["Abba", 90],
], columns=cols)

# One can specify functions without any source data, like quadruple_age below.
# `map` is a merdb function.
quadruple_age = map(double_age, "age") | map(double_age, "age")

result = (
    t(people_df)  # convert people_df to a merdb table
    | where(is_senior)
    | order_by("name", "asc")
    | quadruple_age  # Unix-like pipe syntax makes it easy to refactor out intermediate processing
    | select("age")
    | rename({"age": "new_age"})
)

# Convert to a pandas DataFrame and print
print(result.df())
```
Output:

```
   new_age
0      360
1      280
```

For more details: https://github.com/RAbraham/merdb
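For comparison, here is a sketch of the same transformation in plain pandas (my own equivalent for illustration, not MerDB code):

```python
import pandas as pd

people_df = pd.DataFrame(
    [["Raj", 35], ["Sona", 20], ["Abby", 70], ["Abba", 90]],
    columns=["name", "age"],
)

result = (
    people_df[people_df["age"] > 35]     # where(is_senior)
    .sort_values("name")                 # order_by("name", "asc")
    .assign(age=lambda d: d["age"] * 4)  # quadruple_age (double twice)
    [["age"]]                            # select("age")
    .rename(columns={"age": "new_age"})  # rename(...)
    .reset_index(drop=True)
)
print(result)
```

Note how the intermediate steps here are method calls chained on one expression; with MerDB's `|` pipes, a step like `quadruple_age` can be named and reused independently of any source data.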
=============================================================================
Thampi
Thampi is a serverless ML serving system that runs on AWS Lambda.
Train and Save
For example, suppose you have a training script called train_and_save.py:
```python
import numpy as np
from sklearn import datasets
from typing import Dict
import thampi
from sklearn.neighbors import KNeighborsClassifier

class ThampiWrapper(thampi.Model):
    def __init__(self, sklearn_model):
        self.sklearn_model = sklearn_model
        super().__init__()

    def predict(self, args: Dict, context) -> Dict:
        original_input = [args.get('input')]
        result = self.sklearn_model.predict(np.array(original_input))
        return dict(result=int(list(result)[0]))

def train_model():
    iris = datasets.load_iris()
    ...
    knn = KNeighborsClassifier()
    knn.fit(...)
    return ThampiWrapper(knn)

if __name__ == '__main__':
    model = train_model()
    thampi.save(model, 'iris-sklearn', './models')
```

On running python train_and_save.py, train_model trains the sklearn model as knn and wraps a ThampiWrapper around it. thampi.save stores the ThampiWrapper on the file system (for now). When we upload this to AWS Lambda, the predict method of ThampiWrapper is called and inference takes place.
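Before deploying, it can help to sanity-check the model locally. The sketch below uses only scikit-learn (thampi itself isn't needed for this check) and mimics the args dict that ThampiWrapper.predict would receive from the endpoint:

```python
import numpy as np
from sklearn import datasets
from sklearn.neighbors import KNeighborsClassifier

iris = datasets.load_iris()
knn = KNeighborsClassifier()
knn.fit(iris.data, iris.target)

# Mimic the payload shape the deployed endpoint receives
args = {"input": [5.9, 3.2, 4.8, 1.8]}
prediction = knn.predict(np.array([args["input"]]))
print(dict(result=int(prediction[0])))
```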
Serving the model
Now it's time to deploy the model to AWS Lambda. All you have to provide is a requirements.txt file along with the trained ./models/iris-sklearn directory from above.
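For this example, requirements.txt would list the model's runtime dependencies; based on the imports in the training script, something like (the exact package list is an assumption):

```
numpy
scikit-learn
```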
```shell
thampi serve staging --model_dir=./models/iris-sklearn --dependency_file=./requirements.txt
```

Find the endpoint with:

```shell
thampi info staging
```

You'll see something similar to:

```
{'url': 'https://8i7a6qtlri.execute-api.us-east-1.amazonaws.com/staging/mymodel/predict'}
```

Predict
You can do a curl, replacing a_url with the URL that you receive from above:

```shell
a_url=https://.../mymodel/predict
curl -d '{"data": {"input": [5.9, 3.2, 4.8, 1.8]}}' -H "Content-Type: application/json" -X POST $a_url
```
You'll see output like:

```json
{
  "properties": {
    "instance_id": "9dbc56dd-936d-4dff-953c-8c22267ebe84",
    "served_time_utc": "2018-09-06T22:03:09.247038",
    "thampi_data_version": "0.1",
    "trained_time_utc": "2018-09-06T22:03:04.886644"
  },
  "result": {
    "result": 2
  }
}
```

For details, refer to: https://rabraham.github.io/site/posts/thampi-introduction.html
=============================================================================
Mercylog
Mercylog is Datalog, a logic programming language, in Python.
Suppose you want to find the ancestors of a given child. The data lives in a table called family with two columns, parent and child.
The general logic is:
- if X is the parent of Y, then X is an ancestor of Y
- if X is the parent of Y and Y is an ancestor of Z, then X is an ancestor of Z too; apply this rule recursively
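Those two rules compute a transitive closure. As a plain-Python sketch of the same idea (for intuition only, not Mercylog code):

```python
def ancestors(family, child):
    """family: list of (parent, child) pairs. Returns all ancestors of child."""
    result = set()
    frontier = {child}
    while frontier:
        # Rule 1: direct parents of anything in the frontier are ancestors.
        parents = {p for (p, c) in family if c in frontier}
        # Rule 2: recurse on the newly found ancestors.
        frontier = parents - result
        result |= parents
    return result

family = [("Grandma", "Mom"), ("Mom", "Kid")]
print(sorted(ancestors(family, "Kid")))  # ['Grandma', 'Mom']
```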
For comparison, here is the SQL code.
SQL Code

```sql
WITH RECURSIVE Ancestors AS (
    SELECT parent, child
    FROM family
    WHERE child = 'given_child' -- Replace 'given_child' with the specific child you are looking for.
    UNION ALL
    SELECT f.parent, f.child
    FROM family f
    INNER JOIN Ancestors a ON f.child = a.parent
)
SELECT parent AS ancestor FROM Ancestors WHERE parent IS NOT NULL;
```

Datalog
```
ancestor(X, Y) :- family(X, Y).
ancestor(X, Z) :- family(X, Y), ancestor(Y, Z).
```

The query will be:

```
ancestor(A, 'given_child').
```

Mercylog
```python
from mercylog import db, R, V, and_
from mercylog.df import row

X = V.X
Y = V.Y
Z = V.Z
family = R.family
ancestor = R.ancestor

rules = [
    # Make a family relation for a dataframe with columns 'parent' and 'child'
    family(X, Y) << row(parent=X, child=Y),

    # Actual rules:
    # ancestor(X, Y) :- family(X, Y).
    # ancestor(X, Z) :- family(X, Y), ancestor(Y, Z).
    ancestor(X, Y) << family(X, Y),
    ancestor(X, Z) << and_(family(X, Y), ancestor(Y, Z)),
]

query = ancestor(X, "given_child")

df = ...  # some data
d = db(df)
result = d(rules + [query])
```

For details: https://github.com/RAbraham/mercylog
=============================================================================
Jaya
Suppose you want to build a pipeline where any file put in bucket1 triggers an AWS Lambda that copies it to bucket2.
Jaya makes it easy to build such pipelines in Python, without YAML- or JSON-like config.
NOTE: Code and commands elided to show the central idea
```python
# copy_pipeline.py
from jaya import S3, Pipeline, AWSLambda

lambda_name = 'CopyLambda'

def copy_handler(aws_config, jaya_context, event, context):
    # aws_config for creds
    # jaya_context to get access to the pipeline source and destination services
    # event and context are AWS Lambda parameters
    # Copy files in `event`
    pass

# Trigger notifications on object creation
s1 = S3('bucket1',
        events=[S3.event(S3.ALL_CREATED_OBJECTS, service_name=lambda_name)])

copy_lambda = AWSLambda(lambda_name,
                        copy_handler,
                        ...)

s2 = S3('bucket2')

# Like the diagram above
p = s1 >> copy_lambda >> s2
piper = Pipeline("my-copy-pipeline", [p])
```
The code piece p = s1 >> copy_lambda >> s2 will create bucket1 and bucket2 if they don't exist, and will create/update CopyLambda.
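The copy_handler body is elided above; a minimal sketch of what it might do with boto3 is below. The destination bucket is hard-coded here as an assumption, since the jaya_context API for looking up the pipeline's destination isn't shown:

```python
def objects_to_copy(event):
    """Extract (bucket, key) pairs from a standard S3 event payload."""
    return [
        (r["s3"]["bucket"]["name"], r["s3"]["object"]["key"])
        for r in event.get("Records", [])
    ]

def copy_handler(aws_config, jaya_context, event, context):
    import boto3  # AWS SDK, available in the Lambda runtime
    s3 = boto3.client("s3")
    for src_bucket, key in objects_to_copy(event):
        # Assumption: destination hard-coded for the sketch; real code
        # would get it from jaya_context.
        s3.copy_object(Bucket="bucket2", Key=key,
                       CopySource={"Bucket": src_bucket, "Key": key})
```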
Deploy the pipeline
```shell
$ jaya deploy --file=./copy_pipeline.py
```

For more details, see https://github.com/RAbraham/jaya
Closing
Hope you liked this! For more of the same, check out my blog or my fledgling YouTube channel.