Introduction
This is a high-level introduction to the projects I have worked on that I’m proud of, and to the things that excite me (or irritate me enough to want to change them).
=============================================================================
MerDB
MerDB is a dataframe library like pandas, but it
- is a minimal relational API to query data (like SQL, but in Python)
- has Unix-like pipes to compose operators using the | syntax
- scales to multiple cores or a cluster (via Modin)
- processes data too big to fit into memory (via Modin)
- supports interactive and optimized processing (optimizations are on the roadmap)
import pandas as pd
from merdb.interactive import *

def is_senior(row) -> bool:
    return row['age'] > 35

def double_age(row) -> int:
    return row["age"] * 2

# Test Data
cols = ["name", "age"]
people_df = pd.DataFrame([
    ["Raj", 35],
    ["Sona", 20],
    ["Abby", 70],
    ["Abba", 90],
], columns=cols)
# One can specify functions without any source data, like quadruple_age
# map is a merdb function
quadruple_age = map(double_age, "age") | map(double_age, "age")

result = (t(people_df)  # convert people_df to a merdb table
          | where(is_senior)
          | order_by("name", "asc")
          | quadruple_age  # Unix-like pipe syntax makes it easy to refactor out intermediate processing
          | select("age")
          | rename({"age": "new_age"})
          )

# Convert to a Pandas DataFrame and print
print(result.df())
# Output
   new_age
0      360
1      280
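Since the scaling goes through Modin, the same pipeline should in principle run across cores just by switching the dataframe import. The sketch below is my assumption rather than verified MerDB behaviour: it presumes t() accepts a Modin DataFrame directly, and people.csv is a hypothetical file.

# Sketch only: assumes t() accepts Modin DataFrames directly.
import modin.pandas as mpd
from merdb.interactive import *

def is_senior(row) -> bool:
    return row['age'] > 35

big_df = mpd.read_csv("people.csv")  # hypothetical CSV with 'name' and 'age' columns
result = t(big_df) | where(is_senior) | select("age")
print(result.df())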
For more details: https://github.com/RAbraham/merdb
=============================================================================
Thampi
Thampi is a serverless ML serving system that runs on AWS Lambda.
Train and Save
For example, if you have a training script called train_and_save.py:
import numpy as np
from sklearn import datasets
from typing import Dict
import thampi
from sklearn.neighbors import KNeighborsClassifier
class ThampiWrapper(thampi.Model):
    def __init__(self, sklearn_model):
        self.sklearn_model = sklearn_model
        super().__init__()

    def predict(self, args: Dict, context) -> Dict:
        original_input = [args.get('input')]
        result = self.sklearn_model.predict(np.array(original_input))
        return dict(result=int(list(result)[0]))

def train_model():
    iris = datasets.load_iris()
    ...
    knn = KNeighborsClassifier()
    knn.fit(...)
    return ThampiWrapper(knn)

if __name__ == '__main__':
    model = train_model()
    thampi.save(model, 'iris-sklearn', './models')
On running python train_and_save.py, train_model trains the sklearn model knn and wraps a ThampiWrapper around it. thampi.save stores the ThampiWrapper on the file system for now. When we upload this to AWS Lambda, the predict method in ThampiWrapper is called and the inference takes place.
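Before deploying, you can sanity-check the wrapper locally. This is a sketch on my part: it assumes predict ignores the context argument, so passing None is fine.

# Sketch: exercise ThampiWrapper.predict locally, continuing from train_and_save.py.
# Assumes `context` is unused by this predict implementation, so None is passed.
model = train_model()
print(model.predict({'input': [5.9, 3.2, 4.8, 1.8]}, None))
# e.g. {'result': 2}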
Serving the model
Now it’s time to deploy the model to AWS Lambda. All you have to provide is a requirements.txt file along with the trained ./models/iris-sklearn directory from above.
thampi serve staging --model_dir=./models/iris-sklearn --dependency_file=./requirements.txt
Find the endpoint by running:
thampi info staging
You’ll see something similar to:
{'url': 'https://8i7a6qtlri.execute-api.us-east-1.amazonaws.com/staging/mymodel/predict'}
Predict
You can do a curl, replacing a_url with the URL that you receive from above.
a_url = https://.../mymodel/predict
curl -d '{"data": {"input": [5.9, 3.2, 4.8, 1.8]}}' -H "Content-Type: application/json" -X POST $a_url
You’ll see output like:
{
"properties": {
"instance_id": "9dbc56dd-936d-4dff-953c-8c22267ebe84",
"served_time_utc": "2018-09-06T22:03:09.247038",
"thampi_data_version": "0.1",
"trained_time_utc": "2018-09-06T22:03:04.886644"
},
"result": {
"result": 2
}
}
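If you prefer Python to curl, the same request can be made with the requests library. The endpoint is whatever thampi info returned; the rest is a plain POST.

# The curl call above, done from Python with `requests`.
import requests

a_url = 'https://.../mymodel/predict'  # the url from `thampi info staging`
payload = {"data": {"input": [5.9, 3.2, 4.8, 1.8]}}
response = requests.post(a_url, json=payload)
print(response.json())  # e.g. {'properties': {...}, 'result': {'result': 2}}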
For details, refer to: https://rabraham.github.io/site/posts/thampi-introduction.html
=============================================================================
Mercylog
Mercylog is Datalog, a logic programming language, in Python.
Suppose you want to find the ancestors of a given child. The data is in a table called family with two columns, parent and child.
The general logic is:
- if X is the parent of Y, then X is an ancestor of Y
- if X is the parent of Y and Y is an ancestor of Z, then X is an ancestor of Z too; apply this recursively
For comparison, here is the SQL code.
SQL Code
WITH RECURSIVE Ancestors AS (
SELECT parent, child
FROM family
WHERE child = 'given_child' -- Replace 'given_child' with the specific child you are looking for.
UNION ALL
SELECT f.parent, f.child
FROM family f
INNER JOIN Ancestors a ON f.child = a.parent
)
SELECT parent AS ancestor FROM Ancestors WHERE parent IS NOT NULL;
Datalog
ancestor(X, Y) :- family(X, Y).
ancestor(X, Z) :- family(X, Y), ancestor(Y, Z).
The query will be
ancestor(A, 'given_child').
Mercylog
from mercylog import db, R, V, and_
from mercylog.df import row

X = V.X
Y = V.Y
Z = V.Z
family = R.family
ancestor = R.ancestor

rules = [
    # Make a family relation for a dataframe with columns 'parent' and 'child'
    family(X, Y) << row(parent=X, child=Y),

    # Actual rules
    # ancestor(X, Y) :- family(X, Y).
    # ancestor(X, Z) :- family(X, Y), ancestor(Y, Z).
    ancestor(X, Y) << family(X, Y),
    ancestor(X, Z) << and_(family(X, Y), ancestor(Y, Z))
]

query = ancestor(X, "given_child")

df = ...  # some data
d = db(df)
result = d(rules + [query])
For details: https://github.com/RAbraham/mercylog
=============================================================================
Jaya
You want to build a pipeline where any file put in bucket1 triggers an AWS Lambda that copies it to bucket2. Jaya makes it easy to build such pipelines in Python, without YAML- or JSON-like config.
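A file dropped into bucket1 flows through CopyLambda into bucket2:

flowchart LR
    A[bucket1] --> B(CopyLambda)
    B --> C[bucket2]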
NOTE: Code and commands elided to show the central idea
# copy_pipeline.py
from jaya import S3, Pipeline, AWSLambda

lambda_name = 'CopyLambda'

def copy_handler(aws_config, jaya_context, event, context):
    # aws_config for creds
    # jaya_context to get access to the pipeline source and destination services
    # event and context are AWS Lambda parameters
    # Copy files in `event`
    pass

# trigger notifications on object creation
s1 = S3('bucket1',
        events=[S3.event(S3.ALL_CREATED_OBJECTS, service_name=lambda_name)])

copy_lambda = AWSLambda(lambda_name,
                        copy_handler,
                        ...)

s2 = S3('bucket2')

# Like the diagram above
p = s1 >> copy_lambda >> s2

piper = Pipeline("my-copy-pipeline", [p])
The code piece p = s1 >> copy_lambda >> s2 will create bucket1 and bucket2 if they don’t exist, and it will create or update CopyLambda.
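For a concrete picture of what copy_handler might do, here is a hypothetical body using boto3. It assumes event follows the standard S3 notification format, and it hard-codes bucket2 purely for illustration; the real handler would get the destination from jaya_context.

# Hypothetical sketch of copy_handler; a real one would look up the
# destination via jaya_context instead of hard-coding 'bucket2'.
import boto3

def copy_handler(aws_config, jaya_context, event, context):
    s3 = boto3.client('s3')
    for record in event['Records']:  # standard S3 notification structure
        src_bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        s3.copy_object(Bucket='bucket2',
                       Key=key,
                       CopySource={'Bucket': src_bucket, 'Key': key})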
Deploy the pipeline
$ jaya deploy --file=./copy_pipeline.py
For more details, see https://github.com/RAbraham/jaya
Closing
Hope you liked this! If you want to see more, check out my blog or my fledgling YouTube channel.