
Shared Memory, Zero-Copy, and the Object Store
A deep dive into Ray's object store architecture: how shared memory enables zero-copy transfers, how the Plasma store is implemented, and patterns for efficient distributed data handling.
Here's a question that doesn't get asked enough: how does Ray pass a 10GB NumPy array between tasks without grinding to a halt?
Think about it. You've got Task A on Worker 1 that produces a large result. Task B on Worker 2 needs that result. In normal Python, you'd serialize it, send it over a socket, deserialize it on the other end. For 10GB, that's seconds of overhead—serialize, network transfer, deserialize. Do that thousands of times in a pipeline, and your distributed system spends more time moving data than computing.
Ray doesn't do that. When I first saw it working, I assumed magic—some clever compression, maybe streaming serialization. The dashboard showed tasks completing in milliseconds while passing gigabytes between them. How?
The answer isn't magic. It's shared memory—but not the way most people think.
At Mechademy, we set out to build an Automated Machine Learning Lifecycle system. The vision was ambitious: data scientists should define training workflows once, and our platform would handle the distributed execution—feature engineering across thousands of turbine configurations, parallel hyperparameter tuning, model training on datasets too large for single machines.
XGBoost on 50GB per turbine was the test case. If we could make that fast, we could handle anything.
Ray was the obvious choice for the distributed runtime. The API was clean, the architecture made sense, and the object store promised exactly what we needed: efficient data sharing across the cluster without the serialization nightmare that plagued our earlier attempts with Celery.
But there was a learning curve. When we first scaled to production data—50GB datasets, thousands of features computed in parallel—training times didn't match our expectations. Tasks showed "PENDING_ARGS" for seconds before execution. The dashboard revealed tasks spending more time waiting than computing.
The issue wasn't Ray—it was how we were using it. Here's the kind of pattern that causes problems:
# Pattern that causes serialization overhead
import ray
import numpy as np

large_array = np.zeros((10000, 10000))  # ~800MB

@ray.remote
def process_chunk(data, chunk_id):
    return data[chunk_id].sum()

# Ray serializes large_array for EVERY task
# Result: 20 × 800MB = 16GB of serialization overhead
results = [process_chunk.remote(large_array, i) for i in range(20)]
Passing large data directly to remote functions means Ray serializes it for every single task. Twenty tasks, twenty copies, twenty serializations. The object store exists—designed exactly for this—but this pattern doesn't leverage it.
The breakthrough was understanding the object store's actual model: store once with ray.put(), reference everywhere with ObjectRefs.
# Pattern that uses the object store correctly
import ray
import numpy as np

large_array = np.zeros((10000, 10000))  # ~800MB
array_ref = ray.put(large_array)        # Stored ONCE in the object store

@ray.remote
def process_chunk(data, chunk_id):
    # Ray automatically retrieves the object from the store: an ObjectRef
    # passed as a top-level argument is resolved before the task runs
    return data[chunk_id].sum()

# array_ref is a lightweight reference, not the data itself
# Ray handles data access efficiently
results = [process_chunk.remote(array_ref, i) for i in range(20)]
Same cluster, same code structure—but 10× faster because we eliminated serialization overhead. One ray.put(), thousands of tasks, single copy in memory.
That insight became foundational for our ML platform. The object store isn't just "where results live"—it's the infrastructure that makes distributed data processing practical. It's why passing large objects between tasks is cheap. It's why actors can serve model predictions without reloading weights. It's why data pipelines don't collapse under their own serialization overhead.
But only if you use it correctly: store once with ray.put(), reference everywhere with ObjectRefs.
This is what Part 5 is about—not just what the object store is, but how it works and why understanding that architecture matters when you're building systems at scale.
Per-Node Shared Memory
The object store isn't one giant pool of shared memory across your cluster. That's the first misconception to clear up.
Instead, Ray creates a separate object store on each node. When you start Ray on a machine, one of the processes that launches is the Plasma store—a local shared memory region carved out from /dev/shm on Linux (or /tmp on macOS).
Think back to the office analogy from Part 2. The object store is the file room in each regional office. Workers in that office can access files instantly—it's just shared memory. But workers in another office? They need the data transferred first.
Where Do Objects Actually Live?
When you call ray.put(), the object is stored on the node where that code runs—usually your driver (the head node).
Here's the key insight from the Ray docs: "When data is put into the object store, it does not get automatically broadcasted to other nodes. Data remains local to the writer until requested by another task or actor on another node."
import ray
import numpy as np

ray.init()

# This stores data on YOUR current node (likely the driver)
data = np.zeros((5000, 5000))
data_ref = ray.put(data)

@ray.remote
def process(local_data):
    # Ray resolves data_ref before this task runs:
    # same node as the object: instant shared memory access
    # different node: Ray transfers the data first, then shared memory access
    return local_data.sum()

result = ray.get(process.remote(data_ref))
What Control Do You Have?
You don't directly control where objects are stored—there's no ray.put(data, node_id="xyz") API. But you have indirect control:
Task returns: When a remote task returns a value, that result is stored on the node where the task executed—not the driver.
@ray.remote
def load_data_on_worker():
    # Data is created and stored on whatever node runs this task
    return np.zeros((5000, 5000))

# The returned object lives on the worker node, not the driver
data_ref = load_data_on_worker.remote()
Locality scheduling: Ray's scheduler prefers to place tasks on nodes where their input data already exists—avoiding transfers when possible.
Placement groups: For advanced control, you can use placement groups to ensure tasks run on specific nodes, meaning their outputs end up in those nodes' object stores.
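To make that concrete, here is a minimal sketch of pinning loader tasks to specific bundles with Ray's placement group API (Ray 2.x). The two-bundle STRICT_SPREAD layout assumes a cluster with at least two nodes, and the resource shapes and task body are illustrative, not a prescribed setup.

import ray
import numpy as np
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Reserve one 1-CPU bundle on each of two different nodes (STRICT_SPREAD)
pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="STRICT_SPREAD")
ray.get(pg.ready())  # Blocks until the bundles are placed

@ray.remote
def load_shard(shard_id):
    # The returned array lands in the object store of the node
    # that hosts the bundle this task was pinned to
    return np.full((1000, 1000), shard_id)

# Pin each loader to a specific bundle, and therefore a specific node
shard_refs = [
    load_shard.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=i,
        )
    ).remote(i)
    for i in range(2)
]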
The Practical Mental Model
- Objects live where they're created
- Task returns live where the task ran
- Ray moves data lazily—only when a task actually needs it
- Locality scheduling reduces movement—Ray prefers to bring compute to data
At Mechademy, this clicked when we restructured our feature engineering pipeline. Instead of loading data on the driver and shipping it everywhere, we created loader tasks that ran on each worker node. Data-heavy loading stayed local. Only the smaller feature vectors moved across the network.
Zero-Copy Reads and Immutability
Here's where the object store gets clever.
When multiple workers on the same node need the same data, they don't each get a copy. They all read from the exact same bytes in shared memory—no serialization, no copying. This is zero-copy deserialization.
But there's a catch: for this to work safely, objects must be immutable. Once you put something in, it can never change.
Why Immutability?
Imagine two workers reading the same NumPy array. Worker A is summing values while Worker B computes a mean. If Worker A could modify the array mid-computation, Worker B would get corrupted results.
Immutability eliminates this. When you call ray.put(), Ray writes your object to shared memory and seals it—read-only forever. Any worker can safely read it because no one can change it.
Zero-Copy in Practice
The magic happens with NumPy arrays. When Ray deserializes a NumPy array from the object store, it doesn't copy the data. Instead, it creates a NumPy array object pointing directly to shared memory.
import ray
import numpy as np

ray.init()

large_array = np.ones((10000, 10000))  # ~800MB
array_ref = ray.put(large_array)

@ray.remote
def check_memory_address(arr):
    # Ray resolves the ObjectRef; arr points directly into shared memory
    return arr.ctypes.data  # Memory address of the underlying data

# Tasks handled by the same worker process on the same node report the
# same address - zero-copy reads, not per-task copies
addresses = ray.get([check_memory_address.remote(array_ref) for _ in range(5)])
print(set(addresses))
The Read-Only Gotcha
Because objects are immutable, NumPy arrays from the object store are read-only:
@ray.remote
def try_to_modify(arr):
    arr[0, 0] = 999  # Raises: ValueError: assignment destination is read-only

# Fix: make a copy if you need to modify
@ray.remote
def modify_safely(arr):
    arr = arr.copy()  # Now you own this copy
    arr[0, 0] = 999   # Works fine
    return arr
What Gets Zero-Copy Treatment?
Not everything benefits from zero-copy. Ray optimizes specifically for:
- NumPy arrays: The primary use case. Arrays are stored in Arrow-compatible format and accessed directly from shared memory.
- Collections of NumPy arrays: Lists or dicts containing arrays maintain zero-copy for the array contents.
- Arrow tables: Used heavily by Ray Data for tabular data.
Regular Python objects—dicts, lists of strings, custom classes—still get serialized and deserialized normally. They go through pickle, and each ray.get() creates a new Python object. Still faster than network transfer, but not zero-copy.
A note on pandas: If you're working with pandas DataFrames (as most ML engineers do), they don't get zero-copy treatment directly. DataFrames go through pickle serialization. However, the underlying NumPy arrays inside the DataFrame do benefit from zero-copy when you access them via .values or .to_numpy(). If you're passing large DataFrames frequently, consider extracting the NumPy arrays for the hot path and reconstructing the DataFrame only when needed.
import pandas as pd

# Zero-copy: NumPy arrays
arr_ref = ray.put(np.zeros((1000, 1000)))
arr = ray.get(arr_ref)  # Points to shared memory

# Not zero-copy: pandas DataFrame
df_ref = ray.put(pd.DataFrame({"a": range(1000)}))
df = ray.get(df_ref)  # Deserialized copy

# Workaround for large DataFrames: extract NumPy arrays
values_ref = ray.put(df.values)  # The underlying array gets zero-copy
The rule of thumb: structure your hot path data as NumPy arrays. That's where zero-copy shines.
Memory Management
The object store has a fixed size—by default, about 30% of your system's RAM. What happens when you try to put more data in than fits?
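If you want to see what your cluster actually got, ray.cluster_resources() reports the configured capacity. A small sketch, assuming a recent Ray version where this resource is reported in bytes:

import ray

ray.init()

# Aggregate object store capacity across the cluster
# (recent Ray versions report this resource in bytes)
store_bytes = ray.cluster_resources().get("object_store_memory", 0)
print(f"Object store capacity: {store_bytes / 1024**3:.1f} GiB")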
Reference Counting
Ray uses distributed reference counting. As long as any ObjectRef points to an object, it stays in memory—it's "pinned."
arr_ref = ray.put(np.zeros((5000, 5000)))  # Pinned

# When arr_ref goes out of scope or you delete it, the object becomes evictable
del arr_ref  # Now Ray can reclaim this memory if needed
Key insight: deleting an ObjectRef doesn't immediately delete the object. It just makes it eligible for eviction when Ray needs space.
Eviction and Spilling
When the object store fills up:
- Eviction: Ray removes unpinned objects (LRU policy). If you access an evicted object later, Ray may reconstruct it by re-executing its lineage, but this is not guaranteed. In production pipelines, assume evicted objects are gone unless you explicitly spill to disk or persist results externally.
- Spilling: Ray can write objects to disk instead of evicting them. Slower than memory, but faster than reconstruction.
# Configure spilling
ray.init(
    object_store_memory=2 * 1024 * 1024 * 1024,  # 2GB
    _system_config={
        "object_spilling_config": '{"type": "filesystem", "params": {"directory_path": "/tmp/ray_spill"}}'
    }
)
Monitor with ray memory --stats-only to see spill rates. Lots of spilling means your object store is undersized.
Sizing the Object Store
# Increase object store size
ray.init(object_store_memory=32 * 1024 * 1024 * 1024)  # 32GB
How much do you need? Enough for one "stage" of your pipeline's intermediate data. At Mechademy, we size based on our largest intermediate result—for XGBoost training, that's the feature matrix (20-30GB), so we set the object store to 40GB.
Common Issues
ObjectStoreFullError: Everything is pinned, nothing can be evicted. Fix: don't hold references you no longer need.
Spilling thrashing: Constantly spilling and restoring the same objects. Fix: increase object store size or restructure your pipeline for a smaller working set.
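One way to keep the working set bounded is to consume results as they complete and drop the references you no longer need, so Ray can evict those objects. A minimal sketch using ray.wait; the chunk size, batch size of 10, and task body are illustrative.

import ray
import numpy as np

ray.init()

@ray.remote
def make_chunk():
    return np.random.rand(1000, 1000)  # ~8MB per chunk

pending = [make_chunk.remote() for _ in range(100)]
total = 0.0

while pending:
    # Take results as they complete instead of holding all refs at once
    done, pending = ray.wait(pending, num_returns=min(10, len(pending)))
    for chunk in ray.get(done):
        total += chunk.sum()
    # Dropping finished refs lets Ray evict those objects if it needs space
    del done

print(total)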
Object Transfers Between Nodes
What happens when a task on Node B needs data that lives on Node A?
Ray handles this transparently. When you call ray.get(some_ref) and the object isn't local:
- Ray checks where the object lives (tracked in metadata)
- Your node's ObjectManager requests the data from the remote node
- Data streams over the network
- The object is written to your local object store
- Now it's local—you get zero-copy access (for NumPy arrays)
After transfer, the object exists in both stores. Future ray.get() calls on Node B are instant.
@ray.remote
def create_on_worker():
    return np.zeros((5000, 5000))  # Stored on this worker's node

@ray.remote
def process(data):
    # Ray resolves the ObjectRef before the task runs;
    # the transfer happens here if the object isn't local
    return data.sum()

data_ref = create_on_worker.remote()
result = ray.get(process.remote(data_ref))
Key insight: Ray transfers once per node, not once per task. If 100 tasks on Node B all need the same data from Node A, there's only one transfer—then all 100 tasks read from Node B's local store.
For very large objects, transfer time can dominate. A 10GB array over a 10Gbps network takes ~8 seconds just for transfer. If your task does 1 second of compute, you're spending 89% of time moving data.
This is why locality matters. The best transfer is no transfer at all.
Practical Patterns
Three patterns that matter most in production:
Pattern 1: Store Once, Reference Everywhere
model = load_trained_model()
model_ref = ray.put(model)

@ray.remote
def predict(model, batch):
    # Ray resolves model_ref from the object store before the task runs
    return model.predict(batch)

# 1000 prediction tasks, 1 copy of the model
futures = [predict.remote(model_ref, batch) for batch in batches]
Pattern 2: Create Data Where It's Needed
@ray.remote
def load_partition(partition_id):
    return load_from_storage(f"s3://bucket/partition_{partition_id}.parquet")

@ray.remote
def process_partition(data):
    # Locality scheduling makes it likely this runs where the data lives - no transfer
    return transform(data)

# Each partition loads and processes locally
partition_refs = [load_partition.remote(i) for i in range(num_partitions)]
result_refs = [process_partition.remote(ref) for ref in partition_refs]
Pattern 3: Batch Your ray.get() Calls
# Slow: Sequential fetches
results = [ray.get(ref) for ref in refs]

# Fast: Parallel fetches
results = ray.get(refs)
What's Next
You now understand Ray's object store—not as a black box, but as per-node shared memory with specific behaviors and tradeoffs.
The key insights:
- Per-node stores: Each node has its own object store, not one shared pool
- Indirect placement: Objects live where they're created; use task placement to control this
- Zero-copy for NumPy: Immutable objects enable safe shared memory access
- Memory management: Reference counting, eviction, and spilling keep things running
- Locality matters: The best transfer is no transfer at all
Coming in Part 6: We've mentioned the GCS throughout this series—it tracks where objects live, which nodes are healthy, what tasks are running. Part 6 dives into Ray's coordination layer.
Try It Yourself
import ray
import numpy as np

ray.init()

@ray.remote
def create_array():
    return np.random.rand(1000, 1000)

@ray.remote
def sum_array(arr):
    # Ray resolves the ObjectRef before the task runs
    return arr.sum(), ray.get_runtime_context().get_node_id()

data_ref = create_array.remote()

# See which node processes it
results = ray.get([sum_array.remote(data_ref) for _ in range(5)])
for total, node_id in results:
    print(f"Sum: {total:.2f}, Node: {node_id[:8]}...")
Understanding the object store changes how you write distributed code. You stop thinking about "passing data to tasks" and start thinking about "where data lives and where compute runs."
See you in Part 6.