Cluster Example

This chapter demonstrates building a distributed RPC cluster with automatic worker discovery, load balancing, and failure detection using RpcNet's built-in cluster features.

Architecture Overview

The cluster example showcases three main components working together:

                    ┌──────────────────────────┐
                    │      Director            │
                    │  (Coordinator Node)      │
                    │                          │
                    │  - WorkerRegistry        │
                    │  - ClusterClient         │
                    │  - Load Balancing        │
                    └────────┬─────────────────┘
                             │
                    Gossip Protocol (SWIM)
                             │
            ┌────────────────┼────────────────┐
            │                                 │
    ┌───────▼─────────┐              ┌────────▼────────┐
    │   Worker A      │              │   Worker B      │
    │                 │              │                 │
    │  - Auto-join    │              │  - Auto-join    │
    │  - Tag: worker  │              │  - Tag: worker  │
    │  - Process tasks│              │  - Process tasks│
    └─────────────────┘              └─────────────────┘

Components

1. Director - Coordinator node that:

  • Uses WorkerRegistry for automatic worker discovery
  • Uses ClusterClient for load-balanced request routing
  • Employs LeastConnections strategy by default
  • Monitors worker pool status
  • Routes client requests to healthy workers

2. Workers - Processing nodes that:

  • Join cluster automatically via gossip protocol
  • Tag themselves with role=worker for discovery
  • Process compute tasks from clients
  • Monitor cluster events (node joined/left/failed)
  • Support simulated failures for testing

3. Client - Application that:

  • Connects to director
  • Gets worker assignment
  • Establishes direct connection to worker
  • Handles failover automatically

Why Use Built-in Cluster Features?

Compared with manual worker-management patterns (a sketch of the manual boilerplate follows the two lists below):

Manual Approach ❌:

  • Custom HashMap<Uuid, WorkerInfo> for tracking
  • Manual round-robin selection logic
  • Explicit RPC calls for worker registration
  • Custom ping-based health checks
  • ~200 lines of boilerplate code

Built-in Cluster ✅:

  • Built-in WorkerRegistry + ClusterClient
  • Multiple load balancing strategies (Round Robin, Random, Least Connections)
  • Automatic discovery via SWIM gossip protocol
  • Phi Accrual failure detection (accurate, adaptive)
  • ~50 lines to set up
  • 75% code reduction!
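
To make that comparison concrete, the sketch below shows roughly what the manual bookkeeping looks like. It is illustrative only (it assumes the uuid crate and invents a WorkerInfo struct); it is not code from the example, just the kind of boilerplate that WorkerRegistry and ClusterClient remove.

use std::collections::HashMap;
use std::net::SocketAddr;
use uuid::Uuid;

struct WorkerInfo {
    addr: SocketAddr,
    healthy: bool,
}

struct ManualRegistry {
    workers: HashMap<Uuid, WorkerInfo>,
    next: usize,
}

impl ManualRegistry {
    // Hand-rolled round-robin over healthy workers: the kind of logic the
    // built-in WorkerRegistry replaces (registration RPCs and ping-based
    // health checks would add even more code and are not shown here).
    fn select(&mut self) -> Option<&WorkerInfo> {
        let healthy: Vec<&WorkerInfo> =
            self.workers.values().filter(|w| w.healthy).collect();
        if healthy.is_empty() {
            return None;
        }
        let pick = healthy[self.next % healthy.len()];
        self.next = self.next.wrapping_add(1);
        Some(pick)
    }
}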

Running the Example

Prerequisites

Ensure test certificates exist:

ls certs/test_cert.pem certs/test_key.pem

All commands should be run from the project root directory.

Basic Setup

Open four terminals and run each component:

Terminal 1 - Director:

DIRECTOR_ADDR=127.0.0.1:61000 \
  RUST_LOG=info \
  cargo run --manifest-path examples/cluster/Cargo.toml --bin director

Terminal 2 - Worker A:

WORKER_LABEL=worker-a \
  WORKER_ADDR=127.0.0.1:62001 \
  DIRECTOR_ADDR=127.0.0.1:61000 \
  RUST_LOG=info \
  cargo run --manifest-path examples/cluster/Cargo.toml --bin worker

Terminal 3 - Worker B:

WORKER_LABEL=worker-b \
  WORKER_ADDR=127.0.0.1:62002 \
  DIRECTOR_ADDR=127.0.0.1:61000 \
  RUST_LOG=info \
  cargo run --manifest-path examples/cluster/Cargo.toml --bin worker

Terminal 4 - Client:

DIRECTOR_ADDR=127.0.0.1:61000 \
  RUST_LOG=info \
  cargo run --manifest-path examples/cluster/Cargo.toml --bin client

What You'll See

Director Output:

🎯 Starting Director at 127.0.0.1:61000
📁 Loading certificates from "../../certs/test_cert.pem"
✅ Director registered itself in cluster
✅ Cluster enabled - Director is now discoverable
🔄 Load balancing strategy: LeastConnections
📊 Worker pool status: 2 workers available
   - worker-a at 127.0.0.1:62001 (0 connections)
   - worker-b at 127.0.0.1:62002 (0 connections)
🚀 Director ready - listening on 127.0.0.1:61000

Worker Output:

👷 Starting Worker 'worker-a' at 127.0.0.1:62001
🔌 Binding server to 127.0.0.1:62001...
✅ Server bound successfully
🌐 Enabling cluster, connecting to director at 127.0.0.1:61000...
✅ Cluster enabled, connected to director
🏷️  Tagging worker with role=worker and label=worker-a...
✅ Worker 'worker-a' joined cluster with role=worker
🚀 Worker 'worker-a' is running and ready to handle requests

Client Output:

📡 Starting Client - connecting to director at 127.0.0.1:61000
✅ connected to director
🔀 director assigned worker - establishing direct connection
✅ direct connection established to worker
📤 creating request stream
🌊 stream opened successfully, starting to consume responses
📦 received token (sequence=1, text="token-1", total=1)
📦 received token (sequence=2, text="token-2", total=2)
...

Testing Failure Scenarios

Simulated Worker Failures

Enable periodic failures to test automatic failover:

Worker with Failures:

WORKER_LABEL=worker-a \
  WORKER_ADDR=127.0.0.1:62001 \
  DIRECTOR_ADDR=127.0.0.1:61000 \
  WORKER_FAILURE_ENABLED=true \
  RUST_LOG=info \
  cargo run --manifest-path examples/cluster/Cargo.toml --bin worker

Failure Cycle (~18 seconds):

  1. Run: 10 seconds of normal operation
  2. Warning: "⚠️ Simulating worker failure in 3 seconds..."
  3. Failed: 5 seconds in failed state - "💥 Worker failed!"
  4. Recovery: "🔄 Worker recovering..."
  5. Ready: "✅ Worker recovered and ready to serve!"
  6. Repeat

Client Behavior (see the sketch after this list):

  • Detects failure via error response
  • Returns to director for new worker assignment
  • Switches to healthy worker seamlessly
  • Streaming continues with minimal interruption
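
The bullets above boil down to a retry loop. The sketch below shows only the control flow; the two closures stand in for the example's actual RPC calls (asking the director for an assignment, then streaming from the assigned worker) and are not RpcNet API.

use std::future::Future;
use std::net::SocketAddr;

// Illustrative failover loop, not the example's exact code. `assign` asks the
// director for a worker address; `stream` consumes responses from that worker.
async fn stream_with_failover<A, AF, S, SF>(mut assign: A, mut stream: S) -> std::io::Result<()>
where
    A: FnMut() -> AF,
    AF: Future<Output = std::io::Result<SocketAddr>>,
    S: FnMut(SocketAddr) -> SF,
    SF: Future<Output = std::io::Result<()>>,
{
    loop {
        // 1. Ask the director for a (load-balanced) worker assignment.
        let worker_addr = assign().await?;

        // 2. Stream from that worker; on error, loop back for a fresh assignment.
        match stream(worker_addr).await {
            Ok(()) => return Ok(()),
            Err(e) => eprintln!("worker failed ({e}), requesting new assignment"),
        }
    }
}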

Hard Kill Test

Test network-level failure detection:

# In a worker terminal, press Ctrl+C

Observe:

  • Director detects failure via gossip protocol
  • WorkerRegistry removes worker from pool
  • Client requests automatically route to remaining workers
  • Zero downtime for ongoing operations

Worker Restart Test

After killing a worker, restart it to see re-discovery:

WORKER_LABEL=worker-a \
  WORKER_ADDR=127.0.0.1:62001 \
  DIRECTOR_ADDR=127.0.0.1:61000 \
  RUST_LOG=info \
  cargo run --manifest-path examples/cluster/Cargo.toml --bin worker

Observe:

  • Worker automatically rejoins cluster
  • Gossip spreads worker availability
  • Director adds worker back to registry
  • Client requests resume to all available workers

How It Works

1. Automatic Discovery

Workers don't manually register - they just join the cluster:

// Worker code (simplified)
let cluster = ClusterMembership::new(config).await?;
cluster.join(vec![director_addr]).await?;

// Tag for discovery
cluster.set_tag("role", "worker");
cluster.set_tag("label", worker_label);

// That's it! Director discovers automatically via gossip

2. Load Balancing

Director uses WorkerRegistry for automatic load balancing:

// Director code
let registry = Arc::new(WorkerRegistry::new(
    cluster,
    LoadBalancingStrategy::LeastConnections
));
registry.start().await;

// Automatically tracks workers and balances load

3. Failure Detection

The Phi Accrual algorithm provides accurate, adaptive health monitoring (see the sketch after this list):

  • Adapts to network conditions
  • Distinguishes slow nodes from failed nodes
  • No false positives from temporary delays
  • Automatic recovery when nodes return
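
For intuition, Phi Accrual replaces a hard timeout with a suspicion score: phi = -log10(P(the heartbeat is merely late)), estimated from recently observed heartbeat intervals. The standalone sketch below illustrates the idea with a simple exponential model of inter-arrival times; it is not RpcNet's internal implementation.

// Illustration of the Phi Accrual idea, not RpcNet's internals.
struct PhiAccrual {
    intervals: Vec<f64>, // recent heartbeat inter-arrival times, in seconds
}

impl PhiAccrual {
    // Suspicion level after `elapsed_secs` without a heartbeat. Under an
    // exponential model, P(still just late) = exp(-elapsed / mean), so
    // phi = -log10(exp(-elapsed / mean)) = (elapsed / mean) * log10(e).
    fn phi(&self, elapsed_secs: f64) -> f64 {
        let mean = self.intervals.iter().sum::<f64>() / self.intervals.len() as f64;
        (elapsed_secs / mean) * std::f64::consts::LOG10_E
    }

    // A node is only suspected once phi crosses a threshold, so a single slow
    // heartbeat raises suspicion gradually instead of tripping a fixed timeout,
    // and suspicion falls back toward zero as soon as heartbeats resume.
    fn is_suspect(&self, elapsed_secs: f64, threshold: f64) -> bool {
        self.phi(elapsed_secs) > threshold
    }
}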

4. Tag-Based Routing

Filter workers by capabilities:

// Get only GPU workers
let gpu_worker = registry.select_worker(Some("gpu=true")).await?;

// Get any worker
let any_worker = registry.select_worker(Some("role=worker")).await?;

Key Cluster Features Demonstrated

✅ Automatic Discovery

No manual registration needed - gossip protocol handles everything

✅ Load Balancing

Choose from:

  • Round Robin: Even distribution
  • Random: Stateless workload distribution
  • Least Connections: Balance based on current load (recommended)

✅ Failure Detection

Phi Accrual algorithm provides accurate, adaptive health monitoring

✅ Tag-Based Routing

Route by worker capabilities (GPU, CPU, zone, etc.)

✅ Event Monitoring

Subscribe to cluster events (a handler sketch follows this list):

  • NodeJoined - New worker available
  • NodeLeft - Worker gracefully departed
  • NodeFailed - Worker detected as failed
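
A handler for these events might look like the sketch below. The ClusterEvent enum here is defined locally to mirror the event names above; check rpcnet's cluster module for the actual event type and subscription API.

// Illustrative event handler; the enum mirrors the events listed above and is
// not necessarily RpcNet's exact type.
enum ClusterEvent {
    NodeJoined(String), // new worker available
    NodeLeft(String),   // worker gracefully departed
    NodeFailed(String), // worker detected as failed
}

fn handle_event(event: ClusterEvent) {
    match event {
        ClusterEvent::NodeJoined(node) => println!("worker {node} joined the pool"),
        ClusterEvent::NodeLeft(node) => println!("worker {node} left gracefully"),
        ClusterEvent::NodeFailed(node) => println!("worker {node} failed, rerouting its load"),
    }
}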

Configuration Options

Environment Variables

Director:

  • DIRECTOR_ADDR - Bind address (default: 127.0.0.1:61000)
  • RUST_LOG - Log level (e.g., info, debug)

Worker:

  • WORKER_LABEL - Worker identifier (default: worker-1)
  • WORKER_ADDR - Bind address (default: 127.0.0.1:62001)
  • DIRECTOR_ADDR - Director address (default: 127.0.0.1:61000)
  • WORKER_FAILURE_ENABLED - Enable failure simulation (default: false)
  • RUST_LOG - Log level

Client:

  • DIRECTOR_ADDR - Director address (default: 127.0.0.1:61000)
  • RUST_LOG - Log level
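
A minimal sketch of how a worker might read these variables with std::env, using the defaults listed above (the example's own parsing may differ in detail):

use std::env;

// Resolve worker settings from the environment, falling back to the
// documented defaults.
fn worker_settings() -> (String, String, String, bool) {
    let label = env::var("WORKER_LABEL").unwrap_or_else(|_| "worker-1".into());
    let addr = env::var("WORKER_ADDR").unwrap_or_else(|_| "127.0.0.1:62001".into());
    let director = env::var("DIRECTOR_ADDR").unwrap_or_else(|_| "127.0.0.1:61000".into());
    let failures = env::var("WORKER_FAILURE_ENABLED")
        .map(|v| v == "true")
        .unwrap_or(false);
    (label, addr, director, failures)
}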

Load Balancing Strategies

use rpcnet::cluster::LoadBalancingStrategy;

// Options:
LoadBalancingStrategy::RoundRobin       // Even distribution
LoadBalancingStrategy::Random           // Random selection
LoadBalancingStrategy::LeastConnections // Pick least loaded (recommended)

Cluster Configuration

use std::time::Duration;
use rpcnet::cluster::ClusterConfig;

let config = ClusterConfig::default()
    .with_gossip_interval(Duration::from_secs(1))
    .with_health_check_interval(Duration::from_secs(2));

Troubleshooting

Workers not discovered:

  • Ensure director starts first (it's the seed node)
  • Check firewall allows UDP for gossip
  • Verify workers connect to correct director address

Requests failing:

  • Check worker has role=worker tag
  • Verify compute handler is registered
  • Check logs for connection errors

Slow failover:

  • Adjust the health check interval in config (see the sketch after this list)
  • Tune Phi Accrual threshold
  • Check network latency
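
For faster failover, a first step is simply tightening the intervals from the Cluster Configuration section above. The sketch below uses only the two setters already shown in this chapter; how to change the Phi Accrual threshold itself depends on the ClusterConfig API and is not covered here.

use std::time::Duration;
use rpcnet::cluster::ClusterConfig;

// Tighter timings than the earlier example: gossip and health checks run more
// often, so a failed worker is noticed and dropped from the pool sooner, at
// the cost of extra background traffic.
let config = ClusterConfig::default()
    .with_gossip_interval(Duration::from_millis(500))
    .with_health_check_interval(Duration::from_secs(1));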

Production Considerations

For production deployments:

  1. TLS Certificates: Use proper certificates, not test certs
  2. Monitoring: Integrate cluster events with your monitoring system
  3. Scaling: Add more workers dynamically as needed
  4. Persistence: Consider persisting cluster state if needed
  5. Security: Add authentication and authorization
  6. Network: Plan for network partitions and split-brain scenarios

Next Steps

  • Try different load balancing strategies
  • Add more workers dynamically
  • Test network partition scenarios
  • Add custom tags for routing (zone, GPU, etc.)
  • Integrate with your application logic

For full source code, see examples/cluster/ in the repository.