Production Ray: Patterns, Mistakes, and Lessons
Distributed Systems


The final part of our Ray deep dive: five production patterns that work, five mistakes that hurt, and a debugging playbook for distributed ML systems.

March 15, 2026
#Ray · #Distributed Computing · #Python · #ML Infrastructure · #Production · #Architecture

Seven posts ago, Ray was magic.

We knew the APIs—@ray.remote, ray.get(), maybe actors if we were feeling brave. But when something broke, we guessed. When scaling failed, we threw resources at it. When performance lagged, we shrugged.

Today, debugging is different. Dashboard shows workers idle? Check the GCS logs for coordination bottlenecks (Part 6). Tasks stuck in PENDING_ARGS? Check object store capacity and locality (Parts 4 and 5). Training slow despite resources? Check if we're hitting serialization overhead or event storms (Part 4).

The difference isn't that we're smarter. It's that we understand the system.

I spent months building this mental model—reading source code, debugging production failures at Mechademy, tracing tasks through logs at 2am. The journey from Part 1 (why Ray exists) through Part 2 (the runtime), Part 3 (execution models), Part 4 (scheduling), Part 5 (object store), Part 6 (coordination), and Part 7 (fault tolerance) wasn't about memorizing internals. It was about building intuition.

This final post isn't about one more Ray feature. It's about patterns—the architectures that work, the mistakes that hurt, and the lessons that stick. Think of it as the field manual that comes after the training course.

Everything here comes from running Ray in production. Not toy examples. Not benchmarks on synthetic data. Real ML pipelines, real failures, real fixes.

TL;DR: Five production patterns (store once/reference everywhere, actors orchestrate/tasks execute, create data where it's needed, batch your ray.get() calls, right-size resources). Five common mistakes with symptoms and fixes. A debugging playbook for when things go sideways. And a reflection on what seven posts of digging into Ray's internals actually taught us.


The ML Lifecycle Architecture

Before the patterns, let's talk about what we actually run. This is the production architecture at Mechademy—the thing all seven previous posts were building toward.

Our system trains thousands of custom turbomachinery models. Each turbine gets its own XGBoost model, trained on 50GB of sensor data. Data security constraints mean we can't share data between clients. Scale means we can't train sequentially.

Here's the architecture:

Lightweight head node (coordination only): Always on, cheap. Runs the GCS (executive office from Part 2), the dashboard, and the Ray Jobs API. Costs roughly $50/month on a small AWS instance. Zero task execution happens here.

python
# Head node: coordination only, no task execution
ray start --head --num-cpus=0 --num-gpus=0
# Workers autoscale from 0 to N based on demand

Setting num_cpus=0 is the key—we covered this pattern in Part 4. The head node is a regional manager with no employees. It coordinates, but it doesn't do the work.

Autoscaling workers via KubeRay: GPU and CPU workers spin up on-demand through Kubernetes. When a training job arrives, KubeRay provisions the nodes. When training finishes, workers scale back to zero. We pay for compute only when compute is happening.

Job submission via Ray Jobs API: Our orchestration platform submits jobs to the head node. No SSH, no direct cluster access. Jobs carry their own dependencies and configuration.
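As a concrete sketch, here is roughly what one of those submissions looks like. The `JobSubmissionClient` call pattern is Ray's actual Jobs API; the entrypoint script, bucket path, and dependency list are hypothetical placeholders, not our real pipeline.

```python
# Hedged sketch of a Jobs API submission. train.py, the S3 path, and the
# pip list are illustrative stand-ins for our actual job configuration.
def build_submission(turbine_id: str) -> dict:
    """Assemble keyword arguments for JobSubmissionClient.submit_job()."""
    return {
        "entrypoint": f"python train.py --turbine-id {turbine_id}",
        "runtime_env": {
            "pip": ["xgboost", "pandas"],
            "env_vars": {"DATA_PATH": f"s3://bucket/{turbine_id}.parquet"},
        },
    }

# With a running head node, submission is two more lines:
#   from ray.job_submission import JobSubmissionClient
#   client = JobSubmissionClient("http://<head-node>:8265")
#   job_id = client.submit_job(**build_submission("turbine-042"))
```

Because the job carries its own `runtime_env`, workers get the right dependencies without anyone baking them into the cluster image.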

Monitoring through Ray Dashboard: Real-time visibility into task execution, resource utilization, object store pressure, and worker health.

Why this architecture? Three reasons.

Cost: Idle workers cost nothing because they don't exist. The always-on head node is tiny. We went from spending thousands on persistent GPU clusters to paying per-training-minute.

Separation of concerns: The head node handles coordination. Workers handle computation. When we need to debug scheduling, we look at head node logs. When we need to debug execution, we look at worker logs. Clean boundaries.

Resilience: If a worker dies mid-training, Part 7's fault tolerance mechanisms kick in—task retries, lineage reconstruction, actor restarts. The head node stays up, so the cluster doesn't lose its brain.

This is the production version of what we've been building toward across the series. Every component maps to something we've studied: GCS coordination, Raylet scheduling, object store data management, fault tolerance recovery.


Five Patterns That Work

These aren't theoretical. Each one solved a real problem in our pipeline.

Pattern 1: Store Once, Reference Everywhere

The single most impactful optimization we made. Covered in depth in Part 5, but worth repeating because I still see teams getting this wrong.

python
# Bad: serializes data for every task
results = [process.remote(large_data) for _ in range(100)]
 
# Good: one copy, 100 references
data_ref = ray.put(large_data)
results = [process.remote(data_ref) for _ in range(100)]

The first version serializes large_data 100 times. For a 500MB dataset, that's 50GB of serialization overhead. The second version serializes once into the object store and passes a lightweight reference. Same result, fraction of the cost.

Pattern 2: Actors Orchestrate, Tasks Execute

This confused me at first. I'd create an actor and do all the work inside actor methods—sequentially. The actor became a bottleneck because actor methods execute one at a time.

The fix: use the actor for state management and dispatch parallel tasks for the heavy lifting.

python
@ray.remote
class TrainingOrchestrator:
    def __init__(self):
        self.results = {}
 
    def run_pipeline(self, configs):
        # Actor dispatches parallel tasks
        refs = [train_model.remote(cfg) for cfg in configs]
        # Actor collects and stores results
        self.results = dict(zip(
            [c["model_id"] for c in configs],
            ray.get(refs)
        ))
        return len(self.results)
 
@ray.remote
def train_model(config):
    # Stateless, parallel execution
    return {"model_id": config["model_id"], "accuracy": 0.95}

The actor holds state (Part 3). Tasks do the parallel work. Best of both worlds.

Pattern 3: Create Data Where It's Needed

When we first built our pipeline, the driver loaded all the data and shipped it to workers. For 50GB datasets, that meant the head node became a data bottleneck—everything funneled through one machine's network interface.

The fix: load data on the workers themselves.

python
# Bad: driver loads data, ships to workers
data = load_from_s3("s3://bucket/turbine_data.parquet")
data_ref = ray.put(data)  # Stored on driver node
results = [process.remote(data_ref) for _ in range(20)]
# 20 network transfers from driver to workers
 
# Good: workers load their own data
@ray.remote
def load_and_process(partition_path):
    data = load_from_s3(partition_path)  # Loaded locally
    return process(data)  # No network transfer needed
 
results = [load_and_process.remote(f"s3://bucket/part_{i}.parquet")
           for i in range(20)]

Data stays where it's created. Compute doesn't wait for transfers. This is the locality principle from Part 5 in action—the best transfer is no transfer at all.

Pattern 4: Batch Your ray.get() Calls

Small change, big impact. A sequential loop of ray.get() calls blocks on each object one at a time. Batching lets Ray fetch them in parallel.

python
# Bad: blocks on each one sequentially
results = [ray.get(ref) for ref in refs]
 
# Good: parallel fetch, single blocking call
results = ray.get(refs)

The bad version waits for object 1, then object 2, then object 3. If each takes 100ms, 100 objects take 10 seconds. The good version fetches all of them concurrently. Same 100 objects might take 200ms.

Pattern 5: Right-Size Your Resources

Default resource allocation wastes capacity. A lightweight task that needs only a fraction of a CPU still reserves a full one by default, blocking other tasks from scheduling. A heavy task that needs 4 CPUs but declares only the default gets co-scheduled with too many neighbors and runs slowly.

python
# Bad: defaults (1 CPU, no memory specification)
@ray.remote
def lightweight_task(x):
    return x * 2
 
# Good: explicit requirements
@ray.remote(num_cpus=0.5)  # Two of these can share a CPU
def lightweight_task(x):
    return x * 2
 
@ray.remote(num_cpus=4, num_gpus=1, memory=8 * 1024 * 1024 * 1024)
def heavy_training(data_ref):
    return train_xgboost(ray.get(data_ref))

Fractional CPUs let you pack more lightweight tasks per node. Explicit GPU requirements prevent tasks from landing on CPU-only nodes. Memory declarations help the scheduler avoid out-of-memory kills. All of this ties back to the resource management concepts in Part 4.


Five Mistakes That Hurt

Every pattern above came from making the opposite mistake first.

| Mistake | Symptom | Fix |
| --- | --- | --- |
| Ignoring resource requirements | Tasks on wrong hardware, GPU nodes idle | Explicit num_cpus, num_gpus (Part 4) |
| Too many small tasks | Dashboard lag, scheduling latency spikes | Batch into larger chunks, tune event reporting (Part 4) |
| Assuming infinite object store | ObjectStoreFullError, disk spilling | Stream processing, right-size object_store_memory (Part 5) |
| Synchronous ray.get() in loops | Parallel work serialized, slow pipelines | Collect refs first, batch fetch |
| Not monitoring the right things | Silent failures, mystery slowdowns | Watch GCS health, object store pressure, worker utilization |

Ignoring resource requirements bit us when we had GPU nodes sitting idle while CPU tasks queued. The scheduler didn't know our training tasks needed GPUs because we never told it. Two lines of decorator changes fixed a week of frustration.

Too many small tasks was the coordination tax from Part 4. Millions of millisecond tasks generated event storms that overwhelmed the GCS—the exact bottleneck we dissected in Part 6. We batched feature computations—instead of one task per feature, one task per feature group. Task count dropped 100x. Throughput doubled.
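The batching fix itself is a few lines. `compute_features` (one Ray task per group) is hypothetical; the grouping logic is the real change.

```python
# Sketch of the batching fix: group work items so each Ray task does
# enough computation to amortize its scheduling overhead.
def chunk(items, size):
    """Split a list into consecutive groups of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# Before: one task per feature, millions of tiny tasks, event storms
#   refs = [compute_feature.remote(f) for f in features]
# After: one task per group of 100, two orders of magnitude fewer tasks
#   refs = [compute_features.remote(g) for g in chunk(features, 100)]
```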

Assuming infinite object store caused our first production outage. We were ray.put()-ing intermediate results without thinking about memory. The object store filled, spilling started, and performance cratered. Now we monitor ray memory --stats-only and size the store based on our largest pipeline stage.

Synchronous ray.get() in loops is the most common mistake I see in Ray code reviews. It turns a distributed system back into a sequential one. The fix is mechanical: collect all your ObjectRefs, then call ray.get() once.

Not monitoring the right things is insidious. Ray can fail silently—an actor might restart repeatedly, tasks might spill to disk, the GCS might be under pressure—and your job still "completes," just 10x slower than it should. At Mechademy, we added alerts on object store spill rates, GCS event processing latency, and worker restart counts.
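The alerting logic reduces to threshold checks. This is a hedged sketch: the metric names and limits below are illustrative, not Ray's real metric schema; in practice we pull the values from the metrics Ray exports to Prometheus.

```python
# Illustrative alert thresholds for the three silent-failure modes:
# object store spilling, GCS pressure, and repeated worker restarts.
def check_cluster_health(metrics: dict) -> list:
    """Return alert messages for any metric over its threshold."""
    alerts = []
    if metrics.get("spilled_bytes", 0) > 10 * 1024**3:  # > 10 GB spilled
        alerts.append("object store spilling heavily")
    if metrics.get("gcs_event_latency_ms", 0) > 500:
        alerts.append("GCS under event pressure")
    if metrics.get("worker_restarts", 0) > 5:
        alerts.append("workers restarting repeatedly")
    return alerts
```

The point is not the specific numbers but that each failure mode has an explicit, alertable signal instead of a job that quietly finishes 10x late.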


The Debugging Playbook

When something goes wrong in production, you need a systematic approach. Here's the playbook we use at Mechademy—symptom to fix in the fewest steps.

| Symptom | Check | Likely Cause | Fix |
| --- | --- | --- | --- |
| Tasks pending, workers idle | GCS logs | Coordination bottleneck | Tune event reporting interval (Parts 4 and 6) |
| PENDING_ARGS for seconds | Object store metrics | Data transfer delay | Locality optimization (Part 5) |
| Dashboard unresponsive | GCS health | Event storm from high task volume | Increase reporting interval (Parts 4 and 6) |
| ObjectStoreFullError | ray memory | Memory pressure, too many pinned objects | Size object store, stream data (Part 5) |
| Actor methods slow | Worker logs | Serialization overhead on arguments | Reduce argument size, use object refs |
| Tasks retry repeatedly | Task error logs | Transient failures or resource exhaustion | Check max_retries, add error handling (Part 7) |

The debugging commands we use most:

bash
# Cluster health overview
ray status
 
# Object store memory breakdown
ray memory --stats-only
 
# GCS logs on Kubernetes (head pod)
kubectl logs <head-pod> -c ray-head | grep -i gcs
 
# Check for spilling
kubectl logs <head-pod> -c ray-head | grep -i spill
 
# Worker-level task errors
kubectl logs <worker-pod> -c ray-worker | grep -i error

The pattern is always the same: identify the symptom, check the relevant component, confirm the cause, apply the fix. No guessing. No "restart everything and hope." The mental model from this series—GCS for coordination, Raylet for scheduling, object store for data, workers for execution—tells you exactly where to look.


What We've Learned

Eight posts. Hundreds of pages of source code. Dozens of production incidents. Here are the insights that stuck.

Components coordinate through GCS, not through commands (Part 6). The executive office doesn't micromanage. It maintains the ledger—who exists, what's running, where data lives. Raylets and workers make local decisions using that shared state. This is why Ray scales where centralized schedulers don't.

Data stays put, compute moves to data (Parts 4 and 5). The scheduler prefers nodes where data already lives. The object store keeps data local. Network transfers happen only when necessary. Understanding this single principle eliminated half our performance issues.

Local decisions scale, global coordination doesn't (Part 4). Raylets handle scheduling locally whenever possible. The GCS only gets involved for cross-node decisions. This hybrid approach is why Ray handles millions of tasks without drowning in coordination overhead—as long as you don't generate event storms.

Immutability enables zero-copy sharing (Part 5). Objects in the store are sealed once written. Multiple workers read the same bytes in shared memory safely because nobody can modify them. This constraint—objects are read-only—is what makes the performance possible.

Fault tolerance comes from lineage and idempotency (Part 7). Ray doesn't prevent failures. It recovers from them by re-executing lost work from recorded lineage. Tasks are retried. Actors are restarted. The system heals without manual intervention.

Tasks for parallelism, actors for state (Part 3). This simple rule prevents most architectural mistakes. If the work is stateless and parallel, use tasks. If you need to maintain state across calls, use an actor. If you need both, use an actor that dispatches tasks.

What Wasn't Covered

This series focused on Ray Core—the foundation. There's a whole ecosystem built on top:

  • Ray Train and Tune: Distributed training and hyperparameter optimization
  • Ray Serve: Model serving with autoscaling
  • Ray Data: Large-scale data processing pipelines
  • Advanced KubeRay patterns: Multi-cluster federation, spot instance handling
  • Ray 2.x improvements: Compiled DAGs, accelerated DAGs for lower overhead

Each of these deserves its own deep dive. But they all build on the foundation we've covered—GCS, Raylets, object store, scheduling, fault tolerance. Understanding the core makes the ecosystem approachable.


Try It Yourself

Here's a complete mini-pipeline that ties together concepts from every part of the series:

python
import ray
import numpy as np
 
ray.init()
 
# Pattern: Store once, reference everywhere (Part 5)
config = ray.put({"learning_rate": 0.01, "epochs": 10})
 
# Pattern: stateless tasks for parallel work (Part 3)
@ray.remote(num_cpus=1, max_retries=3)  # Right-sized resources (Part 4), fault tolerance (Part 7)
def process_batch(config_ref, batch_id):
    cfg = ray.get(config_ref)  # Zero-copy if on same node (Part 5)
    # Simulate loading data on the worker (Part 5 locality)
    data = np.random.rand(1000, 100)
    loss = np.mean(data) * cfg["learning_rate"]
    return {"batch": batch_id, "loss": loss}
 
# Pattern: Batch ray.get() calls (Part 4)
refs = [process_batch.remote(config, i) for i in range(20)]
results = ray.get(refs)  # Single batch fetch, not sequential
 
print(f"Processed {len(results)} batches")
for r in results[:3]:
    print(f"  Batch {r['batch']}: loss={r['loss']:.4f}")
 
# Check cluster state (Part 2)
print(f"\nCluster resources: {ray.cluster_resources()}")

Run this locally, watch it in the dashboard at http://localhost:8265, and trace the concepts: object store references, resource declarations, batch fetching, retry configuration. Every line maps to something we've studied.


The End of the Expedition

Seven parts ago, we asked: why Ray? Now we know—not just the marketing answer, but the engineering one.

Ray is a distributed company with an executive office (GCS) that tracks state, regional managers (Raylets) that schedule locally, employees (workers) that execute tasks, and file rooms (object stores) that share data efficiently. Understanding the runtime, the execution model, the scheduler, the object store, the coordination layer, and the recovery mechanisms doesn't just make you better at Ray. It makes you better at distributed systems.

The principles are universal. Data locality matters in every distributed system. Local decisions scale better than global ones. Immutability enables safe concurrency. Fault tolerance through lineage beats fault prevention through over-engineering.

Ray is engineering, not magic. Understanding the pieces makes you effective. When something breaks, you know where to look. When something's slow, you know what to measure. When you need to scale, you know what tradeoffs you're making.

That's the whole point of this series. Not to make Ray less impressive—but to make it less mysterious.

See you on the next expedition.
