
When Nodes Die: Ray's Fault Tolerance
How Ray detects failures, retries tasks, recovers actors, and reconstructs lost objects—plus production patterns for building resilient distributed ML pipelines.
I watched the dashboard. Node 5 turned red. I expected our training pipeline to crash.
It didn't.
Tasks that had been running on that node just... reappeared on other nodes. The pipeline continued like nothing happened. No error messages, no manual intervention, no frantic Slack messages to the team. One node went down, and the system absorbed the failure.
That moment changed how I think about distributed systems. Not because it was dramatic—because it was boring. A node died, and nobody needed to care.
But I needed to understand why nobody needed to care. What actually happened between "node goes red" and "tasks reappear elsewhere"? How did Ray know what was lost? How did it decide where to reschedule? This isn't magic—it's the coordination layer from Part 6 enabling recovery.
At Mechademy, running on AWS with autoscaling workers means nodes come and go constantly. Spot instances get reclaimed by AWS with two minutes of warning. Workers scale down when load drops. Kubernetes pods get evicted during node maintenance. In this world, fault tolerance isn't a nice-to-have feature you enable for safety. It's how the system operates normally.
Our training pipelines run XGBoost on 50GB datasets across dozens of worker nodes. A single pipeline might take hours. If every spot instance reclamation required a full restart, we'd never finish a training run. The economics of cloud ML depend on cheap, unreliable compute—and that only works if your runtime can handle the unreliability.
So I dug into Ray's fault tolerance mechanisms. What I found was a layered system: heartbeat-based failure detection, automatic task retries, actor resurrection, and object reconstruction. Each layer builds on the components we've covered in earlier parts—the GCS tracking state, Raylets managing local resources, the object store holding intermediate results.
Here's the thing most documentation glosses over: fault tolerance isn't one feature. It's four different recovery strategies, each designed for a different type of loss. Losing a task mid-execution is fundamentally different from losing an actor with accumulated state, which is different from losing an object that downstream tasks depend on. Ray handles each differently.
TL;DR: Ray detects node failures through GCS heartbeats and recovers automatically. Stateless tasks retry with zero user effort. Stateful actors need checkpointing to survive restarts. Lost objects get reconstructed via lineage tracking. Design your pipelines for idempotency—it's the foundation everything else depends on.
What Happens When a Node Dies
The GCS detects failure through missed heartbeats, typically within tens of seconds, then triggers cascading recovery across tasks, objects, and actors on the dead node.
Every Raylet in the cluster sends periodic heartbeats to the GCS—the executive office from Part 2. These are lightweight "I'm alive" signals. The GCS expects to hear from each node at regular intervals. When the heartbeats stop, the GCS starts a timer. If a node misses enough consecutive heartbeats, the GCS marks it as dead.
This is the same heartbeat mechanism that keeps the cluster roster updated during normal operation. The difference is what happens after the mark.
Once the GCS declares a node dead, it triggers a cascade:
- Tasks that were running on that node get marked as failed
- Objects stored in that node's object store get marked as lost
- Actors living on that node get marked as dead
The GCS doesn't just detect the failure—it knows exactly what was lost. It tracks which tasks were running where, which objects live on which nodes, and which actors are assigned to which machines. This is why the GCS's metadata tracking (covered in Part 6) matters so much. Without that centralized record, recovery would be guesswork.
You can see node states yourself:
    import ray

    ray.init()

    # Check which nodes are alive
    nodes = ray.nodes()
    for node in nodes:
        status = "ALIVE" if node["Alive"] else "DEAD"
        print(f"Node {node['NodeID'][:8]}... - {status}")
        print(f"  Resources: {node['Resources']}")
Or from the command line:
    ray status

This shows you the cluster's current health: active nodes, pending nodes, recently failed nodes, and available resources. When debugging production issues, ray status is the first command I run.
The detection speed matters. Ray's default heartbeat interval is a few seconds, and the GCS waits for several consecutive misses before declaring a node dead. That means a node failure is typically detected within 10-30 seconds, depending on configuration. Fast enough for batch workloads. For latency-sensitive serving, you might want tighter detection, but faster detection also means more risk of false positives from network blips.
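To make the timeout tradeoff concrete, here is a toy failure detector in plain Python. It is a sketch of the general idea only, not Ray's implementation: the `HeartbeatMonitor` class, the node names, and the timeout values are all made up for illustration. Two nodes go silent at t=0. One has a short network blip and the other is really gone, but at t=12 the detector cannot tell them apart.

```python
from dataclasses import dataclass, field


@dataclass
class HeartbeatMonitor:
    """Toy failure detector: a node is dead after `timeout_s` without a heartbeat."""
    timeout_s: float
    last_seen: dict = field(default_factory=dict)

    def heartbeat(self, node_id: str, now: float) -> None:
        # Record the most recent time we heard from this node.
        self.last_seen[node_id] = now

    def dead_nodes(self, now: float) -> list:
        # A node is declared dead once its silence exceeds the timeout.
        return sorted(
            node_id
            for node_id, seen in self.last_seen.items()
            if now - seen > self.timeout_s
        )


def simulate(timeout_s: float) -> list:
    """Both nodes were last heard from at t=0; ask who looks dead at t=12."""
    monitor = HeartbeatMonitor(timeout_s=timeout_s)
    monitor.heartbeat("node-a", now=0.0)  # hypothetical: brief network blip
    monitor.heartbeat("node-b", now=0.0)  # hypothetical: actually crashed
    return monitor.dead_nodes(now=12.0)


print(simulate(timeout_s=30.0))  # []: patient detector, nothing declared dead yet
print(simulate(timeout_s=10.0))  # ['node-a', 'node-b']: node-a is a false positive
```

The aggressive 10-second timeout catches the real failure sooner, but it also evicts the node that would have come back on its own.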
Task Fault Tolerance
Tasks are stateless, which makes them trivially recoverable. Ray retries them automatically—you just need to make sure your functions are safe to re-run.
Remember the distinction from Part 3? Tasks are the one-off employees—they run a function, return a result, and forget everything. No accumulated state. No side effects (ideally). This statelessness is exactly what makes them easy to retry.
When a task fails—whether from a node dying, an exception, or a timeout—Ray can simply run it again on a different worker. The task doesn't know or care that it's a retry. Same inputs, same function, same expected output.
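By default, Ray retries a task up to 3 times when it fails because of a system failure, such as the worker process or node dying. Exceptions raised by your own code are not retried unless you opt in with retry_exceptions. You can configure both:

    @ray.remote(max_retries=5, retry_exceptions=[ConnectionError, TimeoutError])
    def fetch_and_process(url):
        data = download(url)
        return transform(data)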
The max_retries parameter controls how many times Ray will attempt the task before giving up and raising the exception to the caller. The retry_exceptions parameter is more subtle—it tells Ray which exception types are worth retrying. A ConnectionError might be transient (retry it), but a ValueError from bad input data won't fix itself (don't retry).
This selective retry is important. Retrying a task that fails because the input data is malformed wastes cluster resources. But retrying a task that failed because the node hosting its input data went down? That's exactly what you want—Ray will reconstruct the input object and try again.
Here's how lineage tracking makes this work. When a task produces an output object, Ray records the lineage: which function, which arguments, which dependencies. If that output object is later lost (the node storing it died), Ray can trace back through the lineage and re-execute the task to regenerate it. This is object reconstruction through task replay.
At Mechademy, our feature engineering pipeline survived spot instance terminations precisely because of this. Each feature computation task takes a turbine configuration and a time window, then returns a feature vector. Stateless. Idempotent. If a spot instance gets reclaimed mid-computation, Ray just reruns those tasks on surviving nodes. We've had pipelines lose 3 out of 20 nodes and finish without any manual intervention.
The key design principle: make tasks idempotent. Running a task twice with the same inputs should produce the same result. No database writes without deduplication keys. No file writes without checking for existing output. No side effects that compound on retry.
When retries don't help: if a task writes to an external system without idempotency guards, retrying it could write duplicate records. If a task depends on external state that has changed between attempts, the retry might produce different results. And if the failure is permanent—bad configuration, missing dependency, corrupt data—retrying 1000 times won't help. Use retry_exceptions to limit retries to transient failures.
    # Idempotent: safe to retry
    @ray.remote(max_retries=5)
    def compute_features(config_id, time_window):
        data = load_from_s3(config_id, time_window)
        return extract_features(data)  # Pure function, same inputs → same outputs

    # NOT idempotent: dangerous to retry without guards
    @ray.remote(max_retries=0)  # Disable retries for non-idempotent work
    def write_to_database(record):
        db.insert(record)  # Retry = duplicate record
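If a write has to be retryable, a deduplication key is one way to make it safe. The sketch below uses an in-memory SQLite table as a stand-in for a real database. The table layout, the `write_once` helper, and the key format are assumptions for illustration, not part of the pipeline described here.

```python
import sqlite3


def write_once(conn: sqlite3.Connection, dedup_key: str, payload: str) -> bool:
    """Insert a record unless one with the same dedup key already exists.

    Returns True if this call inserted the row, False if it was a no-op.
    """
    cursor = conn.execute(
        "INSERT OR IGNORE INTO records (dedup_key, payload) VALUES (?, ?)",
        (dedup_key, payload),
    )
    conn.commit()
    return cursor.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (dedup_key TEXT PRIMARY KEY, payload TEXT)")

# Hypothetical key: derived from the task's inputs, so a retry produces the same key.
first = write_once(conn, "turbine-42/2024-01-01", "features-v1")
retry = write_once(conn, "turbine-42/2024-01-01", "features-v1")  # simulated retry
row_count = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
print(first, retry, row_count)  # True False 1
```

Because the key is derived from the inputs rather than generated per attempt, the second attempt becomes a no-op instead of a duplicate row.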
Actor Fault Tolerance
Actors carry state, so recovery is harder. Ray can restart the process, but restoring the state is your responsibility.
This is where fault tolerance gets interesting—and where Part 3's stateless/stateful distinction really pays off. Tasks are easy: re-run them. Actors are different. An actor that's been processing data for an hour has accumulated state—model weights, training progress, aggregated metrics. When the node hosting that actor dies, the process is gone. The state is gone.
Ray provides two knobs for actor recovery:
- max_restarts: How many times Ray will restart the actor process after failure
- max_task_retries: How many times to retry individual method calls on the actor

    @ray.remote(max_restarts=3, max_task_retries=3)
    class ModelTrainer:
        def __init__(self, config):
            self.config = config
            self.checkpoint = self._load_latest_checkpoint()

        def train_epoch(self, data):
            result = train(self.checkpoint, data)
            self.checkpoint = result  # Carry the new state into the next call
            self._save_checkpoint(result)
            return result

        def _load_latest_checkpoint(self):
            """Restore state on restart: this is the recovery mechanism."""
            try:
                return load_from_s3(f"s3://checkpoints/{self.config['model_id']}/latest.pt")
            except FileNotFoundError:
                return initialize_model(self.config)

        def _save_checkpoint(self, state):
            """Persist state externally so restarts can recover."""
            save_to_s3(state, f"s3://checkpoints/{self.config['model_id']}/latest.pt")
When this actor's node dies, Ray restarts the actor on a healthy node. The __init__ method runs again—and because we load the latest checkpoint in __init__, the actor resumes from where it left off. The process is new, but the state is recovered.
This is the checkpointing pattern: save state to external storage (S3, a database, shared filesystem) at meaningful intervals, and restore it on restart.
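One detail worth getting right is that a crash in the middle of a save must not leave a half-written checkpoint behind. Here is a minimal sketch of the save/restore pair for the shared-filesystem case, using a temp file plus an atomic rename. The function names and JSON format are illustrative assumptions, and object stores like S3 have different semantics that this does not cover.

```python
import json
import os
import tempfile


def save_checkpoint(state: dict, path: str) -> None:
    """Write the checkpoint to a temp file, then atomically rename it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # Atomic when both paths are on the same filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_checkpoint(path: str, default: dict) -> dict:
    """Return the last saved state, or `default` on a fresh start."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return default


checkpoint_dir = tempfile.mkdtemp()
checkpoint_path = os.path.join(checkpoint_dir, "latest.json")

state = load_checkpoint(checkpoint_path, default={"epoch": 0})     # fresh start
save_checkpoint({"epoch": 7}, checkpoint_path)                     # after some work
restored = load_checkpoint(checkpoint_path, default={"epoch": 0})  # simulated restart
print(state, restored)  # {'epoch': 0} {'epoch': 7}
```

A reader of `latest.json` sees either the previous complete checkpoint or the new complete one, never a partial file.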
The frequency tradeoff is real. Checkpoint every method call, and you add latency to every operation. Checkpoint once per hour, and you lose up to an hour of work on failure. The right interval depends on how expensive your work is versus how expensive the checkpoint is.
At Mechademy, our training actors checkpoint every N epochs—where N depends on epoch duration. Short epochs (seconds each), we checkpoint every 50. Long epochs (minutes each), we checkpoint every one. The goal is losing no more than 5-10 minutes of work on any single failure.
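The arithmetic behind "every N epochs" is simple enough to write down. This is a rough sketch: the helper name is made up, and the 6-second and 4-minute epoch durations are illustrative values chosen to match the intervals above, not measured numbers.

```python
def checkpoint_every_n_epochs(epoch_seconds: float, max_lost_seconds: float) -> int:
    """Largest N such that N epochs of lost work fits the budget (never below 1)."""
    return max(1, int(max_lost_seconds // epoch_seconds))


budget = 5 * 60  # Tolerate losing at most 5 minutes of training

print(checkpoint_every_n_epochs(epoch_seconds=6, max_lost_seconds=budget))    # 50
print(checkpoint_every_n_epochs(epoch_seconds=240, max_lost_seconds=budget))  # 1
```

When a single epoch is longer than the budget, the function still returns 1, since checkpointing after every epoch is the best an epoch-boundary scheme can do.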
When to Use Actors vs. Restructure to Tasks
Not every stateful workflow needs an actor. Sometimes the better design is breaking the work into stateless tasks with external state management.
| Characteristic | Tasks | Actors |
|---|---|---|
| State | None (stateless) | Accumulated across calls |
| Retry mechanism | Automatic, transparent | Restart process + reload state |
| Recovery complexity | Zero (just re-run) | You manage checkpointing |
| Failure blast radius | Single task lost | All in-memory state lost |
| Best for | Parallel, independent work | Long-lived services, coordination |
| Idempotency requirement | Essential | Helpful but not always possible |
| Max retries default | 3 | 0 (must opt in) |
If your "stateful" actor is really just aggregating results that could be computed independently, restructure it as stateless tasks with a final aggregation step. You'll get automatic fault tolerance for free.
Use actors when you genuinely need persistent state across calls: model servers, training coordinators, resource managers. And when you do—checkpoint religiously.
Object Reconstruction
Lost objects can be rebuilt if Ray still knows the task that created them. This lineage-based reconstruction is automatic but not infinite.
Objects in Ray's object store are immutable but not permanent—as we covered in Part 5. When a node dies, every object in that node's shared memory file room vanishes. Downstream tasks that depend on those objects would normally fail.
But Ray has a trick: lineage tracking. For every object in the store, Ray remembers which task created it and what inputs that task used. If an object is lost, Ray can trace back through the lineage, re-execute the original task, and regenerate the object.
Here's the reconstruction flow:
- Task B requests Object X (stored on the now-dead Node 5)
- Ray detects Object X is lost
- Ray looks up the lineage: "Object X was created by Task A with inputs [Y, Z]"
- Ray checks if inputs Y and Z are still available
- If yes, Ray re-executes Task A to regenerate Object X
- Object X is stored on a new node, and Task B proceeds
This is transparent. Task B doesn't know the object was reconstructed. It just waited a bit longer for its input to arrive.

    @ray.remote
    def load_and_compute(source_path):
        data = load_data(source_path)  # Load inside the task
        return process(data)  # Result stored in object store

    @ray.remote
    def downstream(result):
        # Ray resolves the ObjectRef argument to its value before this task runs.
        # If the node storing that object dies, Ray reconstructs it
        # by re-running load_and_compute automatically.
        return analyze(result)

    # Data loaded inside the task, so lineage is fully traceable
    intermediate = load_and_compute.remote("s3://bucket/data.parquet")
    final = ray.get(downstream.remote(intermediate))
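The reconstruction flow can be modelled in a few lines of plain Python. This is a toy illustration of lineage-based recovery, not how Ray stores lineage internally. The `ToyObjectStore` class and its methods are hypothetical. It also shows the other side of the coin: a value with no recorded producer cannot be rebuilt.

```python
class ToyObjectStore:
    """Illustrative only: values can be lost, lineage records how to rebuild them."""

    def __init__(self):
        self.values = {}   # object_id -> value (lost when a "node" dies)
        self.lineage = {}  # object_id -> (function, input object_ids)

    def put(self, object_id, value):
        # No lineage recorded: nothing to re-execute if this value is lost.
        self.values[object_id] = value

    def run_task(self, object_id, func, *input_ids):
        # Remember how the output was produced before storing it.
        self.lineage[object_id] = (func, input_ids)
        self.values[object_id] = func(*(self.get(i) for i in input_ids))

    def lose(self, object_id):
        self.values.pop(object_id, None)

    def get(self, object_id):
        if object_id in self.values:
            return self.values[object_id]
        if object_id not in self.lineage:
            raise KeyError(f"{object_id} is lost and has no lineage")
        # Rebuild by re-running the producing task; lost inputs are rebuilt recursively.
        func, input_ids = self.lineage[object_id]
        self.values[object_id] = func(*(self.get(i) for i in input_ids))
        return self.values[object_id]


store = ToyObjectStore()
store.run_task("y", lambda: 3)
store.run_task("z", lambda: 4)
store.run_task("x", lambda y, z: y * z, "y", "z")  # "Task A" creates Object X

store.lose("x")           # The node holding X dies
rebuilt = store.get("x")  # Re-executes Task A from lineage
print(rebuilt)            # 12

store.put("raw", [1, 2, 3])
store.lose("raw")
try:
    store.get("raw")
    recoverable = True
except KeyError:
    recoverable = False
print(recoverable)        # False
```

The recursive `get` is also why cascading losses get expensive: every missing input triggers another re-execution further up the chain.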
Note: Objects created with ray.put() are not reconstructable via lineage, because Ray has no task to re-execute to recreate them. For lineage reconstruction to work, objects must originate from a task, not a ray.put() call. If you need a large shared object to survive node failures, persist it to external storage (S3, shared filesystem) and reload it in a task.
But lineage tracking has limits. Ray doesn't keep infinite history. Lineage is evicted once the memory used to store it exceeds RAY_max_lineage_bytes (default: 1GB). Once lineage is evicted, reconstruction fails.
Reconstruction also fails when the original inputs are themselves lost and unrecoverable—a cascading lineage problem. Or when the task was fire-and-forget (no one held a reference to the output). Or when the task is non-deterministic and producing a different result would break downstream logic.
The practical takeaway: for critical intermediate results in long pipelines, don't rely solely on lineage reconstruction. Persist important checkpoints to external storage. Lineage reconstruction is a safety net, not a guarantee.
Building Resilient Pipelines
Not all failures need recovery. Sometimes the right response is to fail fast. Design your fault tolerance strategy around the cost of lost work versus the cost of recovery overhead.
After running Ray in production for over a year, here are the patterns that actually matter.
Pattern 1: Make Tasks Idempotent
This is the foundation. If your tasks are safe to re-run, Ray's automatic retry handles most failures without any effort from you. Idempotency means: same inputs, same outputs, no compounding side effects.
Pattern 2: Checkpoint Actors at Meaningful Boundaries
Don't checkpoint every method call—checkpoint at logical boundaries. After each training epoch. After processing each batch of records. After completing each stage of a pipeline. Match checkpoint frequency to the cost of lost work.
Pattern 3: Use External Storage for Critical State
S3, a database, a shared filesystem—anything outside the Ray cluster. Objects in the object store are transient by design. If losing something would require restarting the entire pipeline, it belongs in external storage.
Pattern 4: Size Retries Appropriately
Three retries for transient network failures. Zero retries for data validation errors. Five retries for spot instance interruptions with exponential backoff. Match your retry strategy to the failure mode.
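For the backoff part, one option is to handle it in your own code around the flaky call. The sketch below assumes that approach. `call_with_backoff`, `backoff_delays`, and the `TRANSIENT` tuple are hypothetical helpers, not Ray APIs. Transient errors are retried with jittered exponential delays, and anything else propagates immediately.

```python
import random
import time

# Assumption for this sketch: these exception types indicate transient failures.
TRANSIENT = (ConnectionError, TimeoutError)


def backoff_delays(max_retries: int, base_s: float = 1.0, cap_s: float = 60.0) -> list:
    """Upper bound on the delay before each retry: base * 2^attempt, capped."""
    return [min(cap_s, base_s * 2 ** attempt) for attempt in range(max_retries)]


def call_with_backoff(func, max_retries: int = 5, base_s: float = 1.0,
                      sleep=time.sleep):
    """Retry transient errors with jittered exponential backoff; fail fast otherwise."""
    delays = backoff_delays(max_retries, base_s)
    for attempt in range(max_retries + 1):
        try:
            return func()
        except TRANSIENT:
            if attempt == max_retries:
                raise  # Out of retries: surface the last error
            # Full jitter spreads out retries coming from many workers at once.
            sleep(random.uniform(0, delays[attempt]))


attempts = {"count": 0}


def flaky():
    """Fails twice with a transient error, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient")
    return "ok"


print(backoff_delays(5))                               # [1.0, 2.0, 4.0, 8.0, 16.0]
print(call_with_backoff(flaky, sleep=lambda s: None))  # ok
```

A ValueError raised inside `func` is not in `TRANSIENT`, so it is raised on the first attempt with no retries, which is the fail-fast behavior you want for bad input.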
| Recovery Strategy | When to Use | Tradeoff |
|---|---|---|
| Task retries (automatic) | Transient failures, stateless work | Minimal overhead, requires idempotency |
| Actor restarts + checkpointing | Long-lived stateful services | Checkpoint overhead vs. lost work |
| Object reconstruction (lineage) | Lost intermediate results | Re-execution cost, lineage must exist |
| External persistence (S3, DB) | Critical state, pipeline boundaries | I/O latency, storage cost |
| Fail fast (no recovery) | Permanent errors, bad input data | Pipeline stops, but no wasted retries |
At Mechademy, our training pipeline combines all of these. Feature engineering tasks are fully idempotent—retry freely. The training coordinator actor checkpoints every 10 epochs to S3. Intermediate feature matrices rely on lineage reconstruction for transient failures but get persisted to S3 at pipeline stage boundaries. And data validation failures kill the pipeline immediately—no point retrying bad data.
The most common mistake I see: teams enable max_retries=1000 everywhere and assume they're fault-tolerant. They're not. They're just retrying permanent failures 1000 times before crashing. Real fault tolerance is designing your pipeline so that transient failures recover automatically, permanent failures fail fast, and critical state survives independently of the cluster.
Key Takeaways
- GCS detects failures via heartbeats; node death is typically detected within tens of seconds, triggering cascading recovery across tasks, objects, and actors
- Stateless tasks retry automatically with configurable max_retries and selective retry_exceptions; zero effort if your functions are idempotent
- Stateful actors need checkpointing; Ray restarts the process, but restoring accumulated state is your responsibility
- Object reconstruction uses lineage tracking to re-execute the task that created a lost object—automatic but not infinite
- Design for idempotency; it's the single most important property for fault-tolerant distributed systems
- Not every failure needs recovery; sometimes failing fast saves more time than retrying a permanent error
What's Next
You now understand all the core pieces of Ray's architecture. The runtime boots up (Part 2). Tasks and actors execute your code (Part 3). The scheduler places work intelligently (Part 4). The object store shares data efficiently (Part 5). The GCS coordinates the cluster (Part 6). And now you know how the system recovers when things go wrong.
Part 8 brings it all together: production architectures, common mistakes, debugging playbooks, and the patterns that emerge when you've been running Ray at scale. We'll go from "I understand each piece" to "I know how to build and operate the whole thing."
The journey from "Ray is magic" to "Ray is engineering" is almost complete.
Try It Yourself
    import random

    import ray

    ray.init()

    # retry_exceptions=True opts in to retrying exceptions raised by the task itself;
    # by default max_retries only covers system failures such as a dead worker.
    @ray.remote(max_retries=10, retry_exceptions=True)
    def unreliable_task(x):
        """Simulates a task that fails ~30% of the time."""
        if random.random() < 0.3:
            raise RuntimeError("Transient failure!")
        return x * 2

    # Run 20 tasks - some will fail and be automatically retried
    refs = [unreliable_task.remote(i) for i in range(20)]
    results = ray.get(refs)
    print(f"All {len(results)} tasks completed despite transient failures")
    print(f"Results: {sorted(results)}")

Run this a few times. Watch how the run completes successfully even though individual attempts fail randomly. With 10 retries at a 30% failure rate, the chance of any task exhausting its attempts is negligible. That's Ray's task retry in action: no recovery code, no try/except, no manual restart logic. Just stateless functions and automatic retries.
Now imagine that reliability applied to a pipeline processing 50GB of turbine data across 20 nodes. That's what makes distributed ML practical.
See you in Part 8.