
Production Ray: Patterns, Mistakes, and Lessons
The final part of our Ray deep dive: five production patterns that work, five mistakes that hurt, and a debugging playbook for distributed ML systems.
Seven posts ago, Ray was magic.
We knew the APIs—@ray.remote, ray.get(), maybe actors if we were feeling brave. But when something broke, we guessed. When scaling failed, we threw resources at it. When performance lagged, we shrugged.
Today, debugging is different. Dashboard shows workers idle? Check the GCS logs for coordination bottlenecks (Part 6). Tasks stuck in PENDING_ARGS? Check object store capacity and locality (Parts 4 and 5). Training slow despite resources? Check if we're hitting serialization overhead or event storms (Part 4).
The difference isn't that we're smarter. It's that we understand the system.
I spent months building this mental model—reading source code, debugging production failures at Mechademy, tracing tasks through logs at 2am. The journey from Part 1 (why Ray exists) through Part 2 (the runtime), Part 3 (execution models), Part 4 (scheduling), Part 5 (object store), Part 6 (coordination), and Part 7 (fault tolerance) wasn't about memorizing internals. It was about building intuition.
This final post isn't about one more Ray feature. It's about patterns—the architectures that work, the mistakes that hurt, and the lessons that stick. Think of it as the field manual that comes after the training course.
Everything here comes from running Ray in production. Not toy examples. Not benchmarks on synthetic data. Real ML pipelines, real failures, real fixes.
TL;DR: Five production patterns (store once/reference everywhere, actors orchestrate/tasks execute, create data where it's needed, batch your ray.get() calls, right-size resources). Five common mistakes with symptoms and fixes. A debugging playbook for when things go sideways. And a reflection on what seven posts of digging into Ray's internals actually taught us.
The ML Lifecycle Architecture
Before the patterns, let's talk about what we actually run. This is the production architecture at Mechademy—the thing all seven previous posts were building toward.
Our system trains thousands of custom turbomachinery models. Each turbine gets its own XGBoost model, trained on 50GB of sensor data. Data security constraints mean we can't share data between clients. Scale means we can't train sequentially.
Here's the architecture:
Lightweight head node (coordination only): Always on, cheap. Runs the GCS (executive office from Part 2), the dashboard, and the Ray Jobs API. Costs roughly $50/month on a small AWS instance. Zero task execution happens here.
```bash
# Head node: coordination only, no task execution
ray start --head --num-cpus=0 --num-gpus=0

# Workers autoscale from 0 to N based on demand
```
Setting num_cpus=0 is the key—we covered this pattern in Part 4. The head node is a regional manager with no employees. It coordinates, but it doesn't do the work.
Autoscaling workers via KubeRay: GPU and CPU workers spin up on-demand through Kubernetes. When a training job arrives, KubeRay provisions the nodes. When training finishes, workers scale back to zero. We pay for compute only when compute is happening.
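For orientation, here is a trimmed sketch of what this looks like as a KubeRay `RayCluster` manifest. The field values are illustrative, not our production configuration:

```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: training-cluster
spec:
  enableInTreeAutoscaling: true   # let KubeRay scale worker groups on demand
  headGroupSpec:
    rayStartParams:
      num-cpus: "0"               # coordination only, no task execution
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:latest
  workerGroupSpecs:
    - groupName: gpu-workers
      minReplicas: 0              # scale to zero when idle
      maxReplicas: 8
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
```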
Job submission via Ray Jobs API: Our orchestration platform submits jobs to the head node. No SSH, no direct cluster access. Jobs carry their own dependencies and configuration.
Monitoring through Ray Dashboard: Real-time visibility into task execution, resource utilization, object store pressure, and worker health.
Why this architecture? Three reasons.
Cost: Idle workers cost nothing because they don't exist. The always-on head node is tiny. We went from spending thousands on persistent GPU clusters to paying per-training-minute.
Separation of concerns: The head node handles coordination. Workers handle computation. When we need to debug scheduling, we look at head node logs. When we need to debug execution, we look at worker logs. Clean boundaries.
Resilience: If a worker dies mid-training, Part 7's fault tolerance mechanisms kick in—task retries, lineage reconstruction, actor restarts. The head node stays up, so the cluster doesn't lose its brain.
This is the production version of what we've been building toward across the series. Every component maps to something we've studied: GCS coordination, Raylet scheduling, object store data management, fault tolerance recovery.
Five Patterns That Work
These aren't theoretical. Each one solved a real problem in our pipeline.
Pattern 1: Store Once, Reference Everywhere
The single most impactful optimization we made. Covered in depth in Part 5, but worth repeating because I still see teams getting this wrong.
```python
# Bad: serializes data for every task
results = [process.remote(large_data) for _ in range(100)]

# Good: one copy, 100 references
data_ref = ray.put(large_data)
results = [process.remote(data_ref) for _ in range(100)]
```
The first version serializes large_data 100 times. For a 500MB dataset, that's 50GB of serialization overhead. The second version serializes once into the object store and passes a lightweight reference. Same result, fraction of the cost.
Pattern 2: Actors Orchestrate, Tasks Execute
This confused me at first. I'd create an actor and do all the work inside actor methods—sequentially. The actor became a bottleneck because actor methods execute one at a time.
The fix: use the actor for state management and dispatch parallel tasks for the heavy lifting.
```python
@ray.remote
class TrainingOrchestrator:
    def __init__(self):
        self.results = {}

    def run_pipeline(self, configs):
        # Actor dispatches parallel tasks
        refs = [train_model.remote(cfg) for cfg in configs]
        # Actor collects and stores results
        self.results = dict(zip(
            [c["model_id"] for c in configs],
            ray.get(refs)
        ))
        return len(self.results)


@ray.remote
def train_model(config):
    # Stateless, parallel execution
    return {"model_id": config["model_id"], "accuracy": 0.95}
```
The actor holds state (Part 3). Tasks do the parallel work. Best of both worlds.
Pattern 3: Create Data Where It's Needed
When we first built our pipeline, the driver loaded all the data and shipped it to workers. For 50GB datasets, that meant the head node became a data bottleneck—everything funneled through one machine's network interface.
The fix: load data on the workers themselves.
```python
# Bad: driver loads data, ships to workers
data = load_from_s3("s3://bucket/turbine_data.parquet")
data_ref = ray.put(data)  # Stored on driver node
results = [process.remote(data_ref) for _ in range(20)]
# 20 network transfers from driver to workers

# Good: workers load their own data
@ray.remote
def load_and_process(partition_path):
    data = load_from_s3(partition_path)  # Loaded locally
    return process(data)  # No network transfer needed

results = [load_and_process.remote(f"s3://bucket/part_{i}.parquet")
           for i in range(20)]
```
Data stays where it's created. Compute doesn't wait for transfers. This is the locality principle from Part 5 in action—the best transfer is no transfer at all.
Pattern 4: Batch Your ray.get() Calls
Small change, big impact. A sequential loop of ray.get() calls blocks on each object one at a time. Batching lets Ray fetch them in parallel.
```python
# Bad: blocks on each one sequentially
results = [ray.get(ref) for ref in refs]

# Good: parallel fetch, single blocking call
results = ray.get(refs)
```
The bad version waits for object 1, then object 2, then object 3. If each takes 100ms, 100 objects take 10 seconds. The good version fetches all of them concurrently. Same 100 objects might take 200ms.
Pattern 5: Right-Size Your Resources
Default resource allocation wastes capacity. Every task reserves 1 CPU by default, so a lightweight task that could share a core still holds a full slot and blocks other tasks from scheduling. Meanwhile, a task that actually needs 4 CPUs but doesn't say so gets packed onto a node as if it needed one, and runs slowly on oversubscribed hardware.
```python
# Bad: defaults (1 CPU, no memory specification)
@ray.remote
def lightweight_task(x):
    return x * 2

# Good: explicit requirements
@ray.remote(num_cpus=0.5)  # Two of these can share a CPU
def lightweight_task(x):
    return x * 2

@ray.remote(num_cpus=4, num_gpus=1, memory=8 * 1024 * 1024 * 1024)
def heavy_training(data):
    # An ObjectRef argument arrives already resolved to the data
    return train_xgboost(data)
```
Fractional CPUs let you pack more lightweight tasks per node. Explicit GPU requirements prevent tasks from landing on CPU-only nodes. Memory declarations help the scheduler avoid out-of-memory kills. All of this ties back to the resource management concepts in Part 4.
Five Mistakes That Hurt
Every pattern above came from making the opposite mistake first.
| Mistake | Symptom | Fix |
|---|---|---|
| Ignoring resource requirements | Tasks on wrong hardware, GPU nodes idle | Explicit num_cpus, num_gpus (Part 4) |
| Too many small tasks | Dashboard lag, scheduling latency spikes | Batch into larger chunks, tune event reporting (Part 4) |
| Assuming infinite object store | ObjectStoreFullError, disk spilling | Stream processing, right-size object_store_memory (Part 5) |
| Synchronous ray.get() in loops | Parallel work serialized, slow pipelines | Collect refs first, batch fetch |
| Not monitoring the right things | Silent failures, mystery slowdowns | Watch GCS health, object store pressure, worker utilization |
Ignoring resource requirements bit us when we had GPU nodes sitting idle while CPU tasks queued. The scheduler didn't know our training tasks needed GPUs because we never told it. Two lines of decorator changes fixed a week of frustration.
Too many small tasks was the coordination tax from Part 4. Millions of millisecond tasks generated event storms that overwhelmed the GCS—the exact bottleneck we dissected in Part 6. We batched feature computations—instead of one task per feature, one task per feature group. Task count dropped 100x. Throughput doubled.
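The batching itself is mechanical. A sketch with illustrative names: group the fine-grained items first, then launch one task per group instead of one per item.

```python
def batch_items(items, batch_size):
    """Split a flat list of work items into coarser chunks."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


# 1000 features at one task each means 1000 scheduling events;
# grouped by 100, it's 10 tasks.
features = [f"feature_{i}" for i in range(1000)]
groups = batch_items(features, batch_size=100)
print(len(groups))  # 10
```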
Assuming infinite object store caused our first production outage. We were ray.put()-ing intermediate results without thinking about memory. The object store filled, spilling started, and performance cratered. Now we monitor ray memory --stats-only and size the store based on our largest pipeline stage.
Synchronous ray.get() in loops is the most common mistake I see in Ray code reviews. It turns a distributed system back into a sequential one. The fix is mechanical: collect all your ObjectRefs, then call ray.get() once.
Not monitoring the right things is insidious. Ray can fail silently—an actor might restart repeatedly, tasks might spill to disk, the GCS might be under pressure—and your job still "completes," just 10x slower than it should. At Mechademy, we added alerts on object store spill rates, GCS event processing latency, and worker restart counts.
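Our alerting boils down to threshold checks over a periodic metrics snapshot. A hypothetical sketch: the metric names and thresholds below are invented for illustration, and how you collect them depends on your monitoring stack (we scrape Ray's exported metrics).

```python
def check_cluster_health(metrics):
    """Return alert strings for a snapshot of cluster metrics.

    The keys and thresholds are hypothetical; wire them to whatever
    your metrics pipeline actually exports.
    """
    alerts = []
    if metrics.get("object_store_spilled_bytes_per_min", 0) > 1 * 1024**3:
        alerts.append("object store spilling heavily")
    if metrics.get("worker_restarts_per_hour", 0) > 3:
        alerts.append("workers restarting repeatedly")
    if metrics.get("gcs_event_lag_ms", 0) > 500:
        alerts.append("GCS event processing lagging")
    return alerts


# A job can "complete" while any of these fire: slow, not failed.
print(check_cluster_health({"gcs_event_lag_ms": 900}))
# -> ['GCS event processing lagging']
```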
The Debugging Playbook
When something goes wrong in production, you need a systematic approach. Here's the playbook we use at Mechademy—symptom to fix in the fewest steps.
| Symptom | Check | Likely Cause | Fix |
|---|---|---|---|
| Tasks pending, workers idle | GCS logs | Coordination bottleneck | Tune event reporting interval (Parts 4 and 6) |
| PENDING_ARGS for seconds | Object store metrics | Data transfer delay | Locality optimization (Part 5) |
| Dashboard unresponsive | GCS health | Event storm from high task volume | Increase reporting interval (Parts 4 and 6) |
| ObjectStoreFullError | ray memory | Memory pressure, too many pinned objects | Size object store, stream data (Part 5) |
| Actor methods slow | Worker logs | Serialization overhead on arguments | Reduce argument size, use object refs |
| Tasks retry repeatedly | Task error logs | Transient failures or resource exhaustion | Check max_retries, add error handling (Part 7) |
The debugging commands we use most:
```bash
# Cluster health overview
ray status

# Object store memory breakdown
ray memory --stats-only

# GCS logs on Kubernetes (head pod)
kubectl logs <head-pod> -c ray-head | grep -i gcs

# Check for spilling
kubectl logs <head-pod> -c ray-head | grep -i spill

# Worker-level task errors
kubectl logs <worker-pod> -c ray-worker | grep -i error
```
The pattern is always the same: identify the symptom, check the relevant component, confirm the cause, apply the fix. No guessing. No "restart everything and hope." The mental model from this series—GCS for coordination, Raylet for scheduling, object store for data, workers for execution—tells you exactly where to look.
What We've Learned
Eight posts. Hundreds of pages of source code. Dozens of production incidents. Here are the insights that stuck.
Components coordinate through GCS, not through commands (Part 6). The executive office doesn't micromanage. It maintains the ledger—who exists, what's running, where data lives. Raylets and workers make local decisions using that shared state. This is why Ray scales where centralized schedulers don't.
Data stays put, compute moves to data (Parts 4 and 5). The scheduler prefers nodes where data already lives. The object store keeps data local. Network transfers happen only when necessary. Understanding this single principle eliminated half our performance issues.
Local decisions scale, global coordination doesn't (Part 4). Raylets handle scheduling locally whenever possible. The GCS only gets involved for cross-node decisions. This hybrid approach is why Ray handles millions of tasks without drowning in coordination overhead—as long as you don't generate event storms.
Immutability enables zero-copy sharing (Part 5). Objects in the store are sealed once written. Multiple workers read the same bytes in shared memory safely because nobody can modify them. This constraint—objects are read-only—is what makes the performance possible.
Fault tolerance comes from lineage and idempotency (Part 7). Ray doesn't prevent failures. It recovers from them by re-executing lost work from recorded lineage. Tasks are retried. Actors are restarted. The system heals without manual intervention.
Tasks for parallelism, actors for state (Part 3). This simple rule prevents most architectural mistakes. If the work is stateless and parallel, use tasks. If you need to maintain state across calls, use an actor. If you need both, use an actor that dispatches tasks.
What Wasn't Covered
This series focused on Ray Core—the foundation. There's a whole ecosystem built on top:
- Ray Train and Tune: Distributed training and hyperparameter optimization
- Ray Serve: Model serving with autoscaling
- Ray Data: Large-scale data processing pipelines
- Advanced KubeRay patterns: Multi-cluster federation, spot instance handling
- Ray 2.x improvements: Compiled Graphs (accelerated DAGs) for lower per-task overhead
Each of these deserves its own deep dive. But they all build on the foundation we've covered—GCS, Raylets, object store, scheduling, fault tolerance. Understanding the core makes the ecosystem approachable.
Try It Yourself
Here's a complete mini-pipeline that ties together concepts from every part of the series:
```python
import ray
import numpy as np

ray.init()

# Pattern: Store once, reference everywhere (Part 5)
config = ray.put({"learning_rate": 0.01, "epochs": 10})


# Right-sized resources (Part 4), fault tolerance (Part 7)
@ray.remote(num_cpus=1, max_retries=3)
def process_batch(cfg, batch_id):
    # Ray resolves the ObjectRef argument before the task body runs,
    # zero-copy when the object is already on this node (Part 5)
    # Simulate loading data on the worker (Part 5 locality)
    data = np.random.rand(1000, 100)
    loss = np.mean(data) * cfg["learning_rate"]
    return {"batch": batch_id, "loss": loss}


# Pattern: Batch ray.get() calls (Part 4)
refs = [process_batch.remote(config, i) for i in range(20)]
results = ray.get(refs)  # Single batch fetch, not sequential

print(f"Processed {len(results)} batches")
for r in results[:3]:
    print(f"  Batch {r['batch']}: loss={r['loss']:.4f}")

# Check cluster state (Part 2)
print(f"\nCluster resources: {ray.cluster_resources()}")
```
Run this locally, watch it in the dashboard at http://localhost:8265, and trace the concepts: object store references, resource declarations, batch fetching, retry configuration. Every line maps to something we've studied.
The End of the Expedition
Seven parts ago, we asked: why Ray? Now we know—not just the marketing answer, but the engineering one.
Ray is a distributed company with an executive office (GCS) that tracks state, regional managers (Raylets) that schedule locally, employees (workers) that execute tasks, and file rooms (object stores) that share data efficiently. Understanding the runtime, the execution model, the scheduler, the object store, the coordination layer, and the recovery mechanisms doesn't just make you better at Ray. It makes you better at distributed systems.
The principles are universal. Data locality matters in every distributed system. Local decisions scale better than global ones. Immutability enables safe concurrency. Fault tolerance through lineage beats fault prevention through over-engineering.
Ray is engineering, not magic. Understanding the pieces makes you effective. When something breaks, you know where to look. When something's slow, you know what to measure. When you need to scale, you know what tradeoffs you're making.
That's the whole point of this series. Not to make Ray less impressive—but to make it less mysterious.
See you on the next expedition.