Published Mar 24, 2020 · 6 min read
March 25: Updated the discussion of the timeout option for ray.get(). See, in particular, the comparison of ray.get() and ray.wait() at the end of this post.
In the first part of this series, I listed the most commonly-used API features for Ray, then explored how ray.wait()
works.
In this installment, I’ll explore using ray.get(object_ids)
effectively. The ray.get()
documentation provides a good explanation of this method and how to use it. You pass the id of a remote object or a list of them.
When you call ray.get(), it blocks until the corresponding object is available, or until all the objects in the list are available, and then returns them.
In more detail, it blocks until the object corresponding to the object ID is available in Ray’s local object store (the objects cached in memory with Plasma). If necessary, the object will be copied from an object store on another node. In either case, the computation has to finish first so the object is available. When object_ids
is a list, then the objects corresponding to each object ID in the list will be returned. Hence, the method blocks for all of them.
Note: Ray uses “zero-copy” access to the objects in the object store. Any task (or Ray actor) that reads an object reads the copy in the cache rather than paying the overhead of copying that object into the local process’s memory.
There is an optional timeout
field that defaults to None
, meaning no timeout. If you specify a value and the timeout is hit before the objects are available locally, then RayTimeoutError
is raised. However, this option may be removed in a future version of Ray, to simplify the API. Later, I’ll discuss the alternative approach you should use instead.
An Exception is raised if the task that created the object, or the task that created one of the objects in the list, raised an exception.
Now let’s discuss tips and tricks.
Don’t call ray.get() inside an async context
A warning is issued if ray.get(object_id)
is called inside an async context. Use await object_id
instead. For a list of object ids, use await asyncio.gather(*object_ids)
.
Minimizing blocking when calling ray.get()
Let’s adapt the example code from Part 1. This time, we’ll start by defining a busy
function that isn’t a Ray remote task.
We don’t need to initialize Ray yet; this is the previous example with the single change that busy is no longer a Ray task. Calling ray.init() here is harmless, in any case.
Before using Ray, let’s see how this function performs. We will time five sequential invocations of this function; the output is below:
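The timing loop might look like this (a sketch; busy is repeated here so the snippet is self-contained):

```python
import time

def busy():
    time.sleep(1)
    return 1.0

start = time.time()
values = [busy() for _ in range(5)]      # five sequential, blocking calls
duration = time.time() - start
print(f'duration: {duration:.3f} seconds')
for i, value in enumerate(values):
    print(f'{i}: {value}')
```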
duration: 5.018 seconds
0: 1.0
1: 1.0
2: 1.0
3: 1.0
4: 1.0
Your times should be similar. The loop output isn’t all that interesting, but the duration
shows us that we waited five seconds for the loop to finish. Since each call to busy
sleeps for one second, the five second duration should make sense.
Now let’s wrap busy
in a Ray task:
We simply wrap the normal Python function busy in a new function that is defined to be a Ray task.
As usual, we invoke rbusy
using rbusy.remote(...)
and use ray.get()
to fetch the result, so let’s just do that in the loop:
The results are disappointing:
duration: 5.034 seconds
0: 1.0 (time: 1.007 seconds)
1: 1.0 (time: 2.015 seconds)
2: 1.0 (time: 3.022 seconds)
3: 1.0 (time: 4.026 seconds)
4: 1.0 (time: 5.034 seconds)
They are no better than before. This time we also tracked how long each loop iteration took to run, about one second each (the time values are cumulative).
The problem is that we immediately called ray.get() on the object ID returned by each rbusy.remote() call. This effectively blocked progress, because we waited for each remote call to finish before spawning the next task.
Instead, we need to fire off all the rbusy invocations first; they will run in parallel because rbusy.remote() is nonblocking. Then we can call ray.get() to retrieve the objects, blocking if necessary.
We could call ray.get(ids)
for all the ids at once, but because we’re going to grab some extra information in our list, i.e., times, we’ll just call ray.get()
on each id, one at a time:
Here’s the output, which is a bit busy, so we’ll walk through it carefully:
duration1: 1.892 MILLISECONDS
0: ObjectID(e725323c902523f0ffffffff010000c801000000) (time: 0.602 MILLISECONDS)
1: ObjectID(b63236165db0eea7ffffffff010000c801000000) (time: 1.053 MILLISECONDS)
2: ObjectID(53a5a3a325aa2675ffffffff010000c801000000) (time: 1.332 MILLISECONDS)
3: ObjectID(04df2a355e22995fffffffff010000c801000000) (time: 1.546 MILLISECONDS)
4: ObjectID(a438aa430f188597ffffffff010000c801000000) (time: 1.745 MILLISECONDS)
0: 1.0 (time: 1001.139 MILLISECONDS)
1: 1.0 (time: 1001.476 MILLISECONDS)
2: 1.0 (time: 1004.851 MILLISECONDS)
3: 1.0 (time: 1005.418 MILLISECONDS)
4: 1.0 (time: 1005.659 MILLISECONDS)
duration2: 1.006 seconds
The duration1
value shows that it takes about 2 milliseconds to process the whole first loop, because the calls to rbusy.remote()
are nonblocking.
We then print out the object ids and the cumulative time it took for those five rbusy.remote()
calls. The last value, for pass “4”, shows a time that’s a bit less than the total duration1
time, as we would expect.
The times for the second loop are interesting. It takes just over one second (1001 milliseconds) for the first iteration through the loop to complete. Why? It calls ray.get()
, so it has to wait for the first task to finish. But then the rest of the loop iterations go very quickly; each one takes a fraction of a millisecond. Why is that? Recall that we’re running the five tasks in parallel. When the first one finishes, the other four finish at about the same time, so for the remaining iterations, each call to ray.get()
waits very little, if at all, before the object is ready. If you watch this run, you’ll notice a short pause after the last ObjectID
line, then the rest of the output “bursts” out.
This is the best we can do for this contrived example. Our tasks ran in parallel, but when we needed the results, we had to wait for them to finish. If we had some other busy work to do and then called ray.get()
later, we might have seen no pauses at all.
ray.get() vs. ray.wait()
Let’s discuss two situations in which ray.wait() should be used instead of ray.get().
First, we used a simple example where all tasks took the same amount of time. What if we had very different task durations and what if the longest running task were at the beginning of our list? In this case, we would block waiting for it to finish, even though other tasks would finish sooner and their objects would be ready to process.
This is why you should consider using ray.wait()
for real-world applications when you need to wait on multiple running tasks. It will return as soon as results are available, subject to the options you provide when calling it, as we discussed in Part 1. Hence, you can process available results while longer-running tasks continue to run asynchronously.
Second, I mentioned above that the timeout
field for ray.get()
may be removed in a subsequent release of Ray. Going forward, ray.wait()
will be the API call of choice for this blocking operation when you want to specify a timeout.
In general, you should avoid making blocking calls in production distributed systems without specifying timeout values. Otherwise, an application can freeze waiting for a blocking call that, for whatever reason, never completes. It’s better to keep the application “live” with a timeout and appropriate handling of the resulting error.
Hence, for these two reasons, you should strongly consider using ray.wait() with a timeout value in production code, and limit the use of ray.get() to cases where you only need one task result and you’re confident that having no timeout is “safe”.
See Part 1 for more details about ray.wait()
.
If you have any questions or thoughts about Ray, you can join our community through Discourse or Slack. If you would like to see how Ray is being used throughout industry, please consider attending or watching Ray Summit.