Raya. Fast, Easy ML development on Ray

Raya. A thin wrapper over Ray for faster ML development
markdown
Published

May 2, 2023

Problem

Data Scientists need to play with large models during development. Some challenges are:

  • Conflicting Dependencies: In applications with multiple models, one model may need Tensorflow 1.x (TF1) and another Tensorflow 2.x (TF2).
  • Long Load Time: These models take time to load and it’s slow to make a small change and run a script every time even if the model hasn’t changed.
  • Scalable and Distributed. Even for development, sometimes a laptop is too small to host and run multiple models. It would be convenient to be able to quickly iterate over code but run them on a remote powerful machine(with GPUs) or remote cluster created for experimentation.
  • Ease of Use: The gold standard would be to be able to create a Python class and use that in a way to solve all problems mentioned above.
  • Caching: Often models are ‘pure’. i.e. the same output for the same input. But naively, we still send prediction requests to the model and have to wait for the output which is time consuming. It would be great to use a cache and make that work in the solution that I propose

Existing Solutions

The existing solution is to create an http endpoint within a docker container. But these have a learning curve and most data scientists are new to it.

My Experiment

I’m building raya(to make into a library soon if there is interest) to explore a dev experience with the help of Ray and Ray’s ‘Runtime Environments’ that’s easy for data scientists

Ray

Ray is a distributed framework for building ML applications. The power of Ray is that the same code can run efficiently on a laptop(multi-core) but also scale to huge clusters just by changing an environment variable.

One Ray concept is of a Ray Actor. For me, this is an simple alternative to creating a service endpoint(see Caveat: Ray Serve). For more details, refer to the Actor documentation but a short description is in the code is below. Adapting from their site

import ray
@ray.remote # Specifies that this is a Ray Actor 
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self, step):
        self.value += step
        return self.value

    def get_counter(self):
        return self.value

# Create an actor object from this class.
# When run on a distributed cluster, this object could be anywhere
# on the cluster, perhaps on a more powerful server.
# The distributed complexity is hidden from us. Note the `remote`
counter = Counter.remote()

# Call the actor. Compared to our typical Python objects,
# all calls to the distibuted actor object 'counter' are
# asynchronous, i.e. non-blocking. 
# This is great for ML applications as often some operations can
# take a lot of time and we can proceed with 'other work'.
# `obj_ref` is a reference to the future output 
# of the computation `counter.increment` 
# which we can hold on to until we actually need the value
obj_ref = counter.increment.remote(step=2)

do_other_work() # dummy method. not implemented 


# Ok, now we are ready and we want the output of `counter.increment`
print(ray.get(obj_ref)) # 2

Ray Clusters:

Normally clusters are a remote set of machines. To run code on a remote cluster, the code has to be copied and sent to the Ray cluster in form of Ray jobs. Some of the info we need to provide are:

  • the code folder
  • requirements (i.e. installable by pip)
  • (optional) environment variables

These information can be provided to a Ray Job as Ray’s ‘Runtime Environment’(docs) . We can create multiple isolated environments on the Ray cluster. e.g. one for TF1 and one for TF2. raya attempts to be a thin opinionated wrapper over it.

Now, for running the Ray cluster on your laptop, the same concepts apply i.e. of packaging up the code folder and requirements etc. This in imho, is slightly slower than just setting up multiple virtual environments on your laptop with python -m venv venv_TF1 and python -m venv venv_TF2 for e.g. and trying to run different models but the multiple virtual environment locally idea does not work with Ray. But the Ray team has done some splendid engineering to only reload the changes on to the Ray cluster, so this may be good for practical purposes. See ‘Caching for speed’ here

For raya, we focus on providing a separate runtime environment per Actor only

Demo

I’ll make a library soon but for now you can clone a demo repo. Also, I’m using the latest Python version which does not support Tensorflow 1.x so I’m simulating it by just using different 2.x versions. So TF1 is 2.10.0 and TF2 is 2.12.0

export DEV_DIR=/path/to/your/folder
cd $DEV_DIR
git clone https://github.com/RAbraham/raya-trial
cd raya-trial
python3 -m venv venv &&  source venv/bin/activate && pip install -r requirements.txt 
# start ray cluster locally. Only once required
ray stop; ray start --head --disable-usage-stats  

You’ll notice the following sub folders tf1 and tf2. They could also have been different repos if required. There is ‘raya’ which will be made into a library…. soon :).

Let’s start with tf2, a simpler version first:

# tf2/actor.py

import raya
import tensorflow as tf

class TF2Actor(raya.Actor):
    def __init__(self):
        print("================= In TF2 init ====================================")

    def do(self, name):

        print("================= In TF2 do ====================================")
        return f"TF:{tf.__version__}: {name}"

Above, one inherits from raya.Actor which does some admin work.

tf2 requirements.txt

tensorflow==2.12.0

Let’s deploy this actor. We use a convenient cli tool called invoke to run my scripts

invoke actor-deploy --class-path=$DEV_DIR/raya-trial/tf2/actor.py:TF2Actor --requirements=$DEV_DIR/raya-trial/tf2/requirements.txt

class-path has the format </path/to/file.py>:<ClassName> so that raya can dynamically load a simple Python class as a Ray Object. requirements allows us to create this actor in it’s own virtual environment on the Ray cluster.

# trial_tf2.py

import ray

ray.init(namespace="serve")

a2 = ray.get_actor(namespace="serve", name="TF2Actor")

ref = a2.do.remote(name="Rajiv")
result = ray.get(ref)
print(result) # TF:2.12.0: Rajiv

By default, the actor is deployed to namespace serve. This is because these actors are normally used in conjunction with the Ray Serve http framework. Ray Serve acts as an external endpoint which forwards the requests to our actors in the distributed cluster. I think serve namespace is hardcoded inside the framework so I use that as the default. This can be changed in actor-deploy

The actor is accessed by name TF2Actor i.e. the class name anywhere in the distributed Ray cluster :). It’s

  • isolated in it’s own environment so no worry of conflicting dependencies,
  • long running as long as the cluster is up. Even if trial_tf2.py finishes execution, TF2Actor is still running on your cluster.
  • scalable if deployed to a remote cluster with minimal effort
  • and imo, easy to use :)

Now, I hope you see why I use it as a simple alternative to http service endpoints. I don’t need to create endpoints for every component of code that needs process isolation and long life. I can just create Named Actors. name is customizable at actor-deploy

Next actor is at folder tf1:

# tf1/actor.py
from pathlib import Path
import tensorflow as tf
import raya

class TF1Actor(raya.Actor):
    def __init__(self, folder):
        weights = Path(folder) / "weights.txt"
        print(weights.read_text())
        print("================= In TF1 init ====================================")

    def act(self, name):
        print("================= In TF1 act ====================================")
        return f"Version:{tf.__version__}: {name}"

In this variant, __init__ is passed a folder. One can use it to pass data like model weights, configuration files and other data files. Here I’m passing some dummy weights as an example.

tf1 requirements are different from tf2 (ok, slightly different but just to prove a point)

tensorflow==2.10.0

Now, let’s deploy this code. note the passing of folder.

invoke actor-deploy --class-path=$DEV_DIR/raya-trial/tf1/actor.py:TF1Actor --folder=$DEV_DIR/raya-trial/data --requirements=$DEV_DIR/raya-trial/tf1/requirements.txt"

Now we are ready for our experimentation. Try keeping actors like TF1Actor and TF2Actor as thin wrappers over model weights as they rarely change. That way, we don’t need to redeploy them very often.

We can then experiment over them like in trial.py

# trial.py

import ray

ray.init(namespace="serve")

a1 = ray.get_actor(namespace="serve", name="TF1Actor")
a2 = ray.get_actor(namespace="serve", name="TF2Actor")

ref = a1.act.remote(name="Rajiv")
result = ray.get(ref)
print(result)

ref = a2.do.remote(name="Rajiv")
result = ray.get(ref)
print(result)

If we run it:

"python $DEV_DIR/raya-trial/trial.py"  

You should see

TF:2.10.0: Rajiv
TF:2.12.0: Rajiv

I enjoyed making this :) (well, except for the one whole day I had to debug a crazy timing bug)

Caching

Even if we have these models up as actors and they are long running, they mostly will have the same prediction for the same input. Since model predictions on CPUs can take seconds, why not cache the output? Only pylru seems to work in a Ray Actor. So..

# caching_model/actor.py

import raya
from pylru import lrudecorator # <--------------
import time

class CachingModelActor(raya.Actor):
    def __init__(self):
        print("================= In Caching Model init ====================================")

    @lrudecorator(100) # <-------------------
    def act(self, name):
        time.sleep(5)
        print("================= In Caching Do ====================================")
        return f"Hi {name}"

I put a sleep for 5 seconds to simulate a slow model. My trial file is:

# trial_caching.py
import ray
import time

ray.init(namespace="serve")

a = ray.get_actor(namespace="serve", name="CachingModelActor")

st = time.time()
ref = a.act.remote(name="Rajiv1")
result = ray.get(ref)
print(result)
print(f"Time1:{time.time() - st}")

st = time.time()
ref = a.act.remote(name="Rajiv2")
result = ray.get(ref)
print(result)
print(f"Time2:{time.time() - st}")

# This should return quickly
st = time.time()
ref = a.act.remote(name="Rajiv1")
result = ray.get(ref)
print(result)
print(f"Time1:{time.time() - st}")

Running "python $DEV_DIR/raya-trial/trial_caching.py gives


Hi Rajiv1
Time1:5.054863452911377 # <---------- returned in 5 secs
Hi Rajiv2
Time2:5.024395227432251
Hi Rajiv1
Time1:0.0022411346435546875 # <--------------- returned in 0 secs

There are additional small features like copy_env_vars which will copy your local environment variables to the Ray Cluster

Notes

  • For production, Ray does not recommend using Runtime Environments(source). It’s probably because it will pip install the libraries on first load which can take a few seconds. They recommend putting it all in a docker image so it’s fast to load. So that means, when it actually comes to moving to production and you have models with conflicting dependencies, you’ll have to create separate Ray clusters. I think this is easiest with Kubernetes. However, we currently use the above approach in production as we have retries on the client side and rolling upgrades.

Caveats

  • In the current implementation of raya, the method arguments have to be explicitly mentioned. e.g. like name in args. ref = a2.do.remote(name="Rajiv"). args. ref = a2.do.remote("Rajiv") won’t work

  • Ray Serve: Though I used named actors as an alternative to an distrbuted endpoint, Ray has it’s own endpoint framework called Ray Serve. It’s feature rich and in production, that’s probably what one should use. However, it’s not strictly a conflict between Ray Serve and Named Actors. The way I differentiate right now is that if I want to expose an endpoint to external systems(both external and internal to the company not runninng on the same Ray cluster), I’ll use Ray Serve. But within a cluster, I’d like to explore named actors. However, I feel that at some point, I’ll have to move from named actors to serve components internally to a cluster too, mostly for rolling upgrades (or someone writes rolling upgrades for actors please!). So is this work all lost? I hope not, because one experiment I’d like to try is to transparently create a Ray Serve Object automatically from raya.Actor just like I have created a Ray named actor transparently right now.

  • Ray runtime environments don’t give complete isolation unlike Docker. If we have conflicting binaries, then we have a problem. One workaround is to pack the binaries in a bin folder local to the project and use that in our ray code but I’m not sure if all OS dependencies can be handled that way. My number one wish list is to be able to run each job/named actor in it’s own container while still accessible in a ray cluster like conventional Ray Actors. Ant Group have done this for their internal Ray code in Java and I can’t wait to see it land for Python in Ray.

Hope you liked this experiment! If you want to check out other similar stuff, check out my blog or my fledgling Youtube Channel