Introduction
Version: 0.1.0 | Features: Cluster Management, Streaming, Code Generation
RpcNet is a high-performance QUIC-based RPC library built on s2n-quic
. The library provides
high-level server and client primitives, TLS configuration helpers, rich support for
unary and streaming request flows, and complete distributed cluster management. This book
centralizes the user-facing materials so you can learn RpcNet in one place.
Key Capabilities
Core RPC
- TLS-first configuration for both client and server components
- Simple registration of request handlers with async closures
- Bidirectional, client-streaming, and server-streaming support
- Structured error reporting through
RpcError
- Test-friendly abstractions that allow mocking QUIC streams
Distributed Systems (v0.1.0+)
- Cluster Management: Built-in gossip protocol (SWIM) for node discovery
- Load Balancing: Multiple strategies (Round Robin, Random, Least Connections)
- Health Checking: Phi Accrual failure detection
- Tag-Based Routing: Route requests by worker capabilities
- Auto-Failover: Zero-downtime worker replacement
How To Read This Book
- Getting Started walks through installing RpcNet and creating your first service.
- Core Concepts introduces the configuration model, error types, and runtime fundamentals.
- Cluster Example demonstrates building distributed systems with automatic discovery and load balancing.
- Streaming Patterns covers bidirectional and one-way streaming.
- rpcnet-gen CLI explains the code generation tool and workflows.
Throughout the chapters you will find executable snippets based on the working examples in the repository.
Getting Started
This tutorial mirrors the examples/basic_greeting
sample and shows, step by
step, how to install RpcNet, run the rpcnet-gen
CLI, and integrate the
generated code into your own project.
Step 0: Prerequisites
- Rust 1.75+ (
rustup show
to confirm) cargo
on yourPATH
- macOS or Linux (QUIC/TLS support is bundled through
s2n-quic
)
Step 1: Create a new crate
cargo new hello-rpc
cd hello-rpc
Step 2: Add the RpcNet runtime crate
cargo add rpcnet
RpcNet enables the high-performance perf
feature by default. If you need to
opt out (e.g. another allocator is already selected), edit Cargo.toml
:
[dependencies]
rpcnet = { version = "0.1", default-features = false }
You will also want serde
for request/response types, just like the example:
serde = { version = "1", features = ["derive"] }
Step 3: Install the rpcnet-gen CLI
Starting with v0.1.0, the CLI is included by default when you install rpcnet:
cargo install rpcnet # CLI automatically included!
Verify the install:
rpcnet-gen --help
You should see the full usage banner:
Generate RPC client and server code from service definitions
Usage: rpcnet-gen [OPTIONS] --input <INPUT>
Options:
-i, --input <INPUT> Input .rpc file (Rust source with service trait)
-o, --output <OUTPUT> Output directory for generated code [default: src/generated]
--server-only Generate only server code
--client-only Generate only client code
--types-only Generate only type definitions
-h, --help Print help
-V, --version Print version
Step 4: Author a service definition
Create src/greeting.rpc.rs
describing your protocol. The syntax is ordinary
Rust with a #[rpcnet::service]
attribute, so you can leverage the compiler and
IDE tooling while you design the API:
#![allow(unused)] fn main() { // src/greeting.rpc.rs use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct GreetRequest { pub name: String, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct GreetResponse { pub message: String, } #[derive(Serialize, Deserialize, Debug, Clone)] pub enum GreetingError { EmptyName, InvalidInput(String), } #[rpcnet::service] pub trait Greeting { async fn greet(&self, request: GreetRequest) -> Result<GreetResponse, GreetingError>; } }
Step 5: Generate client and server code
Point the CLI at the .rpc
file and choose an output directory. Here we mirror
examples/basic_greeting
by writing into src/generated
:
rpcnet-gen --input src/greeting.rpc.rs --output src/generated
The CLI confirms what it created:
π¦ Generating code for service: Greeting
β
Generated server: src/generated/greeting/server.rs
β
Generated client: src/generated/greeting/client.rs
β
Generated types: src/generated/greeting/types.rs
β¨ Code generation complete!
π Add the following to your code to use the generated service:
#[path = "generated/greeting/mod.rs"]
mod greeting;
use greeting::*;
Inspect the directory to see the modules that were createdβthis matches the
layout under examples/basic_greeting/generated/
:
src/generated/
βββ greeting/
βββ client.rs # async client wrapper for calling the service
βββ mod.rs # re-exports so `use greeting::*` pulls everything in
βββ server.rs # server harness plus `GreetingHandler` trait
βββ types.rs # request/response/error structs cloned from the .rpc file
client.rs
exposes GreetingClient
, server.rs
wires your implementation into
the transport via GreetingServer
, and types.rs
contains the shared data
structures.
Step 6: Wire the generated code into your project
Reference the generated module and bring the types into scope. For example,
in src/main.rs
:
#![allow(unused)] fn main() { #[path = "generated/greeting/mod.rs"] mod greeting; use greeting::client::GreetingClient; use greeting::server::{GreetingHandler, GreetingServer}; use greeting::{GreetRequest, GreetResponse, GreetingError}; use rpcnet::RpcConfig; }
From here there are two pieces to wire up:
-
Server β implement the generated
GreetingHandler
trait and launch the harness. This mirrorsexamples/basic_greeting/server.rs
:struct MyGreetingService; #[async_trait::async_trait] impl GreetingHandler for MyGreetingService { async fn greet(&self, request: GreetRequest) -> Result<GreetResponse, GreetingError> { Ok(GreetResponse { message: format!("Hello, {}!", request.name) }) } } #[tokio::main] async fn main() -> anyhow::Result<()> { let config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:8080") .with_key_path("certs/test_key.pem") .with_server_name("localhost"); GreetingServer::new(MyGreetingService, config).serve().await?; Ok(()) }
GreetingServer::serve
handles QUIC I/O, wiring your implementation to the generated protocol handlers.Tuning worker threads (optional). By default Tokio uses the number of available CPU cores. To override this for RpcNet services, set
RPCNET_SERVER_THREADS
and build your runtime manually:fn main() -> anyhow::Result<()> { let worker_threads = rpcnet::runtime::server_worker_threads(); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(worker_threads) .enable_all() .build()?; runtime.block_on(async { // existing async server logic goes here Ok::<_, anyhow::Error>(()) })?; Ok(()) }
Run the binary with a custom thread count:
RPCNET_SERVER_THREADS=8 cargo run
Adjust the command if your server lives in a different binary target (for example
cargo run --bin my-server
).If you keep using the
#[tokio::main]
macro, Tokio will also honour the upstreamTOKIO_WORKER_THREADS
environment variable. -
Client β construct
GreetingClient
to invoke the RPC. Compare withexamples/basic_greeting/client.rs
:#[tokio::main] async fn main() -> anyhow::Result<()> { let config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0") .with_server_name("localhost"); let server_addr = "127.0.0.1:8080".parse()?; let client = GreetingClient::connect(server_addr, config).await?; let response = client.greet(GreetRequest { name: "World".into() }).await?; println!("Server replied: {}", response.message); Ok(()) }
The generated client takes care of serialization, TLS, and backpressure while presenting an async function per RPC method.
Step 7: Build and run
Compile and execute as usual:
cargo build
cargo run
While you experiment, keep the reference example nearby:
ls examples/basic_greeting
# client.rs generated/ greeting.rpc.rs server.rs
Comparing your project with the example is a quick way to confirm the wiring matches what the CLI expects.
Where to go next
- Read the rpcnet-gen CLI guide for advanced flags such as
--server-only
,--client-only
, and custom output paths. - Explore the Concepts chapter for runtime fundamentals, server/client wiring, and streaming patterns.
Concepts
This chapter collects the fundamental ideas behind RpcNet: the runtime building blocks, how servers and clients are constructed, and the streaming patterns that sit on top of QUIC.
Runtime Building Blocks
Configuration (RpcConfig
)
RpcConfig
encapsulates the TLS artifacts, socket bindings, and optional
keep-alive settings shared by clients and servers.
#![allow(unused)] fn main() { use rpcnet::RpcConfig; let config = RpcConfig::new("certs/server.pem", "127.0.0.1:0") .with_key_path("certs/server-key.pem") .with_server_name("localhost") .with_keep_alive_interval(std::time::Duration::from_secs(30)); }
Keep-alive is optional; when enabled the interval is mirrored on both ends of the connection so heartbeats stay in sync.
Error Handling (RpcError
)
RpcError
differentiates between connection, stream, TLS, configuration, IO,
and serialization failures so callers can branch on the exact condition instead
of parsing strings:
#![allow(unused)] fn main() { match client.call("ping", vec![]).await { Ok(bytes) => println!("pong: {}", String::from_utf8_lossy(&bytes)), Err(rpcnet::RpcError::Timeout) => eprintln!("server took too long"), Err(other) => eprintln!("unhandled rpc error: {other}") } }
Serialization Strategy
Requests and responses travel as Vec<u8>
. Examples use bincode
for compact
frames, but any serialization format can be layered on top.
Concurrency Model
Each accepted QUIC connection runs inside its own Tokio task. Within that
connection, every RPC request is processed on another task so long-running
handlers never block unrelated work. Clients open a fresh bidirectional stream
per call while sharing a single connection behind an Arc
+ RwLock
.
Server Essentials
Creating the Server
#![allow(unused)] fn main() { use rpcnet::{RpcServer, RpcConfig}; let config = RpcConfig::new("certs/server.pem", "127.0.0.1:8080") .with_key_path("certs/server-key.pem") .with_server_name("localhost"); let mut server = RpcServer::new(config); }
Binding to port 0
lets the OS allocate a free port. Once bind()
succeeds the
chosen address is stored on server.socket_addr
.
Registering Unary Handlers
Handlers receive raw Vec<u8>
payloads and return serialized responses. The
closure executes inside a Tokio task, so async IO is allowed.
#![allow(unused)] fn main() { use rpcnet::{RpcError, RpcServer}; server.register("add", |params| async move { let (a, b): (i32, i32) = bincode::deserialize(¶ms) .map_err(RpcError::SerializationError)?; let sum = a + b; Ok(bincode::serialize(&sum)? ) }).await; }
Registering a method again overwrites the previous handler.
Registering Streaming Handlers
Streaming handlers consume a stream of request payloads and produce a stream of
Result<Vec<u8>, RpcError>
responses. Use async_stream::stream!
or
tokio_stream
helpers to build the return value.
#![allow(unused)] fn main() { use async_stream::stream; use futures::StreamExt; server.register_streaming("echo_stream", |mut reqs| async move { stream! { while let Some(payload) = reqs.next().await { yield Ok(payload); // echo back exactly what we received } } }).await; }
Binding and Starting
Binding consumes the TLS material supplied in RpcConfig
and returns an
s2n_quic::Server
that feeds into start
:
#![allow(unused)] fn main() { let quic_server = server.bind()?; println!("listening on {}", server.socket_addr.unwrap()); server.start(quic_server).await?; }
start
runs until the QUIC provider stops delivering connections (typically
when your process shuts down). Every accepted connection and stream is served
concurrently.
Graceful Shutdown
Wrap the start
future inside a tokio::select!
with your shutdown signal.
When accept()
yields None
the loop exits and the server terminates cleanly.
Client Essentials
Connecting
#![allow(unused)] fn main() { use rpcnet::{RpcClient, RpcConfig}; use std::net::SocketAddr; let config = RpcConfig::new("certs/ca.pem", "127.0.0.1:0") .with_server_name("localhost"); let server_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); let client = RpcClient::connect(server_addr, config).await?; }
Client configuration mirrors the server TLS settings, including optional keep-alive.
Unary Calls
#![allow(unused)] fn main() { let payload = bincode::serialize(&(21, 21))?; let response = client.call("add", payload).await?; let result: i32 = bincode::deserialize(&response)?; assert_eq!(result, 42); }
Errors surface as RpcError
values. Timeouts honour the DEFAULT_TIMEOUT
constant (30 seconds normally, 2 seconds under cfg(test)
).
Concurrent Calls
Clone the client (internally Arc
) and issue calls in parallel. Each call opens
a new bidirectional stream on the shared connection.
#![allow(unused)] fn main() { use std::sync::Arc; use tokio::join; let client = Arc::new(client); let (a, b) = join!( client.clone().call("first", vec![]), client.clone().call("second", vec![]) ); }
Inspecting Request IDs
RpcClient
maintains an atomic next_id
. Incrementing it per call keeps
request/response pairs aligned. You rarely need to touch this directly, but it
aids traffic debugging.
Streaming Patterns
RpcNet exposes three streaming helpers built on top of QUIC bidirectional streams. Each frame is length-prefixed followed by the payload bytes.
Bidirectional (call_streaming
)
#![allow(unused)] fn main() { use futures::stream; use futures::StreamExt; let requests = stream::iter(vec![ b"hello".to_vec(), b"world".to_vec(), ]); let responses = client.call_streaming("chat", requests).await?; let mut responses = Box::pin(responses); while let Some(frame) = responses.next().await { println!("response: {:?}", frame?); } }
The client sends the method name first, then each payload, finishing with a 0
length frame to signal completion. Sending continues even as responses arrive;
upload and download directions are independent.
Server Streaming (call_server_streaming
)
Server streaming wraps call_streaming
and sends a single request frame before
yielding the response stream:
#![allow(unused)] fn main() { use futures::StreamExt; let stream = client.call_server_streaming("list_items", Vec::new()).await?; let mut stream = Box::pin(stream); while let Some(frame) = stream.next().await { println!("item: {:?}", frame?); } }
Client Streaming (call_client_streaming
)
Client streaming uploads many payloads and waits for an aggregated result.
#![allow(unused)] fn main() { use futures::stream; let uploads = stream::iter(vec![b"chunk-a".to_vec(), b"chunk-b".to_vec()]); let digest = client.call_client_streaming("upload", uploads).await?; println!("digest bytes: {digest:?}"); }
Implementing Streaming Handlers
On the server, build a response stream with async_stream::stream!
or
tokio_stream
helpers. Returning Err
from the response stream maps to a
generic error frame; encode richer error payloads yourself when necessary.
#![allow(unused)] fn main() { use async_stream::stream; use futures::StreamExt; server.register_streaming("uppercase", |mut reqs| async move { stream! { while let Some(bytes) = reqs.next().await { let mut owned = bytes.clone(); owned.make_ascii_uppercase(); yield Ok(owned); } } }).await; }
Cluster Management (v0.1.0+)
RpcNet provides built-in distributed systems support for building scalable clusters with automatic discovery and failover.
Architecture Components
NodeRegistry
Tracks all nodes in the cluster with their metadata (address, tags, status). Filters nodes by tags for heterogeneous worker pools (e.g., GPU workers, CPU workers).
#![allow(unused)] fn main() { use rpcnet::cluster::NodeRegistry; let registry = NodeRegistry::new(cluster); let gpu_workers = registry.nodes_with_tag("gpu").await; }
WorkerRegistry
Automatically discovers workers via gossip and provides load-balanced worker selection.
#![allow(unused)] fn main() { use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy}; let registry = WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections ); registry.start().await; let worker = registry.select_worker(Some("role=worker")).await?; }
Load Balancing Strategies
- Round Robin: Even distribution across workers
- Random: Random selection for stateless workloads
- Least Connections: Routes to least-loaded worker (recommended)
Health Checking
Phi Accrual failure detector provides accurate, adaptive health monitoring:
#![allow(unused)] fn main() { use rpcnet::cluster::HealthChecker; let health = HealthChecker::new(cluster, config); health.start().await; // Automatically marks nodes as failed/recovered }
Gossip Protocol
RpcNet uses SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol) for:
- Automatic node discovery
- Failure detection propagation
- Cluster state synchronization
- Network partition detection
ClusterClient
High-level client that combines worker discovery and load balancing:
#![allow(unused)] fn main() { use rpcnet::cluster::{ClusterClient, WorkerRegistry, LoadBalancingStrategy}; let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; let client = Arc::new(ClusterClient::new(registry, config)); // Call any worker in the pool let result = client.call_worker("compute", data, Some("role=worker")).await?; }
Complete Example
See the Cluster Example chapter for a complete walkthrough of building a distributed worker pool with automatic discovery, load balancing, and failover.
rpcnet-gen CLI
The rpcnet-gen
binary turns a Rust service definition (*.rpc.rs
) into the
client, server, and type modules consumed by your application. This chapter
covers installation, day-to-day usage, and automation patterns.
Installing
Starting with v0.1.0, the CLI is included by default with rpcnet. Install it once and reuse it across workspaces:
cargo install rpcnet
The CLI is always available - no feature flags needed!
Add --locked
in CI to guarantee reproducible dependency resolution.
Input Files at a Glance
Service definitions are ordinary Rust modules annotated with #[rpcnet::service]
.
For example:
#![allow(unused)] fn main() { // src/greeting.rpc.rs use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct GreetRequest { pub name: String, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct GreetResponse { pub message: String, } #[rpcnet::service] pub trait Greeting { async fn greet(&self, request: GreetRequest) -> Result<GreetResponse, GreetingError>; } }
Every request/response/error type must be Serialize
/Deserialize
, and all
trait methods must be async fn
returning Result<T, E>
.
Basic Invocation
Run the generator whenever you change a service trait:
rpcnet-gen --input src/greeting.rpc.rs --output src/generated
A successful run prints the generated paths and writes the following structure:
src/generated/
βββ greeting/
βββ client.rs # GreetingClient with typed async methods
βββ mod.rs # Module exports and re-exports
βββ server.rs # GreetingServer + GreetingHandler trait
βββ types.rs # Request/response/error definitions
Import the module once and re-export whatever you need:
#![allow(unused)] fn main() { #[path = "generated/greeting/mod.rs"] mod greeting; use greeting::{client::GreetingClient, server::{GreetingHandler, GreetingServer}}; }
Command-Line Options
rpcnet-gen --help
surfaces all switches:
Generate RPC client and server code from service definitions
Usage: rpcnet-gen [OPTIONS] --input <INPUT>
Options:
-i, --input <INPUT> Input .rpc file (Rust source with service trait)
-o, --output <OUTPUT> Output directory for generated code [default: src/generated]
--server-only Generate only server code
--client-only Generate only client code
--types-only Generate only type definitions
-h, --help Print help
-V, --version Print version
Key behaviours:
- Omit
--output
to usesrc/generated
. The generator creates a lowercase subdirectory named after the service (Greeting
βgreeting/
). - Combine
--server-only
,--client-only
, and--types-only
to tailor the outputs. The implicitmod.rs
only re-exports files that were produced. - Passing mutually exclusive flags (e.g.
--server-only --client-only
) produces only the directories you asked for;types.rs
is skipped when either flag is present.
Regenerating Automatically
Manual rebuilds
Run the command by hand after touching a .rpc.rs
file. Consider wiring a
cargo alias
or a shell script so teammates can regenerate with a single
command.
With cargo watch
Install cargo-watch
and keep generated code up to date during development:
cargo install cargo-watch
cargo watch -w src/greeting.rpc.rs -x "run --bin rpcnet-gen -- --input src/greeting.rpc.rs --output src/generated"
Through build.rs
For projects that must guarantee generated code exists before compilation,
invoke the builder API from a build script (requires the codegen
feature in
[build-dependencies]
):
// build.rs fn main() { println!("cargo:rerun-if-changed=src/greeting.rpc.rs"); rpcnet::codegen::Builder::new() .input("src/greeting.rpc.rs") .output("src/generated") .build() .expect("Failed to generate RPC code"); }
Cargo reruns the script when the .rpc.rs
file changes, keeping the generated
modules in sync.
Working With Multiple Services
Generate several services in one go by running the CLI multiple times or by stacking inputs in the builder:
// build.rs fn main() { for service in ["rpc/user.rpc.rs", "rpc/billing.rpc.rs", "rpc/audit.rpc.rs"] { println!("cargo:rerun-if-changed={service}"); } rpcnet::codegen::Builder::new() .input("rpc/user.rpc.rs") .input("rpc/billing.rpc.rs") .input("rpc/audit.rpc.rs") .output("src/generated") .build() .expect("Failed to generate RPC code"); }
Each input produces a sibling directory under src/generated/
(user/
,
billing/
, audit/
).
Version-Control Strategy
Generated code is ordinary Rust and can be committed. Most teams either:
- Commit the
src/generated/**
tree so downstream crates build without the generator, or - Ignore the directory and require the CLI (or
build.rs
) to run during CI.
Pick a single approach and document it for contributors.
Troubleshooting
- Missing input file β the CLI exits with
Error: Input file 'β¦' does not exist
. Double-check the path and ensure the file is tracked in git so collaborators receive it. - Invalid trait β methods must be
async fn
and returnResult
. The parser reports an error pointing at the offending signature. - Serialization failures at runtime β make sure your request/response/error
types derive
Serialize
andDeserialize
and keep both client and server on the same crate version so layouts match.
With these workflows in place you can treat rpcnet-gen
like any other build
step: edit the .rpc.rs
trait, regenerate, and keep building.
Cluster Example
This chapter demonstrates building a distributed RPC cluster with automatic worker discovery, load balancing, and failure detection using RpcNet's built-in cluster features.
Architecture Overview
The cluster example showcases three main components working together:
ββββββββββββββββββββββββββββ
β Director β
β (Coordinator Node) β
β β
β - WorkerRegistry β
β - ClusterClient β
β - Load Balancing β
ββββββββββ¬ββββββββββββββββββ
β
Gossip Protocol (SWIM)
β
ββββββββββββββββββΌβββββββββββββββββ
β β
βββββββββΌβββββββββ ββββββββββΌββββββββ
β Worker A β β Worker B β
β β β β
β - Auto-join β β - Auto-join β
β - Tag: worker β β - Tag: worker β
β - Process tasksβ β - Process tasksβ
βββββββββββββββββββ βββββββββββββββββββ
Components
1. Director - Coordinator node that:
- Uses
WorkerRegistry
for automatic worker discovery - Uses
ClusterClient
for load-balanced request routing - Employs
LeastConnections
strategy by default - Monitors worker pool status
- Routes client requests to healthy workers
2. Workers - Processing nodes that:
- Join cluster automatically via gossip protocol
- Tag themselves with
role=worker
for discovery - Process compute tasks from clients
- Monitor cluster events (node joined/left/failed)
- Support simulated failures for testing
3. Client - Application that:
- Connects to director
- Gets worker assignment
- Establishes direct connection to worker
- Handles failover automatically
Why Use Built-in Cluster Features?
Compared to manual worker management patterns:
Manual Approach β:
- Custom
HashMap<Uuid, WorkerInfo>
for tracking - Manual round-robin selection logic
- Explicit RPC calls for worker registration
- Custom ping-based health checks
- ~200 lines of boilerplate code
Built-in Cluster β :
- Built-in
WorkerRegistry
+ClusterClient
- Multiple load balancing strategies (Round Robin, Random, Least Connections)
- Automatic discovery via SWIM gossip protocol
- Phi Accrual failure detection (accurate, adaptive)
- ~50 lines to set up
- 75% code reduction!
Running the Example
Prerequisites
Ensure test certificates exist:
ls certs/test_cert.pem certs/test_key.pem
All commands should be run from the project root directory.
Basic Setup
Open four terminals and run each component:
Terminal 1 - Director:
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin director
Terminal 2 - Worker A:
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
Terminal 3 - Worker B:
WORKER_LABEL=worker-b \
WORKER_ADDR=127.0.0.1:62002 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
Terminal 4 - Client:
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin client
What You'll See
Director Output:
π― Starting Director at 127.0.0.1:61000
π Loading certificates from "../../certs/test_cert.pem"
β
Director registered itself in cluster
β
Cluster enabled - Director is now discoverable
π Load balancing strategy: LeastConnections
π Worker pool status: 2 workers available
- worker-a at 127.0.0.1:62001 (0 connections)
- worker-b at 127.0.0.1:62002 (0 connections)
π Director ready - listening on 127.0.0.1:61000
Worker Output:
π· Starting Worker 'worker-a' at 127.0.0.1:62001
π Binding server to 127.0.0.1:62001...
β
Server bound successfully
π Enabling cluster, connecting to director at 127.0.0.1:61000...
β
Cluster enabled, connected to director
π·οΈ Tagging worker with role=worker and label=worker-a...
β
Worker 'worker-a' joined cluster with role=worker
π Worker 'worker-a' is running and ready to handle requests
Client Output:
π‘ Starting Client - connecting to director at 127.0.0.1:61000
β
connected to director
π director assigned worker - establishing direct connection
β
direct connection established to worker
π€ creating request stream
π stream opened successfully, starting to consume responses
π¦ received token (sequence=1, text="token-1", total=1)
π¦ received token (sequence=2, text="token-2", total=2)
...
Testing Failure Scenarios
Simulated Worker Failures
Enable periodic failures to test automatic failover:
Worker with Failures:
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
WORKER_FAILURE_ENABLED=true \ # Enable failure simulation
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
Failure Cycle (~18 seconds):
- Run: 10 seconds of normal operation
- Warning: "β οΈ Simulating worker failure in 3 seconds..."
- Failed: 5 seconds in failed state - "π₯ Worker failed!"
- Recovery: "π Worker recovering..."
- Ready: "β Worker recovered and ready to serve!"
- Repeat
Client Behavior:
- Detects failure via error response
- Returns to director for new worker assignment
- Switches to healthy worker seamlessly
- Streaming continues with minimal interruption
Hard Kill Test
Test network-level failure detection:
# In a worker terminal, press Ctrl+C
Observe:
- Director detects failure via gossip protocol
WorkerRegistry
removes worker from pool- Client requests automatically route to remaining workers
- Zero downtime for ongoing operations
Worker Restart Test
After killing a worker, restart it to see re-discovery:
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
Observe:
- Worker automatically rejoins cluster
- Gossip spreads worker availability
- Director adds worker back to registry
- Client requests resume to all available workers
How It Works
1. Automatic Discovery
Workers don't manually register - they just join the cluster:
#![allow(unused)] fn main() { // Worker code (simplified) let cluster = ClusterMembership::new(config).await?; cluster.join(vec![director_addr]).await?; // Tag for discovery cluster.set_tag("role", "worker"); cluster.set_tag("label", worker_label); // That's it! Director discovers automatically via gossip }
2. Load Balancing
Director uses WorkerRegistry
for automatic load balancing:
#![allow(unused)] fn main() { // Director code let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; // Automatically tracks workers and balances load }
3. Failure Detection
Phi Accrual algorithm provides accurate health monitoring:
- Adapts to network conditions
- Distinguishes slow nodes from failed nodes
- No false positives from temporary delays
- Automatic recovery when nodes return
4. Tag-Based Routing
Filter workers by capabilities:
#![allow(unused)] fn main() { // Get only GPU workers let gpu_worker = registry.select_worker(Some("gpu=true")).await?; // Get any worker let any_worker = registry.select_worker(Some("role=worker")).await?; }
Key Cluster Features Demonstrated
β Automatic Discovery
No manual registration needed - gossip protocol handles everything
β Load Balancing
Choose from:
- Round Robin: Even distribution
- Random: Stateless workload distribution
- Least Connections: Balance based on current load (recommended)
β Failure Detection
Phi Accrual algorithm provides accurate, adaptive health monitoring
β Tag-Based Routing
Route by worker capabilities (GPU, CPU, zone, etc.)
β Event Monitoring
Subscribe to cluster events:
NodeJoined
- New worker availableNodeLeft
- Worker gracefully departedNodeFailed
- Worker detected as failed
Configuration Options
Environment Variables
Director:
DIRECTOR_ADDR
- Bind address (default:127.0.0.1:61000
)RUST_LOG
- Log level (e.g.,info
,debug
)
Worker:
WORKER_LABEL
- Worker identifier (default:worker-1
)WORKER_ADDR
- Bind address (default:127.0.0.1:62001
)DIRECTOR_ADDR
- Director address (default:127.0.0.1:61000
)WORKER_FAILURE_ENABLED
- Enable failure simulation (default:false
)RUST_LOG
- Log level
Client:
DIRECTOR_ADDR
- Director address (default:127.0.0.1:61000
)RUST_LOG
- Log level
Load Balancing Strategies
#![allow(unused)] fn main() { use rpcnet::cluster::LoadBalancingStrategy; // Options: LoadBalancingStrategy::RoundRobin // Even distribution LoadBalancingStrategy::Random // Random selection LoadBalancingStrategy::LeastConnections // Pick least loaded (recommended) }
Cluster Configuration
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterConfig; let config = ClusterConfig::default() .with_gossip_interval(Duration::from_secs(1)) .with_health_check_interval(Duration::from_secs(2)); }
Troubleshooting
Workers not discovered:
- Ensure director starts first (it's the seed node)
- Check firewall allows UDP for gossip
- Verify workers connect to correct director address
Requests failing:
- Check worker has
role=worker
tag - Verify compute handler is registered
- Check logs for connection errors
Slow failover:
- Adjust health check interval in config
- Tune Phi Accrual threshold
- Check network latency
Production Considerations
For production deployments:
- TLS Certificates: Use proper certificates, not test certs
- Monitoring: Integrate cluster events with your monitoring system
- Scaling: Add more workers dynamically as needed
- Persistence: Consider persisting cluster state if needed
- Security: Add authentication and authorization
- Network: Plan for network partitions and split-brain scenarios
Next Steps
- Try different load balancing strategies
- Add more workers dynamically
- Test network partition scenarios
- Add custom tags for routing (zone, GPU, etc.)
- Integrate with your application logic
For full source code, see examples/cluster/
in the repository.
Cluster Overview
RpcNet provides built-in support for building distributed RPC clusters with automatic service discovery, intelligent load balancing, and robust failure detection. This chapter introduces the core concepts and components of RpcNet's cluster architecture.
What is a Cluster?
A cluster in RpcNet is a group of interconnected nodes that work together to provide distributed RPC services. Nodes automatically discover each other, share information about their state, and coordinate to handle client requests efficiently.
Key Benefits
Automatic Discovery π
- No manual node registration required
- Nodes join and leave seamlessly
- Gossip protocol spreads information automatically
Intelligent Load Balancing βοΈ
- Multiple strategies (Round Robin, Random, Least Connections)
- Tracks active connections per node
- Prevents overload on individual nodes
Robust Failure Detection π
- Phi Accrual failure detection algorithm
- Adapts to network conditions
- Distinguishes between slow and failed nodes
Tag-Based Routing π·οΈ
- Route requests by node capabilities
- Filter by zone, hardware type, role, etc.
- Enables heterogeneous worker pools
Architecture Components
RpcNet's cluster architecture consists of several key components that work together:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Application Layer β
β (Your RPC handlers, business logic) β
ββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌβββββββββββββββββββββββββββββββββββββ
β ClusterClient β
β - High-level API for cluster operations β
β - Load-balanced request routing β
β - Efficient request routing β
ββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββ
β
β
βββββββββΌββββββββββ
β WorkerRegistry β
β - Tracks nodes β
β - Load balance β
β - Filter tags β
βββββββββ¬ββββββββββ
β
βββββββββΌββββββββββ
β NodeRegistry β
β - All nodes β
β - Health state β
β - Metadata β
βββββββββ¬ββββββββββ
β
βββββββββΌββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ClusterMembership (SWIM) β
β - Gossip protocol for node discovery β
β - Phi Accrual failure detection β
β - Event notifications (NodeJoined/Left/Failed) β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1. ClusterMembership (SWIM)
The foundation of RpcNet's cluster is the SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol. This provides:
- Gossip-based communication: Nodes periodically exchange information
- Failure detection: Phi Accrual algorithm detects node failures accurately
- Partition detection: Identifies network splits and handles them gracefully
- Event system: Notifies about node state changes
Key characteristics:
- Eventually consistent membership information
- Scales to thousands of nodes
- Low network overhead (UDP-based gossip)
- Handles network partitions and node churn
2. NodeRegistry
The NodeRegistry maintains a comprehensive view of all nodes in the cluster:
#![allow(unused)] fn main() { use rpcnet::cluster::{NodeRegistry, ClusterMembership}; let registry = Arc::new(NodeRegistry::new(cluster)); registry.start().await; // Get all nodes let nodes = registry.nodes().await; // Subscribe to cluster events let mut events = registry.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeJoined(node) => println!("Node joined: {}", node.id), ClusterEvent::NodeLeft(node) => println!("Node left: {}", node.id), ClusterEvent::NodeFailed(node) => println!("Node failed: {}", node.id), } } }
Features:
- Real-time node tracking
- Metadata storage per node
- Event subscription for state changes
- Thread-safe access via
Arc
3. WorkerRegistry
The WorkerRegistry extends NodeRegistry to track worker nodes specifically:
#![allow(unused)] fn main() { use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy}; let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; // Select a worker (with optional tag filter) let worker = registry.select_worker(Some("role=worker")).await?; println!("Selected worker: {} at {}", worker.label, worker.addr); }
Features:
- Filters nodes by tags (e.g.,
role=worker
) - Applies load balancing strategy
- Tracks active connections per worker
- Automatic removal of failed workers
4. ClusterClient
The ClusterClient provides a high-level API that combines all components:
#![allow(unused)] fn main() { use rpcnet::cluster::{ClusterClient, ClusterClientConfig}; let client = Arc::new(ClusterClient::new(registry, config)); // Call any worker matching the filter let result = client.call_worker("compute", request, Some("role=worker")).await?; }
Features:
- Automatic worker selection
- Load-balanced request routing
- Efficient connection management
- Retry logic for failed requests
When to Use Clusters
RpcNet clusters are ideal for scenarios where you need:
β Good Use Cases
Distributed Workload Processing
- Multiple workers processing tasks in parallel
- Automatic load distribution across workers
- Example: Video transcoding farm, data processing pipeline
High Availability Services
- Services that must tolerate node failures
- Automatic failover to healthy nodes
- Example: API gateway, microservices mesh
Dynamic Scaling
- Add/remove nodes based on load
- Automatic discovery of new capacity
- Example: Auto-scaling worker pools, elastic compute clusters
Heterogeneous Worker Pools
- Different node types (GPU vs CPU, different zones)
- Tag-based routing to appropriate nodes
- Example: ML inference with GPU/CPU workers, multi-region deployments
β When NOT to Use Clusters
Single Node Deployments
- If you only have one server, use direct RPC instead
- Cluster overhead isn't justified
Strict Consistency Requirements
- SWIM provides eventual consistency
- Not suitable for strong consistency needs (use consensus protocols like Raft)
Low-Latency Single-Hop
- Direct RPC is faster for single client-server communication
- Cluster adds minimal overhead, but every bit counts for ultra-low latency
Cluster Modes
RpcNet supports different cluster deployment patterns:
1. Coordinator-Worker Pattern
One or more coordinator nodes route requests to worker nodes:
ββββββββββββββββ
β Coordinator β
β (Director) β
ββββββββ¬ββββββββ
β
βββββββββββββΌββββββββββββ
β β β
βββββΌββββ ββββΌβββββ ββββΌβββββ
βWorker β βWorker β βWorker β
βββββββββ βββββββββ βββββββββ
Use when:
- Clients don't need to track worker pool
- Centralized routing and monitoring
- Example: Load balancer + worker pool
2. Peer-to-Peer Pattern
All nodes are equal and can route to each other:
ββββββββ ββββββββ
β Node βββββββ€ Node β
βββββ¬βββ ββββ¬ββββ
β β
βββββββ¬ββββββ
βββββΌββββ
β Node β
βββββββββ
Use when:
- No single point of coordination needed
- Nodes serve both as clients and servers
- Example: Distributed cache, gossip-based database
3. Hierarchical Pattern
Multiple layers with different roles:
ββββββββββ
β Master β
βββββ¬βββββ
β
ββββββββΌβββββββ
βββββΌββββ βββββΌββββ
βRegion β βRegion β
βLeader β βLeader β
βββββ¬ββββ βββββ¬ββββ
β β
βββββΌββββ βββββΌββββ
βWorker β βWorker β
βββββββββ βββββββββ
Use when:
- Multi-region deployments
- Different node tiers (leaders, workers, storage)
- Example: Global CDN, multi-tenant systems
Performance Characteristics
RpcNet clusters maintain high performance while providing distributed coordination:
Throughput
- 172K+ requests/second in benchmarks
- Minimal overhead compared to direct RPC
- Scales linearly with number of workers
Latency
- < 0.1ms additional latency for load balancing
- Efficient connection handling reduces overhead
- QUIC's 0-RTT mode for warm connections
Scalability
- Tested with 1000+ nodes in gossip cluster
- Sub-linear gossip overhead (O(log N) per node)
- Configurable gossip intervals for tuning
Resource Usage
- Low memory: ~10KB per tracked node
- Low CPU: < 1% for gossip maintenance
- Low network: ~1KB/s per node for gossip
Next Steps
Now that you understand the cluster architecture, you can:
- Follow the Tutorial - Build your first cluster step-by-step
- Learn About Discovery - Deep dive into SWIM gossip protocol
- Explore Load Balancing - Choose the right strategy
- Understand Health Checking - How Phi Accrual works
- Handle Failures - Partition detection and recovery
Or jump directly to the Cluster Example to see a complete working system.
Cluster Tutorial
This hands-on tutorial guides you through building a complete distributed RPC cluster from scratch. You'll create a coordinator (director) that manages a pool of worker nodes, with automatic discovery, load balancing, and failure handling.
What You'll Build
By the end of this tutorial, you'll have:
- Director: Coordinator node that manages worker discovery and routes client requests
- Workers: Processing nodes that join automatically and handle compute tasks
- Client: Application that connects through the director and handles failover
- Failure Testing: Simulate worker failures and observe automatic recovery
Time: ~30 minutes
Difficulty: Intermediate
Prerequisites
1. Install RpcNet
cargo install rpcnet
This installs both the library and the rpcnet-gen
CLI tool.
2. Create Test Certificates
RpcNet requires TLS certificates. For development:
mkdir certs
cd certs
# Generate self-signed certificate
openssl req -x509 -newkey rsa:4096 -nodes \
-keyout test_key.pem -out test_cert.pem \
-days 365 -subj "/CN=localhost"
cd ..
3. Create Project Structure
cargo new --bin cluster_tutorial
cd cluster_tutorial
# Add RpcNet dependency
cargo add rpcnet --features cluster
cargo add tokio --features full
cargo add anyhow
Your Cargo.toml
should include:
[dependencies]
rpcnet = { version = "0.2", features = ["cluster"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1"
Step 1: Define the RPC Interface
Create compute.rpc.rs
to define the worker interface:
#![allow(unused)] fn main() { use rpcnet::prelude::*; #[rpc_trait] pub trait ComputeService { async fn process_task(&self, task_id: String, data: Vec<u8>) -> Result<ComputeResult>; } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ComputeResult { pub task_id: String, pub result: Vec<u8>, pub worker_label: String, } }
Generate code:
rpcnet-gen --input compute.rpc.rs --output src/generated
This creates src/generated/compute_service.rs
with client and server stubs.
Step 2: Implement the Worker
Create src/bin/worker.rs
:
use anyhow::Result; use rpcnet::prelude::*; use rpcnet::cluster::{ClusterMembership, ClusterConfig}; use std::sync::Arc; use std::env; mod generated; use generated::compute_service::*; struct WorkerHandler { label: String, } #[rpc_impl] impl ComputeService for WorkerHandler { async fn process_task(&self, task_id: String, data: Vec<u8>) -> Result<ComputeResult> { println!("π [{}] Processing task: {}", self.label, task_id); // Simulate work tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; // Return result with worker identity Ok(ComputeResult { task_id, result: data, // Echo data for demo worker_label: self.label.clone(), }) } } #[tokio::main] async fn main() -> Result<()> { env_logger::init(); // Get configuration from environment let worker_label = env::var("WORKER_LABEL").unwrap_or_else(|_| "worker-1".to_string()); let worker_addr = env::var("WORKER_ADDR").unwrap_or_else(|_| "127.0.0.1:62001".to_string()); let director_addr = env::var("DIRECTOR_ADDR").unwrap_or_else(|_| "127.0.0.1:61000".to_string()); println!("π· Starting Worker '{}' at {}", worker_label, worker_addr); // Load certificates let cert = std::fs::read("certs/test_cert.pem")?; let key = std::fs::read("certs/test_key.pem")?; // Create RPC server let config = ServerConfig::builder() .with_cert_and_key(cert, key)? .build(); let mut server = Server::new(config); // Register compute handler let handler = Arc::new(WorkerHandler { label: worker_label.clone(), }); server.register_service(handler); // Bind server println!("π Binding server to {}...", worker_addr); server.bind(&worker_addr).await?; println!("β Server bound successfully"); // Enable cluster and join println!("π Enabling cluster, connecting to director at {}...", director_addr); let cluster_config = ClusterConfig::default() .with_bind_addr(worker_addr.parse()?); let cluster = server.enable_cluster(cluster_config).await?; cluster.join(vec![director_addr.parse()?]).await?; println!("β Cluster enabled, connected to director"); // Tag worker for discovery println!("π·οΈ Tagging worker with role=worker and label={}...", worker_label); cluster.set_tag("role", "worker"); cluster.set_tag("label", &worker_label); println!("β Worker '{}' joined cluster with role=worker", worker_label); println!("π Worker '{}' is running and ready to handle requests", worker_label); // Run server server.run().await?; Ok(()) }
Step 3: Implement the Director
Create src/bin/director.rs
:
use anyhow::Result; use rpcnet::prelude::*; use rpcnet::cluster::{ ClusterMembership, ClusterConfig, WorkerRegistry, LoadBalancingStrategy, ClusterClient, ClusterClientConfig }; use std::sync::Arc; use std::env; mod generated; use generated::compute_service::*; #[rpc_trait] pub trait DirectorService { async fn get_worker(&self) -> Result<String>; } struct DirectorHandler { registry: Arc<WorkerRegistry>, } #[rpc_impl] impl DirectorService for DirectorHandler { async fn get_worker(&self) -> Result<String> { println!("π¨ Client requesting worker assignment"); // Select worker using registry let worker = self.registry .select_worker(Some("role=worker")) .await .map_err(|e| anyhow::anyhow!("No workers available: {}", e))?; println!("β Assigned worker: {} at {}", worker.label, worker.addr); Ok(worker.addr.to_string()) } } #[tokio::main] async fn main() -> Result<()> { env_logger::init(); let director_addr = env::var("DIRECTOR_ADDR") .unwrap_or_else(|_| "127.0.0.1:61000".to_string()); println!("π― Starting Director at {}", director_addr); // Load certificates println!("π Loading certificates from certs/"); let cert = std::fs::read("certs/test_cert.pem")?; let key = std::fs::read("certs/test_key.pem")?; // Create server let config = ServerConfig::builder() .with_cert_and_key(cert, key)? .build(); let mut server = Server::new(config); // Enable cluster first let cluster_config = ClusterConfig::default() .with_bind_addr(director_addr.parse()?); let cluster = server.enable_cluster(cluster_config).await?; println!("β Director registered itself in cluster"); println!("β Cluster enabled - Director is now discoverable"); // Create worker registry with load balancing let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; println!("π Load balancing strategy: LeastConnections"); // Register director service let handler = Arc::new(DirectorHandler { registry: registry.clone(), }); server.register_service(handler); // Bind and run server.bind(&director_addr).await?; // Monitor worker pool tokio::spawn({ let registry = registry.clone(); async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; let workers = registry.workers().await; println!("π Worker pool status: {} workers available", workers.len()); for worker in workers { println!(" - {} at {} ({} connections)", worker.label, worker.addr, worker.active_connections); } } } }); println!("π Director ready - listening on {}", director_addr); server.run().await?; Ok(()) }
Step 4: Implement the Client
Create src/bin/client.rs
:
use anyhow::Result; use rpcnet::prelude::*; use std::env; mod generated; use generated::compute_service::*; use generated::director_service::*; #[tokio::main] async fn main() -> Result<()> { env_logger::init(); let director_addr = env::var("DIRECTOR_ADDR") .unwrap_or_else(|_| "127.0.0.1:61000".to_string()); println!("π‘ Starting Client - connecting to director at {}", director_addr); // Load certificate for TLS let cert = std::fs::read("certs/test_cert.pem")?; let config = ClientConfig::builder() .with_server_cert(cert)? .build(); // Connect to director let director_client = DirectorClient::connect(&director_addr, config.clone()).await?; println!("β Connected to director"); // Main loop: get worker, process tasks, handle failures let mut task_counter = 0; loop { // Get worker assignment from director println!("π Asking director for worker assignment"); let worker_addr = match director_client.get_worker().await { Ok(addr) => { println!("π Director assigned worker at {}", addr); addr } Err(e) => { println!("β Failed to get worker: {}", e); tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; continue; } }; // Connect to worker directly println!("β Establishing direct connection to worker"); let worker_client = match ComputeClient::connect(&worker_addr, config.clone()).await { Ok(client) => { println!("β Direct connection established"); client } Err(e) => { println!("β Failed to connect to worker: {}", e); continue; } }; // Process tasks until worker fails loop { task_counter += 1; let task_id = format!("task-{}", task_counter); let data = format!("data-{}", task_counter).into_bytes(); println!("π€ Sending task: {}", task_id); match worker_client.process_task(task_id.clone(), data).await { Ok(result) => { println!("β Task {} completed by worker: {}", result.task_id, result.worker_label); // Wait before next task tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } Err(e) => { println!("β οΈ Worker failed: {} - returning to director", e); break; // Get new worker from director } } } } }
Step 5: Update Cargo.toml
Add the binary definitions to Cargo.toml
:
[[bin]]
name = "director"
path = "src/bin/director.rs"
[[bin]]
name = "worker"
path = "src/bin/worker.rs"
[[bin]]
name = "client"
path = "src/bin/client.rs"
Also add the generated module to src/lib.rs
:
#![allow(unused)] fn main() { pub mod generated; }
Step 6: Run the Cluster
Open four terminals and run each component:
Terminal 1: Start Director
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --bin director
Wait for: π Director ready - listening on 127.0.0.1:61000
Terminal 2: Start Worker A
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --bin worker
Wait for: π Worker 'worker-a' is running and ready to handle requests
Terminal 3: Start Worker B
WORKER_LABEL=worker-b \
WORKER_ADDR=127.0.0.1:62002 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --bin worker
Wait for: π Worker 'worker-b' is running and ready to handle requests
Terminal 4: Run Client
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --bin client
Step 7: Observe the System
Director Output
π― Starting Director at 127.0.0.1:61000
π Loading certificates from certs/
β
Director registered itself in cluster
β
Cluster enabled - Director is now discoverable
π Load balancing strategy: LeastConnections
π Director ready - listening on 127.0.0.1:61000
π Worker pool status: 2 workers available
- worker-a at 127.0.0.1:62001 (0 connections)
- worker-b at 127.0.0.1:62002 (0 connections)
π¨ Client requesting worker assignment
β
Assigned worker: worker-a at 127.0.0.1:62001
Worker Output
π· Starting Worker 'worker-a' at 127.0.0.1:62001
π Binding server to 127.0.0.1:62001...
β
Server bound successfully
π Enabling cluster, connecting to director at 127.0.0.1:61000...
β
Cluster enabled, connected to director
π·οΈ Tagging worker with role=worker and label=worker-a...
β
Worker 'worker-a' joined cluster with role=worker
π Worker 'worker-a' is running and ready to handle requests
π [worker-a] Processing task: task-1
π [worker-a] Processing task: task-2
Client Output
π‘ Starting Client - connecting to director at 127.0.0.1:61000
β
Connected to director
π Asking director for worker assignment
π Director assigned worker at 127.0.0.1:62001
β
Establishing direct connection to worker
β
Direct connection established
π€ Sending task: task-1
β
Task task-1 completed by worker: worker-a
π€ Sending task: task-2
β
Task task-2 completed by worker: worker-a
Step 8: Test Failure Handling
Scenario 1: Kill a Worker
In Worker A terminal, press Ctrl+C to kill it.
Observe:
- Director detects failure via gossip:
Node worker-a failed
- Director updates worker pool:
π Worker pool status: 1 workers available
- Client detects error:
β οΈ Worker failed - returning to director
- Client gets new worker:
π Director assigned worker at 127.0.0.1:62002
- Tasks continue on Worker B with no data loss
Scenario 2: Restart Worker
Restart Worker A:
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --bin worker
Observe:
- Worker rejoins automatically
- Gossip spreads availability
- Director adds back to pool:
π Worker pool status: 2 workers available
- Future client requests can use either worker
What You Learned
Congratulations! You've built a complete distributed RPC cluster. You now understand:
β
Automatic Discovery: Workers join via gossip, no manual registration
β
Load Balancing: Director uses LeastConnections strategy automatically
β
Failure Detection: Gossip protocol detects and handles node failures
β
Client Failover: Clients handle worker failures gracefully
β
Tag-Based Routing: Filter workers by role (role=worker
)
Next Steps
Add More Workers
Scale up by adding more workers with different labels:
WORKER_LABEL=worker-c \
WORKER_ADDR=127.0.0.1:62003 \
DIRECTOR_ADDR=127.0.0.1:61000 \
cargo run --bin worker
Try Different Load Balancing
Change the strategy in director.rs
:
#![allow(unused)] fn main() { LoadBalancingStrategy::RoundRobin // Even distribution LoadBalancingStrategy::Random // Random selection LoadBalancingStrategy::LeastConnections // Pick least loaded (default) }
Add Custom Tags
Tag workers by capability:
#![allow(unused)] fn main() { cluster.set_tag("gpu", "true"); cluster.set_tag("zone", "us-west"); }
Then filter in client:
#![allow(unused)] fn main() { registry.select_worker(Some("gpu=true")).await?; }
Monitor Cluster Events
Subscribe to events in director or workers:
#![allow(unused)] fn main() { let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeJoined(node) => println!("Node joined: {:?}", node), ClusterEvent::NodeLeft(node) => println!("Node left: {:?}", node), ClusterEvent::NodeFailed(node) => println!("Node failed: {:?}", node), } } }
Further Reading
- Discovery - Learn how SWIM gossip protocol works
- Load Balancing - Deep dive into strategies
- Health Checking - Understand Phi Accrual algorithm
- Failure Handling - Advanced partition detection
Or explore the Complete Cluster Example with streaming and advanced features.
Automatic Discovery
RpcNet uses the SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol for automatic node discovery. This chapter explains how nodes find each other without central coordination or manual registration.
How Discovery Works
The Problem
In distributed systems, you need to know:
- Which nodes are currently alive?
- Which nodes just joined?
- Which nodes have failed or left?
Traditional solutions have limitations:
- Centralized registry: Single point of failure
- Broadcast: Doesn't scale (O(NΒ²) messages)
- Heartbeats: Network overhead grows with cluster size
The SWIM Solution
SWIM provides scalable membership with constant overhead per node:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Node A discovers new nodes through gossip β
β without contacting every node in the cluster β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Node A Node B Node C
β β β
β 1. Ping (health) β β
ββββββββββββββββββββββββββΊβ β
β β β
β 2. Ack + Gossip β β
βββββββββββββββββββββββββββ€ β
β (includes info β β
β about Node C) β β
β β β
β 3. Now A knows C β β
β exists without β β
β direct contact! β β
β β β
βββββββββββββββ¬ββββββββββββ΄ββββββββββββββββββββββββββ
β
Information spreads
exponentially fast
SWIM Protocol Basics
1. Gossip-Based Communication
Nodes periodically exchange information with random peers:
#![allow(unused)] fn main() { // Simplified gossip cycle (every 1 second by default) loop { // Pick random node let peer = select_random_node(); // Send health check + gossip payload let gossip = GossipMessage { sender: my_node_id, members: my_known_members.clone(), incarnation: my_incarnation, }; peer.ping(gossip).await?; // Receive ack + peer's gossip let ack = receive_ack().await?; merge_member_information(ack.members); tokio::time::sleep(Duration::from_secs(1)).await; } }
Key properties:
- Constant overhead per node: O(1) messages per cycle
- Information spreads exponentially: O(log N) time
- No single point of failure
- Works with network partitions
2. Three Node States
SWIM tracks nodes in three states:
#![allow(unused)] fn main() { pub enum NodeState { Alive, // Node is healthy and responding Suspect, // Node might be failed (under investigation) Failed, // Node confirmed failed } }
State transitions:
ββββββββββββββββββββββββββββββββββββββββ
β β
β Join cluster β Gossip confirms alive
β β
ββββββΌββββββ No response after 3 pings βββΌβββββββ
β Alive βββββββββββββββββββββββββββββΊ βSuspect β
ββββββ¬ββββββ βββββ¬βββββ
β β
β Voluntary leave β Confirmed by multiple nodes
β β or timeout
β βββββΌβββββ
βββββββββββββββββββββββββββββββββββββΊβ Failed β
ββββββββββ
3. Failure Detection Protocol
SWIM uses indirect probing to avoid false positives:
Direct Probe (normal case):
Node A Node B
β β
β 1. Ping β
ββββββββββββββββββββββββΊβ
β β
β 2. Ack β
βββββββββββββββββββββββββ€
β β
β B is alive β β
Indirect Probe (when direct fails):
Node A Node C Node B
β β β
β 1. Ping (timeout) β β
ββββββββββββββββββββββXββ€ β
β β β
β 2. Ask C to probe B β β
ββββββββββββββββββββββββΊβ β
β β 3. Ping β
β ββββββββββββββββββββββββΊβ
β β β
β β 4. Ack β
β βββββββββββββββββββββββββ€
β 5. B is alive via C β β
βββββββββββββββββββββββββ€ β
β β β
β B is alive β β β
This prevents false positives from temporary network issues.
RpcNet Implementation
Joining a Cluster
When a node starts, it joins by contacting one or more seed nodes:
#![allow(unused)] fn main() { use rpcnet::cluster::{ClusterMembership, ClusterConfig}; // Create cluster membership let cluster_config = ClusterConfig::default() .with_bind_addr("0.0.0.0:7946".parse()?); let cluster = ClusterMembership::new(cluster_config).await?; // Join via seed nodes (directors, known workers, etc.) let seeds = vec![ "director.example.com:7946".parse()?, "worker-1.example.com:7946".parse()?, ]; cluster.join(seeds).await?; }
What happens during join:
- Contact seed nodes: Node sends join request to all seeds
- Receive member list: Seed responds with known cluster members
- Merge member info: Node learns about entire cluster
- Start gossip: Node begins exchanging info with all members
- Spread join event: Other nodes learn about new member via gossip
Time to full discovery: ~O(log N) gossip cycles (typically 2-5 seconds)
Tagging Nodes
Nodes can advertise capabilities via tags:
#![allow(unused)] fn main() { // Tag worker with role and capabilities cluster.set_tag("role", "worker"); cluster.set_tag("label", "worker-gpu-1"); cluster.set_tag("gpu", "true"); cluster.set_tag("zone", "us-west-2a"); cluster.set_tag("memory", "64GB"); }
Tags are gossiped to all nodes, enabling:
- Service discovery (find all nodes with
role=worker
) - Capability-based routing (find nodes with
gpu=true
) - Zone-aware load balancing (prefer nodes in
zone=us-west-2a
)
Subscribing to Events
Monitor cluster changes in real-time:
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterEvent; let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeJoined(node) => { println!("New node: {} at {}", node.id, node.addr); println!("Tags: {:?}", node.tags); } ClusterEvent::NodeLeft(node) => { println!("Node left gracefully: {}", node.id); } ClusterEvent::NodeFailed(node) => { println!("Node failed: {}", node.id); // Take action: remove from pool, alert monitoring, etc. } } } }
Gossip Internals
Gossip Message Structure
Each gossip message contains:
#![allow(unused)] fn main() { struct GossipMessage { // Sender identification sender_id: Uuid, sender_addr: SocketAddr, incarnation: u64, // Anti-entropy counter // Member information members: Vec<MemberInfo>, // Piggyback information events: Vec<ClusterEvent>, } struct MemberInfo { id: Uuid, addr: SocketAddr, state: NodeState, incarnation: u64, tags: HashMap<String, String>, last_seen: SystemTime, } }
Gossip Cycle
Every gossip interval (default: 1 second):
- Select target: Pick random node from member list
- Prepare message: Collect recent events and member updates
- Send ping: UDP datagram with gossip payload
- Wait for ack: Timeout after 500ms (configurable)
- Merge information: Update local member list with received data
- Detect failures: Check for nodes that haven't responded
Information Spread Speed
With N nodes and gossip interval T:
- 1 node knows: T seconds (initial)
- 2 nodes know: 2T seconds (1st gossip)
- 4 nodes know: 3T seconds (2nd gossip)
- 8 nodes know: 4T seconds (3rd gossip)
- N nodes know: (logβ N) Γ T seconds
Example: 1000-node cluster, 1-second interval:
- Full propagation: ~10 seconds (logβ 1000 β 10)
Advanced Features
Incarnation Numbers
Each node maintains an incarnation counter to handle:
Problem: Node A suspects Node B is failed, but B is actually alive.
Solution: B increments its incarnation number and gossips "I'm alive with incarnation N+1". This overrides stale failure suspicion.
#![allow(unused)] fn main() { // Node B refutes failure suspicion if cluster.is_suspected() { cluster.increment_incarnation(); cluster.broadcast_alive(); } }
Anti-Entropy
Periodically, nodes perform full state synchronization to:
- Fix inconsistencies from packet loss
- Recover from network partitions
- Ensure eventual consistency
#![allow(unused)] fn main() { // Every 10 gossip cycles, do full sync with random node if cycle_count % 10 == 0 { let peer = select_random_node(); let full_state = get_all_members(); peer.sync(full_state).await?; } }
Partition Detection
SWIM can detect network partitions:
Before partition: After partition:
Cluster Cluster A | Cluster B
β β | β
βββββββΌββββββ βββββββΌββββββ|βββββββΌββββββ
A B C A B || C D
β β β β β || β β
βββββββΌββββββ βββββββ |βββββββ
D |
SPLIT!
Detection: Nodes in partition A can't reach nodes in partition B after multiple indirect probes.
Handling:
- Each partition continues operating independently
- When partition heals, gossip merges the views
- Application must handle split-brain scenarios
Configuration
Tuning Gossip Parameters
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterConfig; use std::time::Duration; let config = ClusterConfig::default() .with_bind_addr("0.0.0.0:7946".parse()?) .with_gossip_interval(Duration::from_secs(1)) // How often to gossip .with_probe_timeout(Duration::from_millis(500)) // Ping timeout .with_indirect_probes(3) // How many indirect probes .with_suspicion_timeout(Duration::from_secs(5)) // Suspect β Failed timeout .with_gossip_fanout(3); // How many nodes to gossip to cluster = ClusterMembership::new(config).await?; }
Tuning Guidelines
Small clusters (< 10 nodes):
- Longer intervals (2-3 seconds)
- Faster timeouts (200ms)
- Lower fanout (1-2 nodes)
Medium clusters (10-100 nodes):
- Default settings (1 second, 500ms, 3 fanout)
Large clusters (100-1000 nodes):
- Shorter intervals (500ms)
- More indirect probes (5+)
- Higher fanout (5-7 nodes)
Very large clusters (1000+ nodes):
- Consider hierarchical clustering
- Adjust suspicion timeout upward
- Use regional seed nodes
Failure Scenarios
Temporary Network Glitch
Node A pings B β timeout (network glitch)
Node A β Suspect B
Node A asks C to probe B
Node C β B responds β
Node A β B is Alive (false alarm avoided)
Result: No false positive due to indirect probing.
Actual Node Failure
Node A pings B β timeout
Node A β Suspect B
Node A asks C, D, E to probe B β all timeout
Suspicion timeout expires (5 seconds)
Node A β B is Failed
Gossip spreads: B failed
All nodes remove B from active pool
Result: B marked failed within ~6 seconds (1s ping + 5s suspicion).
Network Partition
Partition occurs: {A, B} | {C, D}
In partition {A, B}:
- A and B communicate normally
- C and D marked as Failed
In partition {C, D}:
- C and D communicate normally
- A and B marked as Failed
Partition heals:
- Gossip exchanges full state
- All nodes marked Alive again
- Incarnation numbers resolve conflicts
Result: Both partitions continue operating; merge when healed.
Best Practices
1. Use Multiple Seed Nodes
#![allow(unused)] fn main() { // β Good: Multiple seeds for reliability let seeds = vec![ "seed-1.cluster.local:7946".parse()?, "seed-2.cluster.local:7946".parse()?, "seed-3.cluster.local:7946".parse()?, ]; // β Bad: Single seed (single point of failure) let seeds = vec!["seed-1.cluster.local:7946".parse()?]; }
2. Monitor Cluster Events
#![allow(unused)] fn main() { // Log all cluster changes for debugging tokio::spawn(async move { let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { log::info!("Cluster event: {:?}", event); metrics.record_cluster_event(&event); } }); }
3. Tag Nodes with Rich Metadata
#![allow(unused)] fn main() { // Provide detailed tags for routing decisions cluster.set_tag("role", "worker"); cluster.set_tag("version", env!("CARGO_PKG_VERSION")); cluster.set_tag("zone", get_availability_zone()); cluster.set_tag("instance_type", "m5.xlarge"); cluster.set_tag("capabilities", "gpu,video-encode"); }
4. Handle Partition Detection
#![allow(unused)] fn main() { // Detect partitions and alert let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { if let ClusterEvent::PartitionDetected = event { alert_ops_team("Network partition detected!"); enable_read_only_mode(); // Prevent split-brain writes } } }
5. Graceful Shutdown
#![allow(unused)] fn main() { // Leave cluster gracefully when shutting down cluster.leave().await?; // This tells other nodes "I'm leaving intentionally" // rather than waiting for failure detection timeout }
Comparison to Other Protocols
Feature | SWIM (RpcNet) | Raft | Consul | Kubernetes |
---|---|---|---|---|
Consistency | Eventual | Strong | Strong | Eventual |
Failure Detection | Phi Accrual | Leader heartbeat | Gossip | kubelet heartbeat |
Scalability | 1000+ nodes | ~10 nodes | 100s of nodes | 1000s of nodes |
Partition Handling | Both sides live | Majority only | Both sides live | Both sides live |
Network Overhead | O(1) per node | O(N) from leader | O(1) per node | O(1) per node |
Setup Complexity | Low | Medium | Medium | High |
When to use SWIM:
- Large clusters (100+ nodes)
- Partition tolerance required
- Eventual consistency acceptable
- Decentralized architecture preferred
When NOT to use SWIM:
- Strong consistency required β Use Raft
- Small clusters (< 5 nodes) β Direct RPC simpler
- Centralized control desired β Use coordinator pattern
Troubleshooting
Nodes Not Discovering
Symptom: Workers join but director doesn't see them.
Debug:
#![allow(unused)] fn main() { // Enable debug logging RUST_LOG=rpcnet::cluster=debug cargo run // Check what nodes are known let members = cluster.members().await; println!("Known members: {:?}", members); }
Common causes:
- Firewall blocking UDP gossip port
- Wrong seed node address
- Network partition
Slow Propagation
Symptom: Takes 30+ seconds for nodes to discover each other.
Debug:
#![allow(unused)] fn main() { // Check gossip interval let config = ClusterConfig::default() .with_gossip_interval(Duration::from_millis(500)); // Faster }
Common causes:
- Gossip interval too long
- High packet loss
- Too few gossip fanout targets
False Failure Detection
Symptom: Nodes marked failed but they're actually alive.
Debug:
#![allow(unused)] fn main() { // Increase timeouts let config = ClusterConfig::default() .with_probe_timeout(Duration::from_secs(1)) // More lenient .with_suspicion_timeout(Duration::from_secs(10)); }
Common causes:
- Network latency spikes
- Node overloaded (GC pauses)
- Timeout too aggressive
Next Steps
- Load Balancing - Use discovered nodes for routing
- Health Checking - Understand Phi Accrual algorithm
- Failures - Handle partitions and split-brain scenarios
References
- SWIM Paper (Cornell) - Original SWIM protocol
- Phi Accrual Paper - Advanced failure detection
- Gossip Protocols Overview - General gossip concepts
Load Balancing
Load balancing distributes requests across worker nodes to optimize resource utilization, minimize response time, and prevent overload. RpcNet provides multiple strategies to suit different workload patterns.
Available Strategies
RpcNet includes three built-in load balancing strategies:
#![allow(unused)] fn main() { use rpcnet::cluster::LoadBalancingStrategy; // Available strategies LoadBalancingStrategy::RoundRobin // Even distribution LoadBalancingStrategy::Random // Random selection LoadBalancingStrategy::LeastConnections // Pick least loaded (recommended) }
1. Round Robin
Distributes requests evenly across all available workers in sequence.
Request Flow:
Request 1 β Worker A
Request 2 β Worker B
Request 3 β Worker C
Request 4 β Worker A (cycle repeats)
Request 5 β Worker B
...
Algorithm:
#![allow(unused)] fn main() { fn select_worker(&mut self, workers: &[Worker]) -> &Worker { let worker = &workers[self.index % workers.len()]; self.index += 1; worker } }
When to use:
- β Workers have identical capabilities
- β Requests have similar processing time
- β Simple, predictable distribution needed
- β Workers have different performance characteristics
- β Requests vary significantly in complexity
Pros:
- Simple and deterministic
- Perfect load distribution over time
- No state tracking required
Cons:
- Doesn't account for current load
- Doesn't handle heterogeneous workers well
- Can send requests to overloaded nodes
2. Random
Selects a random worker for each request.
Request Flow:
Request 1 β Worker B (random)
Request 2 β Worker A (random)
Request 3 β Worker B (random)
Request 4 β Worker C (random)
...
Algorithm:
#![allow(unused)] fn main() { fn select_worker(&self, workers: &[Worker]) -> &Worker { let idx = rand::thread_rng().gen_range(0..workers.len()); &workers[idx] } }
When to use:
- β Stateless workloads
- β Workers have identical capabilities
- β No session affinity required
- β Want to avoid coordinating state across requestors
- β Need predictable distribution
Pros:
- No coordination required (fully stateless)
- Good distribution with large request counts
- Simple implementation
Cons:
- Uneven short-term distribution
- Doesn't account for current load
- Probabilistic rather than deterministic
3. Least Connections (Recommended)
Selects the worker with the fewest active connections.
Worker Status:
Worker A: 5 active connections
Worker B: 2 active connections β SELECTED
Worker C: 8 active connections
Next request β Worker B (has least connections)
Algorithm:
#![allow(unused)] fn main() { fn select_worker(&self, workers: &[Worker]) -> &Worker { workers .iter() .min_by_key(|w| w.active_connections.load(Ordering::Relaxed)) .unwrap() } }
When to use:
- β Long-lived connections (streaming, websockets)
- β Variable request processing time
- β Workers have different capacities
- β Recommended default for most use cases
- β Very short requests (overhead not worth it)
Pros:
- Adapts to actual load in real-time
- Handles heterogeneous workers well
- Prevents overload automatically
Cons:
- Slight overhead tracking connection counts
- Requires connection counting infrastructure
Using Load Balancing
With WorkerRegistry
#![allow(unused)] fn main() { use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy}; // Create registry with desired strategy let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections // Change strategy here )); registry.start().await; // Select worker automatically using configured strategy let worker = registry.select_worker(Some("role=worker")).await?; println!("Selected worker: {} at {}", worker.label, worker.addr); }
With ClusterClient
#![allow(unused)] fn main() { use rpcnet::cluster::{ClusterClient, ClusterClientConfig}; // ClusterClient uses the registry's configured strategy let config = ClusterClientConfig::default(); let client = Arc::new(ClusterClient::new(registry, config)); // Automatic load-balanced routing let result = client.call_worker("compute", request, Some("role=worker")).await?; }
Strategy Comparison
Performance Characteristics
Strategy | Selection Time | Memory | Accuracy | Best For |
---|---|---|---|---|
Round Robin | O(1) | O(1) | Low | Uniform loads |
Random | O(1) | O(1) | Medium | Stateless |
Least Connections | O(N) | O(N) | High | Variable loads |
Distribution Quality
Test scenario: 1000 requests to 3 workers with varying processing times
Strategy | Worker A | Worker B | Worker C | Std Dev |
---|---|---|---|---|
Round Robin | 333 | 333 | 334 | 0.58 |
Random | 328 | 345 | 327 | 9.86 |
Least Connections | 280 | 390 | 330 | 55.52 |
Note: Round Robin appears most even, but this ignores actual load (processing time per request). Least Connections adapts to real load.
Real-World Scenarios
Scenario 1: Identical Workers, Uniform Requests
Workers: 3x m5.large (identical)
Requests: 1KB data, 50ms processing
Best strategy: Round Robin or Random
- All strategies perform similarly
- Round Robin slightly more predictable
Scenario 2: Heterogeneous Workers
Workers:
- 2x m5.large (2 CPU, 8GB RAM)
- 1x m5.xlarge (4 CPU, 16GB RAM)
Requests: CPU-intensive (100-500ms)
Best strategy: Least Connections
- Larger worker naturally gets more requests
- Prevents overload on smaller workers
Scenario 3: Variable Request Complexity
Workers: 3x m5.large (identical)
Requests:
- 70% simple (10ms)
- 20% medium (100ms)
- 10% complex (1000ms)
Best strategy: Least Connections
- Workers with complex requests get fewer new ones
- Prevents queue buildup
Scenario 4: Streaming Workloads
Workers: 3x GPU instances
Requests: Long-lived video transcoding streams
Best strategy: Least Connections
- Critical to balance active streams
- Round Robin would overload sequentially
Advanced Techniques
Weighted Load Balancing
Weight workers by capacity:
#![allow(unused)] fn main() { // Tag workers with capacity cluster.set_tag("capacity", "100"); // Large worker cluster.set_tag("capacity", "50"); // Small worker // Custom selection logic fn select_weighted_worker(workers: &[Worker]) -> &Worker { let total_capacity: u32 = workers.iter() .map(|w| w.tags.get("capacity").unwrap().parse::<u32>().unwrap()) .sum(); let mut rand_val = rand::thread_rng().gen_range(0..total_capacity); for worker in workers { let capacity = worker.tags.get("capacity").unwrap().parse::<u32>().unwrap(); if rand_val < capacity { return worker; } rand_val -= capacity; } unreachable!() } }
Locality-Aware Load Balancing
Prefer workers in the same zone/region:
#![allow(unused)] fn main() { async fn select_local_worker( registry: &WorkerRegistry, client_zone: &str, ) -> Result<Worker> { // Try local workers first let filter = format!("role=worker,zone={}", client_zone); if let Ok(worker) = registry.select_worker(Some(&filter)).await { return Ok(worker); } // Fall back to any worker registry.select_worker(Some("role=worker")).await } }
Affinity-Based Load Balancing
Route requests from the same client to the same worker:
#![allow(unused)] fn main() { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; fn select_with_affinity(client_id: &str, workers: &[Worker]) -> &Worker { let mut hasher = DefaultHasher::new(); client_id.hash(&mut hasher); let hash = hasher.finish() as usize; &workers[hash % workers.len()] } }
Use cases:
- Session-based workloads
- Client-specific caching
- Stateful processing
Load Shedding
Reject requests when all workers are overloaded:
#![allow(unused)] fn main() { async fn select_with_shedding( registry: &WorkerRegistry, max_connections: usize, ) -> Result<Worker> { let worker = registry.select_worker(Some("role=worker")).await?; if worker.active_connections >= max_connections { return Err(anyhow::anyhow!("All workers at capacity")); } Ok(worker) } }
Monitoring and Metrics
Track Load Distribution
#![allow(unused)] fn main() { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::collections::HashMap; struct LoadBalancerMetrics { requests_per_worker: Arc<Mutex<HashMap<Uuid, AtomicUsize>>>, } impl LoadBalancerMetrics { async fn record_request(&self, worker_id: Uuid) { let mut map = self.requests_per_worker.lock().await; map.entry(worker_id) .or_insert_with(|| AtomicUsize::new(0)) .fetch_add(1, Ordering::Relaxed); } async fn get_distribution(&self) -> HashMap<Uuid, usize> { let map = self.requests_per_worker.lock().await; map.iter() .map(|(id, count)| (*id, count.load(Ordering::Relaxed))) .collect() } } }
Monitor Worker Health
#![allow(unused)] fn main() { async fn monitor_worker_load(registry: Arc<WorkerRegistry>) { loop { tokio::time::sleep(Duration::from_secs(10)).await; let workers = registry.workers().await; for worker in workers { let load_pct = (worker.active_connections as f64 / worker.capacity as f64) * 100.0; if load_pct > 80.0 { log::warn!( "Worker {} at {}% capacity ({} connections)", worker.label, load_pct, worker.active_connections ); } // Report to metrics system metrics::gauge!("worker.load_pct", load_pct, "worker" => worker.label.clone()); metrics::gauge!("worker.connections", worker.active_connections as f64, "worker" => worker.label.clone()); } } } }
Best Practices
1. Choose the Right Strategy
#![allow(unused)] fn main() { // Default recommendation LoadBalancingStrategy::LeastConnections // Handles most cases well // Use Round Robin if: // - All workers identical // - All requests uniform // - Need deterministic distribution // Use Random if: // - Completely stateless // - Multiple load balancers // - Want to avoid coordination overhead }
2. Tag Workers Appropriately
#![allow(unused)] fn main() { // Provide rich metadata for routing decisions cluster.set_tag("role", "worker"); cluster.set_tag("capacity", "100"); cluster.set_tag("zone", "us-west-2a"); cluster.set_tag("instance_type", "m5.xlarge"); cluster.set_tag("gpu", "true"); }
3. Monitor Load Distribution
#![allow(unused)] fn main() { // Log worker selection for debugging let worker = registry.select_worker(Some("role=worker")).await?; log::debug!( "Selected worker {} (connections: {})", worker.label, worker.active_connections ); }
4. Handle No Workers Available
#![allow(unused)] fn main() { // Gracefully handle empty worker pool match registry.select_worker(Some("role=worker")).await { Ok(worker) => { // Process with worker } Err(e) => { log::error!("No workers available: {}", e); // Return error to client or queue request } } }
5. Test Under Load
#![allow(unused)] fn main() { // Benchmark different strategies #[tokio::test] async fn bench_load_balancing() { let strategies = vec![ LoadBalancingStrategy::RoundRobin, LoadBalancingStrategy::Random, LoadBalancingStrategy::LeastConnections, ]; for strategy in strategies { let registry = WorkerRegistry::new(cluster.clone(), strategy); registry.start().await; let start = Instant::now(); for _ in 0..10_000 { registry.select_worker(Some("role=worker")).await?; } let duration = start.elapsed(); println!("{:?}: {:?}", strategy, duration); } } }
Troubleshooting
Uneven Load Distribution
Symptom: One worker consistently gets more requests than others.
Debug:
#![allow(unused)] fn main() { // Check active connections let workers = registry.workers().await; for worker in workers { println!("{}: {} connections", worker.label, worker.active_connections); } }
Common causes:
- Using Least Connections with short-lived requests (connections finish before next selection)
- Worker capacity differences not accounted for
- Some workers slower to release connections
Solution:
- Try Round Robin for uniform short requests
- Use weighted load balancing for heterogeneous workers
- Ensure connections are properly closed
Worker Overload
Symptom: Workers running out of resources despite load balancing.
Debug:
#![allow(unused)] fn main() { // Monitor worker metrics for worker in registry.workers().await { println!( "{}: {} connections (capacity: {})", worker.label, worker.active_connections, worker.capacity ); } }
Common causes:
- Too few workers for load
- Worker capacity set too high
- Requests taking longer than expected
Solution:
- Add more workers
- Implement load shedding
- Scale worker resources
Strategy Not Applied
Symptom: Load balancing seems random despite configuring strategy.
Debug:
#![allow(unused)] fn main() { // Verify registry configuration println!("Strategy: {:?}", registry.strategy()); }
Common causes:
- Wrong registry instance used
- Strategy changed after initialization
- Multiple registries with different configs
Solution:
- Use single registry instance
- Configure strategy at creation time
- Pass registry via Arc for sharing
Performance Impact
Overhead by Strategy
Measured on 3-node cluster, 100K requests:
Strategy | Avg Selection Time | Memory per Request | Total Overhead |
---|---|---|---|
Round Robin | 15ns | 0 bytes | 0.0015ms |
Random | 42ns | 0 bytes | 0.0042ms |
Least Connections | 180ns | 8 bytes | 0.018ms |
Conclusion: All strategies add negligible overhead (< 0.02ms) compared to network latency (~0.1-1ms).
Throughput Impact
Load balancing does not reduce throughput:
Direct RPC (no load balancing): 172K RPS
With Round Robin: 171K RPS (-0.5%)
With Random: 170K RPS (-1.1%)
With Least Connections: 168K RPS (-2.3%)
Conclusion: Load balancing overhead is minimal, well worth the improved distribution.
Next Steps
- Health Checking - Ensure selected workers are healthy
- Failures - Handle worker failures gracefully
References
- Load Balancing Algorithms - Overview of strategies
- Least Connections Algorithm - Industry standard
- Consistent Hashing - Advanced affinity technique
Health Checking
RpcNet uses the Phi Accrual Failure Detector algorithm for accurate and adaptive health checking. This chapter explains how RpcNet determines which nodes are healthy and when to mark them as failed.
The Problem with Binary Health Checks
Traditional health checks use binary logic:
if (ping_timeout):
node_is_failed = True
else:
node_is_healthy = True
Problems:
- Fixed threshold: 500ms timeout doesn't adapt to network conditions
- False positives: Temporary slowdown triggers failure
- False negatives: Slow node stays "healthy" until timeout
- No confidence: Can't express "probably failed" vs "definitely failed"
Phi Accrual Solution
The Phi Accrual algorithm provides a continuous suspicion level instead of binary alive/dead:
Phi Value (Ξ¦) = Suspicion Level
Ξ¦ = 0 β Node is responding normally
Ξ¦ = 5 β Moderate suspicion (50% chance failed)
Ξ¦ = 8 β High suspicion (97.7% chance failed) β Typical threshold
Ξ¦ = 10 β Very high suspicion (99.99% chance failed)
Ξ¦ = 15+ β Almost certainly failed
How It Works
1. Track Heartbeat History
#![allow(unused)] fn main() { struct HeartbeatHistory { intervals: Vec<Duration>, // Last N intervals between heartbeats last_heartbeat: Instant, // When we last heard from node } }
2. Calculate Expected Interval
#![allow(unused)] fn main() { fn mean_interval(&self) -> Duration { self.intervals.iter().sum::<Duration>() / self.intervals.len() } fn std_deviation(&self) -> Duration { let mean = self.mean_interval(); let variance = self.intervals .iter() .map(|&interval| { let diff = interval.as_secs_f64() - mean.as_secs_f64(); diff * diff }) .sum::<f64>() / self.intervals.len() as f64; Duration::from_secs_f64(variance.sqrt()) } }
3. Compute Phi
#![allow(unused)] fn main() { fn phi(&self) -> f64 { let now = Instant::now(); let time_since_last = now.duration_since(self.last_heartbeat); let mean = self.mean_interval(); let std_dev = self.std_deviation(); // How many standard deviations away is current delay? let z_score = (time_since_last.as_secs_f64() - mean.as_secs_f64()) / std_dev.as_secs_f64(); // Convert to phi (log probability) -z_score.ln() / 2.0_f64.ln() } }
4. Determine Failure
#![allow(unused)] fn main() { const PHI_THRESHOLD: f64 = 8.0; // Configurable if phi() > PHI_THRESHOLD { mark_node_as_failed(); } }
Visualization
Example 1: Healthy Node
Heartbeats arrive regularly every ~1 second:
Time (s): 0 1 2 3 4 5 6 7 8
Heartbeat: β β β β β β β β β
Phi: 0 0 0 0 0 0 0 0 0
Status: Healthy (Ξ¦ = 0)
Example 2: Temporary Network Glitch
Heartbeats delayed but node recovers:
Time (s): 0 1 2 3 4 5 6 7 8
Heartbeat: β β β . . β β β β
Phi: 0 0 0 2 5 2 0 0 0
β²
Elevated but below threshold
Status: Suspect briefly, but recovers (no failure declared)
Example 3: Actual Failure
Heartbeats stop after node crashes:
Time (s): 0 1 2 3 4 5 6 7 8
Heartbeat: β β β X . . . . .
Phi: 0 0 0 2 5 8 11 14 17
β²
Exceeds threshold β FAILED
Status: Failed (Ξ¦ = 8+)
Adaptive Behavior
Phi Accrual adapts to network conditions automatically:
Stable Network
History: [1.0s, 1.0s, 1.0s, 1.0s, 1.0s]
Mean: 1.0s
Std Dev: 0.0s (very predictable)
Current delay: 1.5s
Phi: 8.0 β FAILURE (unusual for this stable network)
Variable Network
History: [0.8s, 1.2s, 0.9s, 1.4s, 1.0s]
Mean: 1.06s
Std Dev: 0.24s (more variable)
Current delay: 1.5s
Phi: 3.2 β HEALTHY (normal variation)
Key insight: Same 1.5s delay is interpreted differently based on historical patterns.
RpcNet Implementation
Configuration
#![allow(unused)] fn main() { use rpcnet::cluster::{ClusterConfig, HealthCheckConfig}; use std::time::Duration; let health_config = HealthCheckConfig::default() .with_interval(Duration::from_secs(1)) // Check every 1 second .with_phi_threshold(8.0) // Suspicion threshold .with_history_size(100) // Track last 100 intervals .with_min_std_deviation(Duration::from_millis(50)); // Min variation let cluster_config = ClusterConfig::default() .with_health_check(health_config); let cluster = ClusterMembership::new(cluster_config).await?; }
Monitoring Health
#![allow(unused)] fn main() { // Subscribe to health events let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeSuspect(node, phi) => { println!("Node {} suspect (Ξ¦ = {:.2})", node.id, phi); } ClusterEvent::NodeFailed(node) => { println!("Node {} failed (Ξ¦ exceeded threshold)", node.id); } ClusterEvent::NodeRecovered(node) => { println!("Node {} recovered (Ξ¦ back to normal)", node.id); } _ => {} } } }
Custom Phi Threshold
Different thresholds for different applications:
#![allow(unused)] fn main() { // Conservative (fewer false positives, slower detection) .with_phi_threshold(10.0) // 99.99% confidence // Aggressive (faster detection, more false positives) .with_phi_threshold(5.0) // 50% confidence // Recommended default .with_phi_threshold(8.0) // 97.7% confidence }
Choosing Phi Threshold
Threshold | Confidence | False Positive Rate | Detection Time | Use Case |
---|---|---|---|---|
3.0 | 12.5% | Very High | Very Fast | Testing only |
5.0 | 50% | High | Fast | Aggressive failover |
8.0 | 97.7% | Low | Moderate | Recommended |
10.0 | 99.99% | Very Low | Slower | Critical systems |
12.0 | 99.9999% | Extremely Low | Slow | High-latency networks |
Threshold Selection Guide
Low threshold (3-5) if:
- Fast failover is critical
- False positives are acceptable
- Network is very stable
Medium threshold (6-9) if:
- Balance between speed and accuracy
- Typical production environments
- Recommended for most use cases
High threshold (10+) if:
- False positives are very costly
- Network has high variance
- Graceful degradation preferred over fast failover
Integration with SWIM
Phi Accrual works alongside SWIM's failure detection:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SWIM Protocol β
β β
β 1. Gossip β Heartbeats to Phi Accrual β
β 2. Phi Accrual β Computes suspicion level β
β 3. Ξ¦ > threshold β Mark node as Suspect β
β 4. Indirect probes β Verify with other nodes β
β 5. Multiple confirmations β Mark node as Failed β
β 6. Gossip spreads failure β All nodes updated β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Process:
- Regular operation: Nodes exchange gossip messages (heartbeats)
- Phi calculation: Each heartbeat updates Phi Accrual history
- Suspicion: When Ξ¦ exceeds threshold, node marked Suspect
- Verification: SWIM performs indirect probes to confirm
- Failure declaration: Multiple nodes agree β Node marked Failed
- Recovery: If heartbeats resume, Ξ¦ drops and node marked Alive again
Performance Characteristics
Computational Overhead
#![allow(unused)] fn main() { // Phi calculation per node per check: // - Mean: O(1) with running average // - Std dev: O(1) with running variance // - Phi: O(1) math operations // Total overhead: ~500ns per node per health check }
For 100 nodes checked every 1 second: 0.05ms total CPU time (negligible)
Memory Overhead
#![allow(unused)] fn main() { struct NodeHealth { intervals: VecDeque<Duration>, // 100 entries Γ 16 bytes = 1.6 KB last_heartbeat: Instant, // 16 bytes running_mean: Duration, // 16 bytes running_variance: f64, // 8 bytes } // Total per node: ~1.7 KB }
For 100 nodes: ~170 KB memory (negligible)
Detection Time
Measured time from actual failure to detection:
Network Stability | Heartbeat Interval | Phi Threshold | Detection Time |
---|---|---|---|
Stable (Ο=10ms) | 1s | 8.0 | 2-3s |
Variable (Ο=200ms) | 1s | 8.0 | 4-6s |
Unstable (Ο=500ms) | 1s | 8.0 | 8-12s |
Tuning for faster detection: Reduce heartbeat interval (e.g., 500ms)
Comparison to Alternatives
vs Fixed Timeout
Fixed Timeout:
β Doesn't adapt to network conditions
β Binary alive/dead (no confidence)
β Simple implementation
Phi Accrual:
β Adapts automatically
β Continuous suspicion level
β Fewer false positives
β More complex
vs Heartbeat Count
Heartbeat Count (miss N in a row):
β Slow detection (N Γ interval)
β Doesn't account for network variance
β Simple logic
Phi Accrual:
β Faster detection
β Accounts for network patterns
β Adaptive threshold
vs Gossip Only
Gossip Only (no Phi):
β Hard threshold (suspect β failed)
β Doesn't adapt to network
β Simpler protocol
Gossip + Phi:
β Smooth suspicion curve
β Adapts to network conditions
β More accurate detection
Best Practices
1. Tune for Your Network
#![allow(unused)] fn main() { // Measure your network characteristics first async fn measure_network_latency() -> (Duration, Duration) { let mut latencies = Vec::new(); for _ in 0..100 { let start = Instant::now(); ping_peer().await.unwrap(); latencies.push(start.elapsed()); } let mean = latencies.iter().sum::<Duration>() / latencies.len(); let variance = latencies.iter() .map(|&d| (d.as_secs_f64() - mean.as_secs_f64()).powi(2)) .sum::<f64>() / latencies.len() as f64; let std_dev = Duration::from_secs_f64(variance.sqrt()); println!("Network latency: {:.2?} Β± {:.2?}", mean, std_dev); (mean, std_dev) } // Then configure accordingly let (mean, std_dev) = measure_network_latency().await; let health_config = HealthCheckConfig::default() .with_interval(mean * 2) // Check at 2Γ mean latency .with_phi_threshold(8.0) .with_min_std_deviation(std_dev); }
2. Monitor Phi Values
#![allow(unused)] fn main() { // Log phi values to understand patterns async fn monitor_phi_values(cluster: Arc<ClusterMembership>) { loop { tokio::time::sleep(Duration::from_secs(10)).await; for node in cluster.nodes().await { let phi = cluster.phi(node.id).await.unwrap_or(0.0); if phi > 5.0 { log::warn!("Node {} phi elevated: {:.2}", node.id, phi); } metrics::gauge!("cluster.node.phi", phi, "node" => node.id.to_string()); } } } }
3. Handle Suspicion State
#![allow(unused)] fn main() { // Don't immediately fail on suspicion - investigate first let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeSuspect(node, phi) => { log::warn!("Node {} suspect (Ξ¦ = {:.2}), investigating...", node.id, phi); // Trigger additional checks tokio::spawn(async move { if let Err(e) = verify_node_health(&node).await { log::error!("Node {} verification failed: {}", node.id, e); } }); } ClusterEvent::NodeFailed(node) => { log::error!("Node {} failed, removing from pool", node.id); remove_from_worker_pool(node.id).await; } _ => {} } } }
4. Adjust History Size
#![allow(unused)] fn main() { // Larger history = more stable, slower adaptation .with_history_size(200) // For very stable networks // Smaller history = faster adaptation to changes .with_history_size(50) // For dynamic networks // Default (recommended) .with_history_size(100) }
5. Set Minimum Standard Deviation
#![allow(unused)] fn main() { // Prevent division by zero and overly sensitive detection .with_min_std_deviation(Duration::from_millis(50)) // Higher min = less sensitive to small variations .with_min_std_deviation(Duration::from_millis(100)) }
Troubleshooting
False Positives (Node marked failed but is alive)
Symptoms:
- Nodes frequently marked failed and recovered
- Phi threshold exceeded during normal operation
Debug:
#![allow(unused)] fn main() { // Log phi values and intervals for node in cluster.nodes().await { let phi = cluster.phi(node.id).await.unwrap_or(0.0); let history = cluster.heartbeat_history(node.id).await; println!("Node {}: Ξ¦ = {:.2}, intervals = {:?}", node.id, phi, history); } }
Solutions:
- Increase phi threshold (8.0 β 10.0)
- Increase heartbeat interval to match network latency
- Increase min_std_deviation for variable networks
Slow Detection (Failures take too long to detect)
Symptoms:
- Nodes crash but stay marked alive for minutes
- Requests keep routing to failed nodes
Debug:
#![allow(unused)] fn main() { // Measure actual detection time let failure_time = Instant::now(); // ... node fails ... let detection_time = cluster.wait_for_failure(node_id).await; println!("Detection took: {:?}", detection_time.duration_since(failure_time)); }
Solutions:
- Decrease phi threshold (8.0 β 6.0)
- Decrease heartbeat interval (1s β 500ms)
- Decrease suspicion timeout
Memory Growth
Symptoms:
- Memory usage grows over time
- History buffers not bounded
Debug:
#![allow(unused)] fn main() { // Check history sizes for node in cluster.nodes().await { let history = cluster.heartbeat_history(node.id).await; println!("Node {}: {} intervals tracked", node.id, history.len()); } }
Solutions:
- Ensure history_size is set (default: 100)
- Verify old entries are removed
- Check for node ID leaks
Advanced Topics
Combining Multiple Detectors
Use Phi Accrual for heartbeats AND application-level health:
#![allow(unused)] fn main() { struct CompositeHealthCheck { phi_detector: PhiAccrualDetector, app_health: Arc<Mutex<HashMap<Uuid, bool>>>, } impl CompositeHealthCheck { async fn is_healthy(&self, node_id: Uuid) -> bool { // Both phi and application health must be good let phi = self.phi_detector.phi(node_id); let app_healthy = self.app_health.lock().await.get(&node_id).copied().unwrap_or(false); phi < PHI_THRESHOLD && app_healthy } } }
Weighted Phi Thresholds
Different thresholds for different node types:
#![allow(unused)] fn main() { fn get_phi_threshold(node: &Node) -> f64 { match node.tags.get("criticality") { Some("high") => 10.0, // Very conservative for critical nodes Some("low") => 6.0, // Aggressive for non-critical _ => 8.0, // Default } } }
Next Steps
References
- Phi Accrual Paper - Original algorithm
- Cassandra Failure Detection - Production implementation
- Akka Cluster Phi - Akka's usage
Failure Handling
Distributed systems must gracefully handle node failures, network partitions, and other failure scenarios. This chapter explains how RpcNet detects and recovers from failures in cluster deployments.
Types of Failures
1. Node Crashes
Scenario: Worker process terminates unexpectedly
Before: After:
[Director] [Director]
| |
βββββ΄ββββ ββββββ΄βββββ
A B C A C
X β Crashed
Detection:
- Gossip protocol detects missing heartbeats
- Phi Accrual marks node as failed (typically 4-8 seconds)
- Failure event propagated to all nodes
Recovery:
#![allow(unused)] fn main() { // Automatic handling via WorkerRegistry let mut events = registry.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeFailed(node) => { log::error!("Worker {} failed", node.id); // WorkerRegistry automatically removes from pool // Future requests route to remaining workers } _ => {} } } }
2. Network Partitions
Scenario: Network split divides cluster
Before partition: After partition:
Director Director |
/ \ / |
A B A | B
Cluster view splits into two independent groups
Detection:
- Nodes on each side detect "failures" of nodes on other side
- Partition detector identifies split-brain scenario
- Both sides continue operating independently
Handling:
#![allow(unused)] fn main() { // Monitor for partitions let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { if let ClusterEvent::PartitionDetected(minority, majority) = event { log::error!("Network partition detected!"); if minority.contains(&my_node_id) { // I'm in minority partition log::warn!("In minority partition, entering degraded mode"); enter_read_only_mode().await; } else { // I'm in majority partition log::info!("In majority partition, continuing normal operation"); } } } }
3. Slow Nodes (Degraded Performance)
Scenario: Node responding but very slowly
Normal response: 100ms
Degraded response: 5000ms (50x slower)
Detection:
- Phi Accrual increases suspicion level but may not mark as failed
- Request timeouts at application level
- Load balancer (Least Connections) naturally avoids slow nodes
Handling:
#![allow(unused)] fn main() { // Set request timeout let timeout = Duration::from_secs(5); match tokio::time::timeout(timeout, worker.call("compute", data)).await { Ok(Ok(result)) => { // Success } Ok(Err(e)) => { log::error!("Worker returned error: {}", e); retry_with_different_worker(data).await?; } Err(_) => { log::warn!("Worker timeout, trying another"); retry_with_different_worker(data).await?; } } }
4. Cascading Failures
Scenario: Failure of one node causes others to fail
Worker A crashes
β Remaining workers overloaded
β Worker B crashes from overload
β Worker C also crashes
β Complete system failure
Prevention:
#![allow(unused)] fn main() { // Load shedding to prevent cascading failures async fn select_worker_with_shedding( registry: &WorkerRegistry, max_load: f64, ) -> Result<Worker> { let worker = registry.select_worker(Some("role=worker")).await?; let load = worker.active_connections as f64 / worker.capacity as f64; if load > max_load { // Reject request to prevent overload return Err(anyhow::anyhow!("All workers at capacity, shedding load")); } Ok(worker) } }
Failure Detection Timeline
Node Crash Detection
Time: 0s 1s 2s 3s 4s 5s 6s 7s 8s
| | | | | | | | |
Gossip: β β β X . . . . .
Phi: 0 0 0 2 4 6 8 10 12
^
Threshold (8.0)
Node marked FAILED
Events: - - - - - - NodeFailed propagated
Registry:- - - - - - Worker removed from pool
Clients: - - - - - - Requests route elsewhere
Total time to full recovery: ~6-8 seconds with default settings
Partition Detection Timeline
Time: 0s 5s 10s 15s 20s
| | | | |
Partition occurs
|
Side A can't reach Side B
Side B can't reach Side A
|
Both sides mark other as "suspect"
|
Multiple nodes confirm partition
|
PartitionDetected event
|
Both sides operate independently
|
Partition heals
Gossip merges views
Detection time: 10-15 seconds
Recovery time: 5-10 seconds after partition heals
Retry Strategies
Automatic Retry
#![allow(unused)] fn main() { use tokio::time::{sleep, Duration}; async fn call_with_retry<T>( f: impl Fn() -> Pin<Box<dyn Future<Output = Result<T>>>>, max_retries: usize, ) -> Result<T> { let mut retries = 0; loop { match f().await { Ok(result) => return Ok(result), Err(e) if retries < max_retries => { retries += 1; log::warn!("Retry {}/{} after error: {}", retries, max_retries, e); // Exponential backoff let delay = Duration::from_millis(100 * 2_u64.pow(retries as u32)); sleep(delay).await; } Err(e) => return Err(e), } } } // Usage let result = call_with_retry( || Box::pin(worker.call("compute", data.clone())), 3 ).await?; }
Failover to Different Worker
#![allow(unused)] fn main() { async fn call_with_failover( registry: Arc<WorkerRegistry>, method: &str, data: Vec<u8>, max_attempts: usize, ) -> Result<Response> { let mut attempted_workers = HashSet::new(); for attempt in 0..max_attempts { // Select worker we haven't tried yet let worker = loop { let w = registry.select_worker(Some("role=worker")).await?; if !attempted_workers.contains(&w.id) { break w; } if attempted_workers.len() >= registry.worker_count().await { return Err(anyhow::anyhow!("All workers failed")); } }; attempted_workers.insert(worker.id); log::info!("Attempt {}: trying worker {}", attempt + 1, worker.label); match worker.call(method, data.clone()).await { Ok(response) => return Ok(response), Err(e) => { log::warn!("Worker {} failed: {}", worker.label, e); continue; } } } Err(anyhow::anyhow!("Failed after {} attempts", max_attempts)) } }
Circuit Breaker
Prevent cascading failures by temporarily stopping requests to failed nodes:
#![allow(unused)] fn main() { use std::sync::Arc; use tokio::sync::RwLock; use std::collections::HashMap; #[derive(Clone)] enum CircuitState { Closed, // Normal operation Open, // Failing, reject requests HalfOpen, // Testing recovery } struct CircuitBreaker { states: Arc<RwLock<HashMap<Uuid, CircuitState>>>, failure_threshold: usize, timeout: Duration, } impl CircuitBreaker { async fn call<T>( &self, worker_id: Uuid, f: impl Future<Output = Result<T>>, ) -> Result<T> { let state = self.states.read().await .get(&worker_id) .cloned() .unwrap_or(CircuitState::Closed); match state { CircuitState::Open => { // Circuit open, reject immediately Err(anyhow::anyhow!("Circuit breaker open for worker {}", worker_id)) } CircuitState::HalfOpen | CircuitState::Closed => { match f.await { Ok(result) => { // Success, close circuit self.states.write().await.insert(worker_id, CircuitState::Closed); Ok(result) } Err(e) => { // Failure, open circuit self.states.write().await.insert(worker_id, CircuitState::Open); // Schedule transition to half-open let states = self.states.clone(); let timeout = self.timeout; tokio::spawn(async move { sleep(timeout).await; states.write().await.insert(worker_id, CircuitState::HalfOpen); }); Err(e) } } } } } } }
Partition Handling
Split-Brain Prevention
Problem: During partition, both sides may accept writes, leading to conflicts.
Solution 1: Majority quorum
#![allow(unused)] fn main() { async fn handle_partition_with_quorum( cluster: Arc<ClusterMembership>, total_nodes: usize, ) -> Result<()> { let visible_nodes = cluster.visible_nodes().await.len(); let majority = total_nodes / 2 + 1; if visible_nodes < majority { log::error!("Lost majority quorum ({}/{}), entering read-only mode", visible_nodes, total_nodes); // Enter read-only mode set_read_only(true).await; // Wait for partition to heal loop { sleep(Duration::from_secs(5)).await; let current = cluster.visible_nodes().await.len(); if current >= majority { log::info!("Regained quorum, resuming writes"); set_read_only(false).await; break; } } } Ok(()) } }
Solution 2: Designated leader
#![allow(unused)] fn main() { // Only one node (leader) accepts writes async fn handle_partition_with_leader( cluster: Arc<ClusterMembership>, leader_id: Uuid, ) -> Result<()> { let my_id = cluster.local_node_id(); if my_id == leader_id { // I'm the leader, check if I can reach majority if !can_reach_majority(&cluster).await { log::error!("Leader lost majority, stepping down"); set_read_only(true).await; } } else { // I'm not the leader, check if I can reach leader if !can_reach_node(&cluster, leader_id).await { log::error!("Lost connection to leader, entering read-only mode"); set_read_only(true).await; } } Ok(()) } }
Partition Recovery
When partition heals, nodes must reconcile state:
#![allow(unused)] fn main() { async fn handle_partition_recovery( cluster: Arc<ClusterMembership>, ) -> Result<()> { let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { if let ClusterEvent::PartitionHealed = event { log::info!("Partition healed, reconciling state"); // Re-sync cluster state cluster.resync().await?; // Reconcile application state reconcile_application_state().await?; // Resume normal operation set_read_only(false).await; log::info!("Partition recovery complete"); } } Ok(()) } async fn reconcile_application_state() -> Result<()> { // Application-specific reconciliation logic // Examples: // - Compare vector clocks // - Merge CRDTs // - Apply conflict resolution rules // - Manual operator intervention Ok(()) } }
Client-Side Handling
Transparent Failover
Clients should automatically failover to healthy workers:
#![allow(unused)] fn main() { // Client implementation with automatic failover struct ResilientClient { registry: Arc<WorkerRegistry>, client: Arc<ClusterClient>, } impl ResilientClient { async fn call(&self, method: &str, data: Vec<u8>) -> Result<Response> { const MAX_ATTEMPTS: usize = 3; for attempt in 1..=MAX_ATTEMPTS { // Get healthy worker let worker = match self.registry.select_worker(Some("role=worker")).await { Ok(w) => w, Err(e) if attempt < MAX_ATTEMPTS => { log::warn!("No workers available, retrying..."); sleep(Duration::from_millis(100)).await; continue; } Err(e) => return Err(e), }; // Get pooled connection let conn = self.connection_pool.get_or_connect(worker.addr).await?; // Make request match conn.call(method, data.clone()).await { Ok(response) => return Ok(response), Err(e) => { log::warn!("Worker {} failed (attempt {}): {}", worker.label, attempt, e); // Mark worker as potentially failed self.registry.report_failure(worker.id).await; if attempt < MAX_ATTEMPTS { sleep(Duration::from_millis(100 * attempt as u64)).await; } } } } Err(anyhow::anyhow!("All attempts failed")) } } }
Request Hedging
Send duplicate requests to multiple workers, use first response:
#![allow(unused)] fn main() { async fn hedged_call( registry: Arc<WorkerRegistry>, method: &str, data: Vec<u8>, hedge_after: Duration, ) -> Result<Response> { let worker1 = registry.select_worker(Some("role=worker")).await?; // Start first request let req1 = worker1.call(method, data.clone()); tokio::select! { result = req1 => result, _ = sleep(hedge_after) => { // First request taking too long, send hedge request log::info!("Hedging request to second worker"); let worker2 = registry.select_worker(Some("role=worker")).await?; let req2 = worker2.call(method, data.clone()); // Return whichever completes first tokio::select! { result = req1 => result, result = req2 => result, } } } } }
Monitoring Failures
Track Failure Metrics
#![allow(unused)] fn main() { struct FailureMetrics { node_failures: Counter, partition_count: Counter, retry_count: Counter, circuit_breaks: Counter, } async fn monitor_failures(cluster: Arc<ClusterMembership>) { let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeFailed(node) => { metrics::increment_counter!("cluster.node_failures"); log::error!("Node {} failed", node.id); // Alert if critical worker if node.tags.get("critical") == Some(&"true".to_string()) { alert_ops_team(&format!("Critical node {} failed", node.id)); } } ClusterEvent::PartitionDetected(_) => { metrics::increment_counter!("cluster.partitions"); alert_ops_team("Network partition detected"); } _ => {} } } } }
Health Dashboard
#![allow(unused)] fn main() { async fn health_dashboard(registry: Arc<WorkerRegistry>) -> String { let workers = registry.workers().await; let total = workers.len(); let healthy = workers.iter().filter(|w| w.is_healthy()).count(); let degraded = workers.iter().filter(|w| w.is_degraded()).count(); let failed = total - healthy - degraded; format!( "Cluster Health:\n\ Total Workers: {}\n\ Healthy: {} ({}%)\n\ Degraded: {} ({}%)\n\ Failed: {} ({}%)\n", total, healthy, (healthy * 100 / total), degraded, (degraded * 100 / total), failed, (failed * 100 / total) ) } }
Best Practices
1. Design for Failure
#![allow(unused)] fn main() { // Assume failures will happen // β Good: Handle failures gracefully async fn process(data: Vec<u8>) -> Result<Response> { match call_worker(data.clone()).await { Ok(response) => Ok(response), Err(e) => { log::error!("Worker call failed: {}", e); fallback_processing(data).await } } } // β Bad: No failure handling async fn process(data: Vec<u8>) -> Result<Response> { call_worker(data).await // Will panic/error if worker fails } }
2. Set Appropriate Timeouts
#![allow(unused)] fn main() { // β Good: Timeout prevents hanging let result = tokio::time::timeout( Duration::from_secs(5), worker.call("compute", data) ).await??; // β Bad: No timeout, could hang forever let result = worker.call("compute", data).await?; }
3. Implement Idempotency
#![allow(unused)] fn main() { // β Good: Idempotent operations safe to retry #[rpc_trait] pub trait ComputeService { async fn process(&self, request_id: Uuid, data: Vec<u8>) -> Result<Response>; // ^^^^^^^^^^^^ request ID makes it idempotent } // Check if already processed if let Some(cached) = self.check_cache(request_id).await { return Ok(cached); } }
4. Monitor Everything
#![allow(unused)] fn main() { // Track all failure types metrics::increment_counter!("failures.node_crash"); metrics::increment_counter!("failures.timeout"); metrics::increment_counter!("failures.partition"); metrics::gauge!("cluster.healthy_nodes", healthy_count as f64); }
5. Test Failure Scenarios
#![allow(unused)] fn main() { #[tokio::test] async fn test_worker_failure() { // Start cluster let (director, workers) = setup_cluster().await; // Kill one worker workers[0].shutdown().await; // Verify requests still succeed let client = ResilientClient::new(director.registry()); let result = client.call("compute", vec![1, 2, 3]).await; assert!(result.is_ok()); } }
Next Steps
- Discovery - Understand how nodes discover failures
- Health Checking - Learn about Phi Accrual detection
- Production Guide - Deploy resilient clusters
References
- Fallacies of Distributed Computing - Common mistakes
- CAP Theorem - Consistency vs Availability trade-offs
- Circuit Breaker Pattern - Martin Fowler's article
Streaming Overview
RpcNet builds streaming on top of QUIC bidirectional streams, letting clients and servers exchange sequences of frames concurrently. This chapter explains the core terminology, how the helpers map to underlying QUIC behaviour, and which features to reach for when designing real-time APIs.
What βstreamingβ means in RpcNet
Each streaming RPC opens a fresh QUIC bidirectional stream:
- Frames are transported as length-prefixed
Vec<u8>
payloads. - Upload and download directions operate independently; the client can keep sending while the server responds, and vice versa.
- Either side sends a zero-length frame to signal end-of-stream.
RpcNet exposes three convenience helpers that mirror gRPC-style semantics:
Pattern | Helper on RpcClient | Typical use case |
---|---|---|
Bidirectional streaming | call_streaming | Chat, collaborative editing, turn-taking |
Server streaming | call_server_streaming | Live dashboards, subscriptions, long poll |
Client streaming | call_client_streaming | Batched uploads, telemetry aggregation |
The server registers a single handler API (register_streaming
) for all three
patterns; the difference lies in how the client constructs the request stream
and how many responses it expects.
Frame format
RpcNetβs streaming frames follow this layout:
<u32 payload_length in little endian><payload bytes>
payload_length == 0
means βno more framesβ.- Payloads contain arbitrary user-defined bytes; most examples serialize using
bincode
orserde_json
. - The library allocates buffers lazily and only keeps a single frame in memory per direction.
Bidirectional streaming in detail
Use RpcClient::call_streaming
when both sides continuously trade messages:
#![allow(unused)] fn main() { let responses = client.call_streaming("chat", outbound_frames).await?; }
The client passes an async Stream<Item = Vec<u8>>
and receives another stream
for responses. RpcNet multiplexes both directions on a single QUIC stream. The
server handler receives an async stream of request frames and must return an
async stream of Result<Vec<u8>, RpcError>
responses.
Choose this mode when:
- Each request needs a corresponding response (command/reply flow).
- Both parties produce data over time (whiteboard sessions, multiplayer games).
- You want to push updates without closing the upload direction.
Server streaming
RpcClient::call_server_streaming
wraps call_streaming
for the common case
where the client sends one request and the server streams many responses:
#![allow(unused)] fn main() { let stream = client.call_server_streaming("subscribe", request_bytes).await?; }
On the server, the handler still observes a request stream; most implementations read the first frame as the subscription and ignore additional frames. Use this pattern when the server drives the timeline (market data, notifications, progress updates).
Client streaming
RpcClient::call_client_streaming
handles the inverse: the client uploads many
frames and waits for a single aggregated response.
#![allow(unused)] fn main() { let response = client.call_client_streaming("upload", outbound_frames).await?; }
The server consumes every inbound frame before yielding exactly one response frame. This pattern pairs well with compression or summarisation (log shipping, bulk metrics, video chunk ingestion).
Keep-alive and flow control
RpcConfig::with_keep_alive_interval
controls heartbeat frames at the QUIC layer, keeping otherwise idle streams alive.- Flow control is managed by s2n-quic; RpcNet reads and writes asynchronously, so slow consumers only backpressure their own stream, not the entire connection.
- Because each RPC lives on a separate QUIC stream, you can run many streaming calls in parallel without head-of-line blocking.
Error handling semantics
-
Returning
Err(RpcError)
from a server response stream sends a generic error frame to the client and terminates the stream. Encode domain-specific errors inside your payloads when you need richer context. -
If the client drops its output stream early, the server handler eventually
sees
None
from the inbound iterator and can clean up resources. -
Timeouts follow the same
DEFAULT_TIMEOUT
as unary calls, so linger only as long as your app requires.
Choosing between streaming helpers
Ask yourself:
- Does the client expect multiple responses? β Use server streaming.
- Does the server expect multiple requests? β Use client streaming.
- Do both sides talk repeatedly? β Use bidirectional streaming.
When none of the above apply, stick with unary RPCsβthey offer simpler error handling and deterministic retry behaviour.
Whatβs next
- Jump to the Streaming Walkthrough for a complete telemetry example that covers every helper.
- Revisit Concepts if you need low-level API reminders or code snippets.
Armed with the terminology and behaviour described here, you can design streaming endpoints with confidence and implement them using the detailed guide in the next chapter.
Streaming Walkthrough
This end-to-end example builds a telemetry service that exercises every streaming mode RpcNet offers: bidirectional chat, server streaming updates, and client streaming uploads. Follow along to scaffold the project, implement the handlers, and drive the flows from a client binary.
Step 0: Prerequisites
- Rust 1.75+ (
rustup show
to confirm) cargo
on yourPATH
- macOS or Linux (TLS support is bundled via
s2n-quic
)
Step 1: Create the project layout
cargo new telemetry-streams --bin
cd telemetry-streams
mkdir -p certs src/bin
rm src/main.rs # we'll rely on explicit binaries instead of the default main
The example uses two binaries: src/bin/server.rs
and src/bin/client.rs
.
Step 2: Declare dependencies
Edit Cargo.toml
to pull in RpcNet and helper crates:
[package]
name = "telemetry-streams"
version = "0.1.0"
edition = "2021"
[dependencies]
rpcnet = "0.2"
serde = { version = "1", features = ["derive"] }
bincode = "1.3"
async-stream = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
rpcnet
provides the client/server runtime.async-stream
andfutures
help produce response streams on the server.serde
/bincode
handle payload serialization.- Tokio is required because RpcNet is async-first.
Step 3: Generate development certificates
RpcNet requires TLS material for QUIC. Create a self-signed pair for local experiments:
openssl req -x509 -newkey rsa:4096 \
-keyout certs/server-key.pem \
-out certs/server-cert.pem \
-days 365 -nodes \
-subj "/CN=localhost"
The client reuses the public certificate file to trust the server.
Step 4: Define shared data types
Expose a library module that both binaries can import. Create src/lib.rs
:
#![allow(unused)] fn main() { // src/lib.rs pub mod telemetry; }
Now add the telemetry definitions in src/telemetry.rs
:
#![allow(unused)] fn main() { // src/telemetry.rs use rpcnet::RpcError; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct MetricReading { pub sensor: String, pub value: f64, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LiveUpdate { pub sensor: String, pub rolling_avg: f64, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ChatMessage { pub from: String, pub body: String, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Ack { pub accepted: usize, } pub fn encode<T: Serialize>(value: &T) -> Result<Vec<u8>, RpcError> { Ok(bincode::serialize(value)?) } pub fn decode<T: for<'de> Deserialize<'de>>(bytes: &[u8]) -> Result<T, RpcError> { Ok(bincode::deserialize(bytes)?) } }
These helpers convert structures to and from the Vec<u8>
payloads that
RpcNet transports.
Step 5: Implement the streaming server
Create src/bin/server.rs
with three handlersβone per streaming pattern:
// src/bin/server.rs use async_stream::stream; use futures::StreamExt; use rpcnet::{RpcConfig, RpcServer}; use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading}; use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:9000") .with_key_path("certs/server-key.pem") .with_server_name("localhost"); let mut server = RpcServer::new(config); // Bidirectional chat: echo each message with a server tag. server .register_streaming("chat", |mut inbound| async move { stream! { while let Some(frame) = inbound.next().await { let msg: ChatMessage = telemetry::decode(&frame)?; let reply = ChatMessage { from: "server".into(), body: format!("ack: {}", msg.body), }; yield telemetry::encode(&reply); } } }) .await; // Server streaming: emit rolling averages for a requested sensor. server .register_streaming("subscribe_metrics", |mut inbound| async move { stream! { if let Some(frame) = inbound.next().await { let req: MetricReading = telemetry::decode(&frame)?; let mut window = vec![req.value]; for step in 1..=5 { sleep(Duration::from_millis(500)).await; window.push(req.value + step as f64); let avg = window.iter().copied().sum::<f64>() / window.len() as f64; let update = LiveUpdate { sensor: req.sensor.clone(), rolling_avg: avg }; yield telemetry::encode(&update); } } } }) .await; // Client streaming: collect readings and acknowledge how many we processed. server .register_streaming("upload_batch", |mut inbound| async move { stream! { let mut readings: Vec<MetricReading> = Vec::new(); while let Some(frame) = inbound.next().await { let reading: MetricReading = telemetry::decode(&frame)?; readings.push(reading); } let ack = Ack { accepted: readings.len() }; yield telemetry::encode(&ack); } }) .await; let quic_server = server.bind()?; println!("Telemetry server listening on 127.0.0.1:9000"); server.start(quic_server).await?; Ok(()) }
Key points:
register_streaming
receives a stream of request frames (Vec<u8>
) and must return a stream ofResult<Vec<u8>, RpcError>
responses.- The bidirectional handler echoes every inbound payload.
- The server-streaming handler reads a single subscription request and then pushes periodic updates without further client input.
- The client-streaming handler drains all incoming frames before returning one acknowledgement.
Step 6: Implement the client
Create src/bin/client.rs
to exercise each streaming helper:
// src/bin/client.rs use futures::{stream, StreamExt}; use rpcnet::{RpcClient, RpcConfig, RpcError}; use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:0") .with_server_name("localhost"); let client = RpcClient::connect("127.0.0.1:9000".parse()?, config).await?; chat_demo(&client).await?; server_stream_demo(&client).await?; client_stream_demo(&client).await?; Ok(()) } async fn chat_demo(client: &RpcClient) -> Result<(), RpcError> { println!("\n--- Bidirectional chat ---"); let messages = vec![ ChatMessage { from: "operator".into(), body: "ping".into() }, ChatMessage { from: "operator".into(), body: "status?".into() }, ]; let outbound_frames: Vec<Vec<u8>> = messages .into_iter() .map(|msg| telemetry::encode(&msg).expect("serialize chat message")) .collect(); let outbound = stream::iter(outbound_frames); let mut inbound = client.call_streaming("chat", outbound).await?; while let Some(frame) = inbound.next().await { let bytes = frame?; let reply: ChatMessage = telemetry::decode(&bytes)?; println!("reply: {}", reply.body); } Ok(()) } async fn server_stream_demo(client: &RpcClient) -> Result<(), RpcError> { println!("\n--- Server streaming ---"); let request = telemetry::encode(&MetricReading { sensor: "temp".into(), value: 21.0 })?; let mut updates = client .call_server_streaming("subscribe_metrics", request) .await?; while let Some(frame) = updates.next().await { let bytes = frame?; let update: LiveUpdate = telemetry::decode(&bytes)?; println!("rolling avg: {:.2}", update.rolling_avg); } Ok(()) } async fn client_stream_demo(client: &RpcClient) -> Result<(), RpcError> { println!("\n--- Client streaming ---"); let readings: Vec<Vec<u8>> = vec![ MetricReading { sensor: "temp".into(), value: 21.0 }, MetricReading { sensor: "temp".into(), value: 21.5 }, MetricReading { sensor: "temp".into(), value: 22.0 }, ] .into_iter() .map(|reading| telemetry::encode(&reading).expect("serialize reading")) .collect(); let outbound = stream::iter(readings); let ack_frame = client .call_client_streaming("upload_batch", outbound) .await?; let ack: Ack = telemetry::decode(&ack_frame)?; println!("server accepted {} readings", ack.accepted); Ok(()) }
The client demonstrates:
call_streaming
for true bidirectional messaging.call_server_streaming
when only the server produces a stream of frames.call_client_streaming
to upload many frames and receive one response.
Step 7: Run the scenario
Terminal 1 β start the server:
cargo run --bin server
Terminal 2 β launch the client:
cargo run --bin client
Expected output (trimmed for brevity):
--- Bidirectional chat ---
reply: ack: ping
reply: ack: status?
--- Server streaming ---
rolling avg: 21.00
rolling avg: 21.50
...
--- Client streaming ---
server accepted 3 readings
Where to go next
- Revisit the Concepts chapter for API reference material.
- Combine streaming RPCs with code-generated unary services from the Getting Started tutorial.
- Layer authentication, backpressure, or persistence around these handlers to match your production needs.
Performance Tuning
RpcNet achieves 172,000+ requests/second with proper configuration. This chapter provides concrete tips and techniques to maximize performance in production deployments.
Baseline Performance
Out-of-the-box performance with default settings:
Metric | Value | Notes |
---|---|---|
Throughput | 130K-150K RPS | Single director + 3 workers |
Latency (P50) | 0.5-0.8ms | With efficient connection handling |
Latency (P99) | 2-5ms | Under moderate load |
CPU (per node) | 40-60% | At peak throughput |
Memory | 50-100MB | Per worker node |
Target after tuning: 172K+ RPS, < 0.5ms P50 latency, < 35% CPU
Quick Wins
1. Optimize Connection Management
Impact: Significant throughput increase, reduced latency
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterClientConfig; // Use built-in connection optimization let config = ClusterClientConfig::default(); }
Why it works:
- Efficient connection reuse
- Reduces handshake overhead
- Minimizes connection setup time
2. Use Least Connections Load Balancing
Impact: 15-20% throughput increase under variable load
#![allow(unused)] fn main() { use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy}; // Before (Round Robin): uneven load distribution let registry = WorkerRegistry::new(cluster, LoadBalancingStrategy::RoundRobin); // After (Least Connections): optimal distribution let registry = WorkerRegistry::new(cluster, LoadBalancingStrategy::LeastConnections); }
Why it works:
- Prevents overloading individual workers
- Adapts to actual load in real-time
- Handles heterogeneous workers better
3. Tune Gossip Interval
Impact: 10-15% CPU reduction, minimal latency impact
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterConfig; // Before (default 1s): higher CPU let config = ClusterConfig::default() .with_gossip_interval(Duration::from_secs(1)); // After (2s for stable networks): lower CPU let config = ClusterConfig::default() .with_gossip_interval(Duration::from_secs(2)); }
Why it works:
- Gossip overhead scales with frequency
- Stable networks don't need aggressive gossip
- Failure detection still fast enough (4-8s)
4. Increase Worker Pool Size
Impact: Linear throughput scaling
#![allow(unused)] fn main() { // Before: 3 workers β 150K RPS // After: 5 workers β 250K+ RPS // Each worker adds ~50K RPS capacity }
Guidelines:
- Add workers until you hit network/director bottleneck
- Monitor director CPU - scale director if > 80%
- Ensure network bandwidth sufficient
Detailed Tuning
Connection Management Optimization
RpcNet handles connection management automatically, but you can optimize for your specific use case:
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterClientConfig; // Default configuration is optimized for most use cases let config = ClusterClientConfig::default(); }
QUIC Tuning
Stream Limits
#![allow(unused)] fn main() { use rpcnet::ServerConfig; let config = ServerConfig::builder() .with_max_concurrent_streams(100) // More streams = higher throughput .with_max_stream_bandwidth(10 * 1024 * 1024) // 10 MB/s per stream .build(); }
Guidelines:
- max_concurrent_streams: Set to expected concurrent requests + 20%
- max_stream_bandwidth: Set based on your largest message size
Congestion Control
#![allow(unused)] fn main() { // Aggressive (high-bandwidth networks) .with_congestion_control(CongestionControl::Cubic) // Conservative (variable networks) .with_congestion_control(CongestionControl::NewReno) // Recommended default .with_congestion_control(CongestionControl::Bbr) // Best overall }
TLS Optimization
Session Resumption
#![allow(unused)] fn main() { // Enable TLS session tickets for 0-RTT let config = ServerConfig::builder() .with_cert_and_key(cert, key)? .with_session_tickets_enabled(true) // β Enables 0-RTT .build(); }
Impact: First request after reconnect goes from 2-3 RTT to 0 RTT
Cipher Suite Selection
#![allow(unused)] fn main() { // Prefer fast ciphers (AES-GCM with hardware acceleration) .with_cipher_suites(&[ CipherSuite::TLS13_AES_128_GCM_SHA256, // Fast with AES-NI CipherSuite::TLS13_CHACHA20_POLY1305_SHA256, // Good for ARM ]) }
Message Serialization
Use Efficient Formats
#![allow(unused)] fn main() { // Fastest: bincode (binary) use bincode; let bytes = bincode::serialize(&data)?; // Fast: rmp-serde (MessagePack) use rmp_serde; let bytes = rmp_serde::to_vec(&data)?; // Slower: serde_json (human-readable, but slower) let bytes = serde_json::to_vec(&data)?; }
Benchmark (10KB struct):
Format | Serialize | Deserialize | Size |
---|---|---|---|
bincode | 12 ΞΌs | 18 ΞΌs | 10240 bytes |
MessagePack | 28 ΞΌs | 35 ΞΌs | 9800 bytes |
JSON | 85 ΞΌs | 120 ΞΌs | 15300 bytes |
Minimize Allocations
#![allow(unused)] fn main() { // β Bad: Multiple allocations fn build_request(id: u64, data: Vec<u8>) -> Request { Request { id: id.to_string(), // Allocation timestamp: SystemTime::now(), payload: format!("data-{}", String::from_utf8_lossy(&data)), // Multiple allocations } } // β Good: Reuse buffers fn build_request(id: u64, data: &[u8], buffer: &mut Vec<u8>) -> Request { buffer.clear(); buffer.extend_from_slice(b"data-"); buffer.extend_from_slice(data); Request { id, timestamp: SystemTime::now(), payload: buffer.clone(), // Single allocation } } }
Platform-Specific Optimizations
Linux
UDP/QUIC Tuning
# Increase network buffer sizes
sudo sysctl -w net.core.rmem_max=536870912
sudo sysctl -w net.core.wmem_max=536870912
sudo sysctl -w net.ipv4.tcp_rmem='4096 87380 536870912'
sudo sysctl -w net.ipv4.tcp_wmem='4096 87380 536870912'
# Increase UDP buffer (QUIC uses UDP)
sudo sysctl -w net.core.netdev_max_backlog=5000
# Increase connection tracking
sudo sysctl -w net.netfilter.nf_conntrack_max=1000000
# Make permanent: add to /etc/sysctl.conf
CPU Affinity
#![allow(unused)] fn main() { use core_affinity; // Pin worker threads to specific CPUs fn pin_to_core(core_id: usize) { let core_ids = core_affinity::get_core_ids().unwrap(); core_affinity::set_for_current(core_ids[core_id]); } // Usage in worker startup tokio::task::spawn_blocking(|| { pin_to_core(0); // Pin to CPU 0 // Worker processing logic }); }
macOS
Increase File Descriptors
# Check current limits
ulimit -n
# Increase (temporary)
ulimit -n 65536
# Make permanent: add to ~/.zshrc or ~/.bash_profile
echo "ulimit -n 65536" >> ~/.zshrc
Profiling and Monitoring
CPU Profiling
# Install perf (Linux)
sudo apt install linux-tools-common linux-tools-generic
# Profile RpcNet application
sudo perf record -F 99 -a -g -- cargo run --release --bin worker
sudo perf report
# Identify hot paths and optimize
Memory Profiling
# Use valgrind for memory analysis
cargo build --release
valgrind --tool=massif --massif-out-file=massif.out ./target/release/worker
# Visualize with massif-visualizer
ms_print massif.out
Tokio Console
# Add to Cargo.toml
[dependencies]
console-subscriber = "0.2"
#![allow(unused)] fn main() { // In main.rs console_subscriber::init(); // Run application and connect with tokio-console // cargo install tokio-console // tokio-console }
Benchmarking
Throughput Test
#![allow(unused)] fn main() { use std::time::Instant; async fn benchmark_throughput(client: Arc<ClusterClient>, duration_secs: u64) { let start = Instant::now(); let mut count = 0; while start.elapsed().as_secs() < duration_secs { match client.call_worker("compute", vec![], Some("role=worker")).await { Ok(_) => count += 1, Err(e) => eprintln!("Request failed: {}", e), } } let elapsed = start.elapsed().as_secs_f64(); let rps = count as f64 / elapsed; println!("Throughput: {:.0} requests/second", rps); println!("Total requests: {}", count); println!("Duration: {:.2}s", elapsed); } }
Latency Test
#![allow(unused)] fn main() { use hdrhistogram::Histogram; async fn benchmark_latency(client: Arc<ClusterClient>, num_requests: usize) { let mut histogram = Histogram::<u64>::new(3).unwrap(); for _ in 0..num_requests { let start = Instant::now(); let _ = client.call_worker("compute", vec![], Some("role=worker")).await; let latency_us = start.elapsed().as_micros() as u64; histogram.record(latency_us).unwrap(); } println!("Latency percentiles (ΞΌs):"); println!(" P50: {}", histogram.value_at_quantile(0.50)); println!(" P90: {}", histogram.value_at_quantile(0.90)); println!(" P99: {}", histogram.value_at_quantile(0.99)); println!(" P99.9: {}", histogram.value_at_quantile(0.999)); println!(" Max: {}", histogram.max()); } }
Load Test Script
#![allow(unused)] fn main() { // Concurrent load test async fn load_test( client: Arc<ClusterClient>, num_concurrent: usize, requests_per_task: usize, ) { let start = Instant::now(); let tasks: Vec<_> = (0..num_concurrent) .map(|_| { let client = client.clone(); tokio::spawn(async move { for _ in 0..requests_per_task { let _ = client.call_worker("compute", vec![], Some("role=worker")).await; } }) }) .collect(); for task in tasks { task.await.unwrap(); } let elapsed = start.elapsed().as_secs_f64(); let total_requests = num_concurrent * requests_per_task; let rps = total_requests as f64 / elapsed; println!("Load test results:"); println!(" Concurrency: {}", num_concurrent); println!(" Total requests: {}", total_requests); println!(" Duration: {:.2}s", elapsed); println!(" Throughput: {:.0} RPS", rps); } }
Performance Checklist
Before Production
- Use default connection management (already optimized)
- Use Least Connections load balancing
- Tune gossip interval for your network
- Configure QUIC stream limits
- Enable TLS session resumption
-
Profile with release build (
--release
) - Test under expected peak load
- Monitor CPU, memory, network utilization
- Set up latency tracking (P50, P99, P99.9)
- Configure OS-level network tuning
Monitoring in Production
#![allow(unused)] fn main() { // Essential metrics to track metrics::gauge!("rpc.throughput_rps", current_rps); metrics::gauge!("rpc.latency_p50_us", latency_p50); metrics::gauge!("rpc.latency_p99_us", latency_p99); metrics::gauge!("rpc.cpu_usage_pct", cpu_usage); metrics::gauge!("rpc.memory_mb", memory_mb); metrics::gauge!("pool.hit_rate", pool_hit_rate); metrics::gauge!("cluster.healthy_workers", healthy_count); }
Troubleshooting Performance Issues
High Latency
Symptoms: P99 latency > 10ms
Debug:
#![allow(unused)] fn main() { // Add timing to identify bottleneck let start = Instant::now(); let select_time = Instant::now(); let worker = registry.select_worker(Some("role=worker")).await?; println!("Worker selection: {:?}", select_time.elapsed()); let connect_time = Instant::now(); let conn = pool.get_or_connect(worker.addr).await?; println!("Connection: {:?}", connect_time.elapsed()); let call_time = Instant::now(); let result = conn.call("compute", data).await?; println!("RPC call: {:?}", call_time.elapsed()); println!("Total: {:?}", start.elapsed()); }
Common causes:
- Connection management issues (check network configuration)
- Slow workers (check worker CPU/memory)
- Network latency (move closer or add local workers)
Low Throughput
Symptoms: < 100K RPS with multiple workers
Debug:
#![allow(unused)] fn main() { // Check bottlenecks println!("Pool metrics: {:?}", pool.metrics()); println!("Worker count: {}", registry.worker_count().await); println!("Active connections: {}", pool.active_connections()); }
Common causes:
- Too few workers (add more)
- Network connectivity issues (check network configuration)
- Director CPU saturated (scale director)
- Network bandwidth limit (upgrade network)
High CPU Usage
Symptoms: > 80% CPU at low load
Debug:
# Profile with perf
sudo perf record -F 99 -a -g -- cargo run --release
sudo perf report
# Look for hot functions
Common causes:
- Too frequent gossip (increase interval)
- Excessive serialization (optimize message format)
- Inefficient connection handling (use latest RpcNet version)
- Debug build instead of release
Real-World Results
Case Study: Video Transcoding Cluster
Setup:
- 1 director
- 10 GPU workers
- 1000 concurrent clients
Before tuning: 45K RPS, 15ms P99 latency
After tuning: 180K RPS, 2ms P99 latency
Changes:
- Used optimized connection management
- Tuned gossip interval (1s β 2s)
- Used Least Connections strategy
- Optimized message serialization (JSON β bincode)
Next Steps
- Production Guide - Deploy optimized clusters
- Load Balancing - Strategy selection
References
- QUIC Performance - Protocol optimizations
- Linux Network Tuning - OS-level tuning
- Tokio Performance - Async runtime tips
Production Deployment
This guide covers best practices for deploying RpcNet clusters in production environments, including security, monitoring, high availability, and operational procedures.
Architecture Patterns
1. Basic Production Setup
Minimum viable production deployment:
Load Balancer (L4)
|
ββββββββββββββΌβββββββββββββ
β β β
ββββββΌββββ ββββββΌββββ ββββββΌββββ
βDirectorβ βDirectorβ βDirectorβ (3+ for HA)
β (HA) β β (HA) β β (HA) β
ββββββ¬ββββ ββββββ¬ββββ ββββββ¬ββββ
β β β
βββββββββ΄βββββββββββββ΄βββββββββββββ΄ββββββββ
β β
βββββΌβββββ ββββββββββ ββββββββββ ββββββββββΌβ
βWorker 1β βWorker 2β βWorker 3β βWorker N β
ββββββββββ ββββββββββ ββββββββββ βββββββββββ
Components:
- Load Balancer: Routes clients to healthy directors
- Directors (3+): Coordinator nodes in HA configuration
- Workers (N): Processing nodes, scale horizontally
2. Multi-Region Setup
For global deployments:
Region US-EAST Region EU-WEST
ββββββββββββββββββββββββββββ ββββββββββββββββββββββββββββ
β Director Cluster (3) β β Director Cluster (3) β
β Worker Pool (10+) β β Worker Pool (10+) β
ββββββββββββ¬ββββββββββββββββ βββββββββββββ¬βββββββββββββββ
β β
βββββββββββββ¬ββββββββββββββββββββ
β
Cross-region
Gossip Protocol
(optional coordination)
Benefits:
- Lower latency for regional clients
- Fault isolation (region failure doesn't affect others)
- Regulatory compliance (data locality)
3. Hybrid Edge Deployment
For edge computing scenarios:
Cloud (Central)
βββββββββββββββββββββββ
β Director Cluster β
β Worker Pool β
ββββββββββββ¬βββββββββββ
β
ββββββββββββΌβββββββββββ
β β β
ββββββΌββββ βββββΌβββββ βββββΌβββββ
β Edge 1 β β Edge 2 β β Edge 3 β
βWorkers β βWorkers β βWorkers β
ββββββββββ ββββββββββ ββββββββββ
Use cases:
- IoT workloads
- Low-latency requirements
- Bandwidth optimization
Security
TLS Configuration
Production Certificates
#![allow(unused)] fn main() { // β Bad: Self-signed certificates let cert = std::fs::read("self_signed.pem")?; // β Good: Proper CA-signed certificates let cert = std::fs::read("/etc/rpcnet/certs/server.crt")?; let key = std::fs::read("/etc/rpcnet/certs/server.key")?; let ca = std::fs::read("/etc/rpcnet/certs/ca.crt")?; let config = ServerConfig::builder() .with_cert_and_key(cert, key)? .with_ca_cert(ca)? // Verify clients .build(); }
Certificate Rotation
#![allow(unused)] fn main() { use tokio::time::{interval, Duration}; async fn rotate_certificates(server: Arc<Server>) { let mut check_interval = interval(Duration::from_secs(3600)); // Check hourly loop { check_interval.tick().await; // Check certificate expiry if certificate_expires_soon("/etc/rpcnet/certs/server.crt", 30).await? { log::warn!("Certificate expiring soon, rotating..."); // Load new certificate let new_cert = std::fs::read("/etc/rpcnet/certs/server.crt.new")?; let new_key = std::fs::read("/etc/rpcnet/certs/server.key.new")?; // Hot-reload without downtime server.reload_certificate(new_cert, new_key).await?; log::info!("Certificate rotated successfully"); } } } }
Authentication & Authorization
#![allow(unused)] fn main() { #[rpc_trait] pub trait SecureService { async fn process(&self, auth_token: String, data: Vec<u8>) -> Result<Response>; } #[rpc_impl] impl SecureService for Handler { async fn process(&self, auth_token: String, data: Vec<u8>) -> Result<Response> { // Verify token let claims = verify_jwt(&auth_token)?; // Check permissions if !claims.has_permission("compute:execute") { return Err(anyhow::anyhow!("Insufficient permissions")); } // Process request Ok(self.do_process(data).await?) } } }
Network Segmentation
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Public Network β
β (Clients, Load Balancer) β
ββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββ
β Firewall
ββββββββββββββββββββββΌβββββββββββββββββββββββββββββββββ
β Management Network β
β (Directors, Monitoring, Logging) β
ββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββ
β Firewall
ββββββββββββββββββββββΌβββββββββββββββββββββββββββββββββ
β Worker Network β
β (Workers, Internal Communication) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Firewall Rules:
# Public β Management: Only load balancer ports
iptables -A FORWARD -i public -o management -p tcp --dport 8080 -j ACCEPT
# Management β Workers: Full access
iptables -A FORWARD -i management -o workers -j ACCEPT
# Workers β Workers: Gossip protocol
iptables -A FORWARD -i workers -o workers -p udp --dport 7946 -j ACCEPT
Monitoring
Essential Metrics
#![allow(unused)] fn main() { use prometheus::{register_gauge, register_counter, register_histogram}; // Throughput let request_counter = register_counter!("rpc_requests_total", "Total RPC requests"); request_counter.inc(); // Latency let latency_histogram = register_histogram!( "rpc_latency_seconds", "RPC latency distribution", vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0] ); latency_histogram.observe(duration.as_secs_f64()); // Health let healthy_workers = register_gauge!("cluster_healthy_workers", "Number of healthy workers"); healthy_workers.set(registry.healthy_count().await as f64); // Errors let error_counter = register_counter!("rpc_errors_total", "Total RPC errors", &["type"]); error_counter.with_label_values(&["timeout"]).inc(); }
Prometheus Integration
#![allow(unused)] fn main() { use prometheus::{Encoder, TextEncoder}; use warp::Filter; async fn start_metrics_server() { let metrics_route = warp::path!("metrics").map(|| { let encoder = TextEncoder::new(); let metric_families = prometheus::gather(); let mut buffer = vec![]; encoder.encode(&metric_families, &mut buffer).unwrap(); warp::reply::with_header( buffer, "Content-Type", "text/plain; charset=utf-8", ) }); warp::serve(metrics_route) .run(([0, 0, 0, 0], 9090)) .await; } }
Prometheus config (prometheus.yml
):
scrape_configs:
- job_name: 'rpcnet_directors'
static_configs:
- targets: ['director-1:9090', 'director-2:9090', 'director-3:9090']
- job_name: 'rpcnet_workers'
static_configs:
- targets: ['worker-1:9090', 'worker-2:9090', 'worker-3:9090']
Grafana Dashboards
Key panels:
- Throughput:
rate(rpc_requests_total[1m])
- Latency P99:
histogram_quantile(0.99, rpc_latency_seconds)
- Error Rate:
rate(rpc_errors_total[1m])
- Worker Health:
cluster_healthy_workers
Alerting
# alerts.yml
groups:
- name: rpcnet
interval: 30s
rules:
- alert: HighErrorRate
expr: rate(rpc_errors_total[5m]) > 0.05
for: 2m
annotations:
summary: "High RPC error rate detected"
- alert: LowWorkerCount
expr: cluster_healthy_workers < 3
for: 1m
annotations:
summary: "Less than 3 healthy workers available"
- alert: HighLatency
expr: histogram_quantile(0.99, rpc_latency_seconds) > 0.1
for: 5m
annotations:
summary: "P99 latency above 100ms"
Logging
Structured Logging
#![allow(unused)] fn main() { use tracing::{info, warn, error, instrument}; #[instrument(skip(data))] async fn process_request(request_id: Uuid, worker_id: Uuid, data: Vec<u8>) -> Result<Response> { info!( request_id = %request_id, worker_id = %worker_id, data_size = data.len(), "Processing request" ); match worker.call("compute", data).await { Ok(response) => { info!( request_id = %request_id, worker_id = %worker_id, response_size = response.len(), "Request completed" ); Ok(response) } Err(e) => { error!( request_id = %request_id, worker_id = %worker_id, error = %e, "Request failed" ); Err(e) } } } }
Log Aggregation
Fluentd config (fluent.conf
):
<source>
@type forward
port 24224
</source>
<match rpcnet.**>
@type elasticsearch
host elasticsearch.example.com
port 9200
index_name rpcnet
type_name logs
</match>
High Availability
Director HA Setup
#![allow(unused)] fn main() { // Each director is identical, configured via environment let director_id = Uuid::new_v4(); let cluster_config = ClusterConfig::default() .with_bind_addr(env::var("BIND_ADDR")?.parse()?) .with_seeds(parse_seeds(&env::var("SEED_NODES")?)?); let cluster = server.enable_cluster(cluster_config).await?; // Tag as director cluster.set_tag("role", "director"); cluster.set_tag("id", &director_id.to_string()); // All directors operate identically, clients can use any one }
Graceful Shutdown
#![allow(unused)] fn main() { use tokio::signal; async fn run_server(mut server: Server) -> Result<()> { // Spawn server task let server_handle = tokio::spawn(async move { server.run().await }); // Wait for shutdown signal signal::ctrl_c().await?; log::info!("Shutdown signal received, gracefully shutting down..."); // 1. Stop accepting new connections server.stop_accepting().await; // 2. Wait for in-flight requests (with timeout) tokio::time::timeout( Duration::from_secs(30), server.wait_for_in_flight() ).await?; // 3. Leave cluster gracefully cluster.leave().await?; // 4. Close connections server.shutdown().await?; log::info!("Shutdown complete"); Ok(()) } }
Health Checks
#![allow(unused)] fn main() { #[rpc_trait] pub trait HealthService { async fn health(&self) -> Result<HealthStatus>; async fn ready(&self) -> Result<ReadyStatus>; } #[derive(Serialize, Deserialize)] pub struct HealthStatus { pub healthy: bool, pub version: String, pub uptime_secs: u64, } #[derive(Serialize, Deserialize)] pub struct ReadyStatus { pub ready: bool, pub workers_available: usize, pub cluster_size: usize, } #[rpc_impl] impl HealthService for Handler { async fn health(&self) -> Result<HealthStatus> { Ok(HealthStatus { healthy: true, version: env!("CARGO_PKG_VERSION").to_string(), uptime_secs: self.start_time.elapsed().as_secs(), }) } async fn ready(&self) -> Result<ReadyStatus> { let workers = self.registry.worker_count().await; let cluster_size = self.cluster.node_count().await; Ok(ReadyStatus { ready: workers > 0, workers_available: workers, cluster_size, }) } } }
Kubernetes probes:
livenessProbe:
exec:
command:
- /usr/local/bin/health-check
- --endpoint=health
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
exec:
command:
- /usr/local/bin/health-check
- --endpoint=ready
initialDelaySeconds: 5
periodSeconds: 5
Deployment
Docker
Dockerfile:
FROM rust:1.75 as builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/director /usr/local/bin/
COPY --from=builder /app/target/release/worker /usr/local/bin/
# Expose ports
EXPOSE 8080 7946/udp
CMD ["director"]
Docker Compose (docker-compose.yml
):
version: '3.8'
services:
director-1:
image: rpcnet:latest
command: director
environment:
- DIRECTOR_ADDR=0.0.0.0:8080
- RUST_LOG=info
ports:
- "8080:8080"
- "7946:7946/udp"
worker-1:
image: rpcnet:latest
command: worker
environment:
- WORKER_LABEL=worker-1
- WORKER_ADDR=0.0.0.0:8081
- DIRECTOR_ADDR=director-1:8080
- RUST_LOG=info
depends_on:
- director-1
Kubernetes
Deployment (director-deployment.yaml
):
apiVersion: apps/v1
kind: Deployment
metadata:
name: rpcnet-director
spec:
replicas: 3
selector:
matchLabels:
app: rpcnet-director
template:
metadata:
labels:
app: rpcnet-director
spec:
containers:
- name: director
image: rpcnet:latest
command: ["director"]
env:
- name: DIRECTOR_ADDR
value: "0.0.0.0:8080"
- name: RUST_LOG
value: "info"
ports:
- containerPort: 8080
name: rpc
- containerPort: 7946
name: gossip
protocol: UDP
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "512Mi"
cpu: "1000m"
Service (director-service.yaml
):
apiVersion: v1
kind: Service
metadata:
name: rpcnet-director
spec:
type: LoadBalancer
selector:
app: rpcnet-director
ports:
- name: rpc
port: 8080
targetPort: 8080
- name: gossip
port: 7946
targetPort: 7946
protocol: UDP
HorizontalPodAutoscaler:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: rpcnet-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: rpcnet-worker
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Configuration Management
Environment-Based Config
#![allow(unused)] fn main() { use config::{Config, Environment, File}; #[derive(Debug, Deserialize)] struct Settings { server: ServerSettings, cluster: ClusterSettings, monitoring: MonitoringSettings, } #[derive(Debug, Deserialize)] struct ServerSettings { bind_addr: String, cert_path: String, key_path: String, } fn load_config() -> Result<Settings> { let settings = Config::builder() // Default config .add_source(File::with_name("config/default")) // Environment-specific config (optional) .add_source(File::with_name(&format!("config/{}", env!("ENV"))).required(false)) // Environment variables (override) .add_source(Environment::with_prefix("RPCNET")) .build()?; settings.try_deserialize() } }
Secret Management
#![allow(unused)] fn main() { use aws_sdk_secretsmanager::Client as SecretsClient; async fn load_tls_certs_from_secrets() -> Result<(Vec<u8>, Vec<u8>)> { let config = aws_config::load_from_env().await; let client = SecretsClient::new(&config); // Load certificate let cert_secret = client .get_secret_value() .secret_id("rpcnet/production/tls_cert") .send() .await?; let cert = cert_secret.secret_binary().unwrap().as_ref().to_vec(); // Load key let key_secret = client .get_secret_value() .secret_id("rpcnet/production/tls_key") .send() .await?; let key = key_secret.secret_binary().unwrap().as_ref().to_vec(); Ok((cert, key)) } }
Operational Procedures
Rolling Updates
#!/bin/bash
# Rolling update script for workers
WORKERS=("worker-1" "worker-2" "worker-3" "worker-4")
for worker in "${WORKERS[@]}"; do
echo "Updating $worker..."
# Gracefully shutdown worker
kubectl exec $worker -- kill -SIGTERM 1
# Wait for worker to leave cluster
sleep 10
# Update image
kubectl set image deployment/rpcnet-worker worker=rpcnet:new-version
# Wait for new pod to be ready
kubectl wait --for=condition=ready pod -l app=$worker --timeout=60s
# Verify worker joined cluster
kubectl exec director-1 -- check-worker-registered $worker
echo "$worker updated successfully"
done
Backup and Restore
#![allow(unused)] fn main() { // Backup cluster state (metadata only, not data) async fn backup_cluster_state(cluster: Arc<ClusterMembership>) -> Result<()> { let state = ClusterState { nodes: cluster.nodes().await, timestamp: SystemTime::now(), }; let backup = serde_json::to_vec(&state)?; std::fs::write("/backup/cluster_state.json", backup)?; Ok(()) } // Restore from backup (for disaster recovery) async fn restore_cluster_state(path: &str) -> Result<ClusterState> { let backup = std::fs::read(path)?; let state: ClusterState = serde_json::from_slice(&backup)?; Ok(state) } }
Runbooks
Worker Node Failure:
- Verify failure:
kubectl get pods | grep worker
- Check logs:
kubectl logs <worker-pod>
- If recoverable:
kubectl delete pod <worker-pod>
(auto-restarts) - If not: Investigate root cause, fix, redeploy
- Verify cluster health:
kubectl exec director-1 -- cluster-health
High Latency:
- Check Grafana: Identify which nodes have high latency
- SSH to affected nodes:
ssh worker-5
- Check CPU/memory:
top
,free -h
- Check network:
netstat -s
,iftop
- Review logs:
journalctl -u rpcnet-worker -n 1000
- If needed: Scale up workers or restart affected nodes
Cost Optimization
Resource Sizing
#![allow(unused)] fn main() { // Right-size based on actual usage async fn recommend_sizing(metrics: &Metrics) -> Recommendation { let avg_cpu = metrics.avg_cpu_usage(); let avg_memory = metrics.avg_memory_usage(); let p99_cpu = metrics.p99_cpu_usage(); if avg_cpu < 30.0 && p99_cpu < 60.0 { Recommendation::DownsizeWorkers } else if p99_cpu > 80.0 { Recommendation::UpsizeWorkers } else { Recommendation::CurrentSizingOptimal } } }
Auto-Scaling
# Scale workers based on request rate
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: rpcnet-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: rpcnet-worker
minReplicas: 2
maxReplicas: 20
metrics:
- type: Pods
pods:
metric:
name: rpc_requests_per_second
target:
type: AverageValue
averageValue: "5000" # Scale when > 5K RPS per worker
Checklist
Pre-Deployment
- TLS certificates from trusted CA
- Secrets stored in secret manager (not env vars)
- Monitoring and alerting configured
- Log aggregation set up
- Health checks implemented
- Graceful shutdown handling
- Resource limits configured
- Auto-scaling rules defined
- Backup procedures tested
- Runbooks documented
Post-Deployment
- Verify all nodes healthy
- Check metrics dashboards
- Test failover scenarios
- Validate performance (latency, throughput)
- Review logs for errors
- Test rolling updates
- Verify backups working
- Update documentation
Next Steps
- Performance Tuning - Optimize for production load
- Failure Handling - Handle production incidents
- Migration Guide - Migrate existing systems
References
- Kubernetes Best Practices - K8s configuration
- Prometheus Monitoring - Metrics best practices
- AWS Well-Architected - Cloud architecture patterns
Migration Guide
This guide helps you migrate from manual worker management patterns to RpcNet's built-in cluster features, reducing code complexity and improving reliability.
Why Migrate?
Before: Manual Worker Management
Typical manual pattern requires ~200 lines of boilerplate:
#![allow(unused)] fn main() { // Custom worker tracking struct WorkerPool { workers: Arc<Mutex<HashMap<Uuid, WorkerInfo>>>, next_idx: Arc<Mutex<usize>>, } struct WorkerInfo { id: Uuid, addr: SocketAddr, label: String, last_ping: Instant, } impl WorkerPool { // Manual registration async fn register_worker(&self, info: WorkerInfo) -> Uuid { let id = Uuid::new_v4(); self.workers.lock().await.insert(id, info); id } // Manual round-robin selection async fn get_next_worker(&self) -> Option<WorkerInfo> { let workers = self.workers.lock().await; if workers.is_empty() { return None; } let mut idx = self.next_idx.lock().await; let worker_list: Vec<_> = workers.values().collect(); let worker = worker_list[*idx % worker_list.len()].clone(); *idx += 1; Some(worker) } // Manual health checking async fn check_health(&self) { let mut workers = self.workers.lock().await; workers.retain(|_, worker| { worker.last_ping.elapsed() < Duration::from_secs(30) }); } } }
Problems:
- β No automatic discovery
- β Basic round-robin only
- β Simple timeout-based health checks
- β Manual connection management
- β No partition detection
- β ~200+ lines of error-prone code
After: Built-in Cluster Features
With RpcNet's cluster - only ~50 lines:
#![allow(unused)] fn main() { use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy, ClusterClient}; // Automatic discovery + load balancing + health checking let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; let client = Arc::new(ClusterClient::new(registry, config)); // That's it! Everything else is automatic: let result = client.call_worker("compute", data, Some("role=worker")).await?; }
Benefits:
- β Automatic discovery via gossip
- β Multiple load balancing strategies
- β Phi Accrual failure detection
- β Efficient connection management
- β Partition detection
- β 75% code reduction
Migration Steps
Step 1: Add Cluster Feature
Update Cargo.toml
:
[dependencies]
# Before
rpcnet = "0.2"
# After
rpcnet = { version = "0.2", features = ["cluster"] }
Step 2: Enable Cluster on Server
Replace manual worker registration with cluster:
#![allow(unused)] fn main() { // Before: Manual RPC endpoint for registration #[rpc_trait] pub trait DirectorService { async fn register_worker(&self, info: WorkerInfo) -> Result<Uuid>; } // After: Enable cluster on server let cluster_config = ClusterConfig::default() .with_bind_addr(bind_addr.parse()?); let cluster = server.enable_cluster(cluster_config).await?; // Tag for discovery cluster.set_tag("role", "director"); }
Step 3: Replace WorkerPool with WorkerRegistry
#![allow(unused)] fn main() { // Before: Custom WorkerPool let worker_pool = Arc::new(WorkerPool::new()); // Spawn health checker tokio::spawn({ let pool = worker_pool.clone(); async move { loop { pool.check_health().await; tokio::time::sleep(Duration::from_secs(10)).await; } } }); // After: Built-in WorkerRegistry let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; // Automatic health checking included! }
Step 4: Update Worker Startup
#![allow(unused)] fn main() { // Before: Worker calls register RPC let director_client = DirectorClient::connect(&director_addr, config).await?; let worker_id = director_client.register_worker(WorkerInfo { label: worker_label, addr: worker_addr, }).await?; // After: Worker joins cluster let cluster_config = ClusterConfig::default() .with_bind_addr(worker_addr.parse()?); let cluster = server.enable_cluster(cluster_config).await?; cluster.join(vec![director_addr.parse()?]).await?; // Tag for discovery cluster.set_tag("role", "worker"); cluster.set_tag("label", &worker_label); }
Step 5: Replace Manual Selection with ClusterClient
#![allow(unused)] fn main() { // Before: Manual worker selection + connection let worker = worker_pool.get_next_worker().await .ok_or_else(|| anyhow::anyhow!("No workers available"))?; let conn = Connection::connect(&worker.addr, client_config).await?; let result = conn.call("compute", data).await?; // After: Automatic selection + pooled connection let result = cluster_client.call_worker("compute", data, Some("role=worker")).await?; }
Step 6: Remove Manual Health Checks
#![allow(unused)] fn main() { // Before: Periodic ping to check health tokio::spawn(async move { loop { for worker in workers.iter() { match ping_worker(&worker.addr).await { Ok(_) => worker.last_ping = Instant::now(), Err(_) => remove_worker(worker.id).await, } } tokio::time::sleep(Duration::from_secs(10)).await; } }); // After: Nothing! Phi Accrual + gossip handles it automatically // Just subscribe to events if you want notifications: let mut events = cluster.subscribe(); tokio::spawn(async move { while let Some(event) = events.recv().await { match event { ClusterEvent::NodeFailed(node) => { log::error!("Worker {} failed", node.id); } _ => {} } } }); }
Migration Examples
Example 1: Simple Director-Worker
Before (Manual)
// director.rs - ~150 lines struct Director { workers: Arc<Mutex<HashMap<Uuid, WorkerInfo>>>, next_idx: Arc<Mutex<usize>>, } #[rpc_impl] impl DirectorService for Director { async fn register_worker(&self, info: WorkerInfo) -> Result<Uuid> { let id = Uuid::new_v4(); self.workers.lock().await.insert(id, info); Ok(id) } async fn get_worker(&self) -> Result<WorkerInfo> { let workers = self.workers.lock().await; if workers.is_empty() { return Err(anyhow::anyhow!("No workers")); } let mut idx = self.next_idx.lock().await; let worker_list: Vec<_> = workers.values().collect(); let worker = worker_list[*idx % worker_list.len()].clone(); *idx += 1; Ok(worker) } } // worker.rs - ~50 lines async fn main() -> Result<()> { let mut server = Server::new(config); server.register_service(Arc::new(WorkerHandler)); server.bind(&worker_addr).await?; // Register with director let director_client = DirectorClient::connect(&director_addr, config).await?; director_client.register_worker(WorkerInfo { label: worker_label, addr: worker_addr, }).await?; server.run().await?; Ok(()) }
Total: ~200 lines
After (Cluster)
// director.rs - ~50 lines async fn main() -> Result<()> { let mut server = Server::new(config); // Enable cluster let cluster = server.enable_cluster(cluster_config).await?; cluster.set_tag("role", "director"); // Create registry let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); registry.start().await; server.bind(&director_addr).await?; server.run().await?; Ok(()) } // worker.rs - ~30 lines async fn main() -> Result<()> { let mut server = Server::new(config); server.register_service(Arc::new(WorkerHandler)); server.bind(&worker_addr).await?; // Join cluster let cluster = server.enable_cluster(cluster_config).await?; cluster.join(vec![director_addr.parse()?]).await?; cluster.set_tag("role", "worker"); cluster.set_tag("label", &worker_label); server.run().await?; Ok(()) }
Total: ~80 lines (60% reduction)
Example 2: Connection Swap Pattern
The old connection_swap
example has been replaced by the cluster
example which uses built-in features.
Migration Path
- Remove custom WorkerPool β Use
WorkerRegistry
- Remove manual registration RPC β Use gossip discovery
- Remove health check pings β Use Phi Accrual
- Keep application logic unchanged β RPC interfaces stay the same
See: examples/cluster/
for complete working example
Feature Comparison
Feature | Manual Pattern | Built-in Cluster |
---|---|---|
Discovery | Manual RPC registration | Automatic via gossip |
Load Balancing | Basic round-robin | Round Robin, Random, Least Connections |
Health Checking | Timeout-based ping | Phi Accrual algorithm |
Failure Detection | Simple timeout | Indirect probes + Phi |
Connection Management | Manual implementation | Built-in optimization |
Partition Detection | Not available | Automatic |
Code Complexity | ~200 lines | ~50 lines |
Maintenance | High (custom code) | Low (battle-tested) |
Common Migration Issues
Issue 1: Port Conflicts
Problem: Gossip protocol uses UDP, might conflict with existing services.
Solution: Configure gossip port explicitly
#![allow(unused)] fn main() { let cluster_config = ClusterConfig::default() .with_bind_addr("0.0.0.0:7946".parse()?) // Gossip on different port .with_gossip_port(7947); // Custom gossip port }
Issue 2: Firewall Rules
Problem: Gossip UDP traffic blocked by firewall.
Solution: Allow UDP traffic between cluster nodes
# Allow gossip protocol
iptables -A INPUT -p udp --dport 7946 -j ACCEPT
iptables -A OUTPUT -p udp --sport 7946 -j ACCEPT
Issue 3: Existing Health Check Logic
Problem: Have custom health check logic that needs to be preserved.
Solution: Combine with cluster events
#![allow(unused)] fn main() { // Keep custom health checks async fn custom_health_check(worker: &Worker) -> bool { // Your custom logic worker.cpu_usage < 80.0 && worker.memory_available > 1_000_000 } // Use alongside cluster events let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { if let ClusterEvent::NodeFailed(node) = event { // Cluster detected failure handle_failure(node).await; } } // Periodic custom checks tokio::spawn(async move { loop { for worker in registry.workers().await { if !custom_health_check(&worker).await { log::warn!("Custom health check failed for {}", worker.label); } } tokio::time::sleep(Duration::from_secs(30)).await; } }); }
Issue 4: Different Node Roles
Problem: Have multiple types of nodes (coordinator, worker, storage, etc.).
Solution: Use tags to differentiate
#![allow(unused)] fn main() { // Coordinator cluster.set_tag("role", "coordinator"); // GPU worker cluster.set_tag("role", "worker"); cluster.set_tag("gpu", "true"); // CPU worker cluster.set_tag("role", "worker"); cluster.set_tag("cpu_only", "true"); // Select by role let gpu_worker = registry.select_worker(Some("gpu=true")).await?; let any_worker = registry.select_worker(Some("role=worker")).await?; }
Testing After Migration
Unit Tests
#![allow(unused)] fn main() { #[tokio::test] async fn test_worker_discovery() { // Start director let director = start_test_director().await; // Start worker let worker = start_test_worker().await; worker.join(vec![director.addr()]).await.unwrap(); // Wait for discovery tokio::time::sleep(Duration::from_secs(2)).await; // Verify worker discovered let workers = director.registry().workers().await; assert_eq!(workers.len(), 1); assert_eq!(workers[0].tags.get("role"), Some(&"worker".to_string())); } #[tokio::test] async fn test_load_balancing() { let director = start_test_director().await; // Start 3 workers let worker1 = start_test_worker("worker-1").await; let worker2 = start_test_worker("worker-2").await; let worker3 = start_test_worker("worker-3").await; // Make 100 requests let mut worker_counts = HashMap::new(); for _ in 0..100 { let result = director.call_worker("compute", vec![]).await.unwrap(); *worker_counts.entry(result.worker_label).or_insert(0) += 1; } // Verify distribution (should be roughly equal) assert!(worker_counts.get("worker-1").unwrap() > &20); assert!(worker_counts.get("worker-2").unwrap() > &20); assert!(worker_counts.get("worker-3").unwrap() > &20); } }
Integration Tests
# Test full cluster
cargo test --features cluster --test integration_tests
# Test failure scenarios
cargo test --features cluster --test failure_tests
# Test with actual network (examples)
cd examples/cluster
cargo run --bin director &
cargo run --bin worker &
cargo run --bin client
Rollback Plan
If migration causes issues, you can rollback:
Option 1: Feature Flag
#![allow(unused)] fn main() { #[cfg(feature = "use-cluster")] use rpcnet::cluster::{WorkerRegistry, ClusterClient}; #[cfg(not(feature = "use-cluster"))] use crate::manual_pool::WorkerPool; // Toggle between old and new with feature flag }
Option 2: Gradual Migration
#![allow(unused)] fn main() { // Run both systems in parallel temporarily let manual_pool = Arc::new(WorkerPool::new()); // Old system let cluster_registry = Arc::new(WorkerRegistry::new(cluster, strategy)); // New system // Route percentage of traffic to new system if rand::random::<f64>() < 0.10 { // 10% to new system cluster_registry.select_worker(filter).await } else { manual_pool.get_next_worker().await // 90% to old system } // Gradually increase percentage over time }
Checklist
Pre-Migration
- Review current worker management code
- Identify custom health check logic to preserve
- Plan firewall rule changes for gossip
- Write tests for current behavior
- Create rollback plan
During Migration
- Add cluster feature to Cargo.toml
- Enable cluster on servers
- Replace WorkerPool with WorkerRegistry
- Update worker startup (join instead of register)
- Remove manual health checks
- Test in staging environment
Post-Migration
- Verify worker discovery working
- Check load balancing distribution
- Monitor failure detection
- Validate performance metrics
- Remove old worker pool code
- Update documentation
Performance Impact
Before migration:
- Manual round-robin: ~100K RPS
- Timeout-based health: 30s detection time
- Manual connection handling: 20-50ms latency
After migration:
- Least Connections: 172K+ RPS (70% increase)
- Phi Accrual: 6-8s detection time (better accuracy)
- Built-in connection management: <1ms latency (98% reduction)
Next Steps
- Cluster Tutorial - Build cluster from scratch
- Production Guide - Deploy migrated cluster
- Performance Tuning - Optimize new setup
References
- Cluster Example - Complete working example
- SWIM Paper - Gossip protocol details
- Phi Accrual Paper - Failure detection algorithm
API Reference
Quick reference for RpcNet's most commonly used APIs. For complete documentation, see the API docs.
Core Types
Server
Creates and manages RPC servers.
#![allow(unused)] fn main() { use rpcnet::{Server, ServerConfig}; // Create server let config = ServerConfig::builder() .with_cert_and_key(cert, key)? .build(); let mut server = Server::new(config); // Register services server.register_service(Arc::new(MyService)); // Bind and run server.bind("0.0.0.0:8080").await?; server.run().await?; }
Key methods:
new(config)
- Create server with configurationregister_service(service)
- Register RPC service handlerbind(addr)
- Bind to addressenable_cluster(config)
- Enable cluster featuresrun()
- Start server (blocks until shutdown)shutdown()
- Gracefully shut down server
Client
Connects to RPC servers and makes requests.
#![allow(unused)] fn main() { use rpcnet::{Client, ClientConfig}; // Create client let config = ClientConfig::builder() .with_server_cert(cert)? .build(); // Connect let client = MyServiceClient::connect("server.example.com:8080", config).await?; // Make request let response = client.my_method(args).await?; }
Key methods:
connect(addr, config)
- Connect to server- Generated methods per RPC trait
- Auto-reconnect on connection loss
Cluster APIs
ClusterMembership
Manages node membership via SWIM gossip protocol.
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterMembership; // Create cluster let config = ClusterConfig::default() .with_bind_addr("0.0.0.0:7946".parse()?); let cluster = ClusterMembership::new(config).await?; // Join via seed nodes cluster.join(vec!["seed.example.com:7946".parse()?]).await?; // Tag node cluster.set_tag("role", "worker"); // Subscribe to events let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { // Handle cluster events } }
Key methods:
new(config)
- Create cluster membershipjoin(seeds)
- Join cluster via seed nodesleave()
- Gracefully leave clusterset_tag(key, value)
- Set metadata tagget_tag(key)
- Get metadata tagnodes()
- Get all cluster nodessubscribe()
- Subscribe to cluster eventslocal_node_id()
- Get local node ID
WorkerRegistry
Tracks worker nodes with load balancing.
#![allow(unused)] fn main() { use rpcnet::cluster::{WorkerRegistry, LoadBalancingStrategy}; // Create registry let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); // Start monitoring registry.start().await; // Select worker let worker = registry.select_worker(Some("role=worker")).await?; println!("Selected: {} at {}", worker.label, worker.addr); // Get all workers let workers = registry.workers().await; }
Key methods:
new(cluster, strategy)
- Create registrystart()
- Start monitoring cluster eventsselect_worker(filter)
- Select worker by tag filterworkers()
- Get all workersworker_count()
- Get number of workerssubscribe()
- Subscribe to registry events
NodeRegistry
Tracks all cluster nodes.
#![allow(unused)] fn main() { use rpcnet::cluster::NodeRegistry; // Create registry let registry = Arc::new(NodeRegistry::new(cluster)); registry.start().await; // Get all nodes let nodes = registry.nodes().await; // Filter by tag let directors = nodes.iter() .filter(|n| n.tags.get("role") == Some(&"director".to_string())) .collect::<Vec<_>>(); }
Key methods:
new(cluster)
- Create node registrystart()
- Start monitoring clusternodes()
- Get all nodesnode_count()
- Count nodessubscribe()
- Subscribe to events
ClusterClient
High-level API for calling workers.
#![allow(unused)] fn main() { use rpcnet::cluster::{ClusterClient, ClusterClientConfig}; // Create client let config = ClusterClientConfig::default(); let client = Arc::new(ClusterClient::new(registry, config)); // Call any worker let result = client.call_worker("compute", request, Some("role=worker")).await?; }
Key methods:
new(registry, config)
- Create cluster clientcall_worker(method, data, filter)
- Call any worker matching filter
Configuration
ServerConfig
#![allow(unused)] fn main() { use rpcnet::ServerConfig; let config = ServerConfig::builder() .with_cert_and_key(cert, key)? // TLS certificate and key .with_ca_cert(ca)? // CA certificate for client verification .with_max_concurrent_streams(100)? // Max concurrent QUIC streams .with_max_idle_timeout(Duration::from_secs(30))? // Idle timeout .build(); }
ClientConfig
#![allow(unused)] fn main() { use rpcnet::ClientConfig; let config = ClientConfig::builder() .with_server_cert(cert)? // Server certificate .with_ca_cert(ca)? // CA certificate .with_connect_timeout(Duration::from_secs(5))? // Connection timeout .build(); }
ClusterConfig
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterConfig; let config = ClusterConfig::default() .with_bind_addr("0.0.0.0:7946".parse()?) .with_gossip_interval(Duration::from_secs(1)) .with_health_check_interval(Duration::from_secs(2)) .with_phi_threshold(8.0); }
Code Generation
RPC Trait Definition
#![allow(unused)] fn main() { use rpcnet::prelude::*; #[rpc_trait] pub trait MyService { async fn my_method(&self, arg1: String, arg2: i32) -> Result<Response>; async fn streaming(&self, request: Request) -> impl Stream<Item = Result<Chunk>>; } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Response { pub data: Vec<u8>, } }
Generate Code
rpcnet-gen --input my_service.rpc.rs --output src/generated
Use Generated Code
#![allow(unused)] fn main() { mod generated; use generated::my_service::*; // Server side #[rpc_impl] impl MyService for Handler { async fn my_method(&self, arg1: String, arg2: i32) -> Result<Response> { // Implementation } } // Client side let client = MyServiceClient::connect(addr, config).await?; let response = client.my_method("test".to_string(), 42).await?; }
Streaming
Server-Side Streaming
#![allow(unused)] fn main() { #[rpc_trait] pub trait StreamService { async fn stream_data(&self, count: usize) -> impl Stream<Item = Result<Data>>; } #[rpc_impl] impl StreamService for Handler { async fn stream_data(&self, count: usize) -> impl Stream<Item = Result<Data>> { futures::stream::iter(0..count).map(|i| { Ok(Data { value: i }) }) } } }
Client-Side Streaming
#![allow(unused)] fn main() { #[rpc_trait] pub trait UploadService { async fn upload(&self, stream: impl Stream<Item = Chunk>) -> Result<Summary>; } // Client usage let chunks = futures::stream::iter(vec![chunk1, chunk2, chunk3]); let summary = client.upload(chunks).await?; }
Bidirectional Streaming
#![allow(unused)] fn main() { #[rpc_trait] pub trait ChatService { async fn chat(&self, stream: impl Stream<Item = Message>) -> impl Stream<Item = Result<Message>>; } }
Load Balancing Strategies
#![allow(unused)] fn main() { use rpcnet::cluster::LoadBalancingStrategy; // Round Robin - even distribution LoadBalancingStrategy::RoundRobin // Random - stateless selection LoadBalancingStrategy::Random // Least Connections - pick least loaded (recommended) LoadBalancingStrategy::LeastConnections }
Cluster Events
#![allow(unused)] fn main() { use rpcnet::cluster::ClusterEvent; let mut events = cluster.subscribe(); while let Some(event) = events.recv().await { match event { ClusterEvent::NodeJoined(node) => { println!("Node {} joined at {}", node.id, node.addr); } ClusterEvent::NodeLeft(node) => { println!("Node {} left", node.id); } ClusterEvent::NodeFailed(node) => { println!("Node {} failed", node.id); } ClusterEvent::NodeUpdated(node) => { println!("Node {} updated", node.id); } ClusterEvent::PartitionDetected(minority, majority) => { println!("Partition detected!"); } } } }
Error Handling
#![allow(unused)] fn main() { use rpcnet::{Error, ErrorKind}; match client.call("method", args).await { Ok(response) => { // Handle success } Err(e) => { match e.kind() { ErrorKind::ConnectionFailed => { // Connection issue, retry with different worker } ErrorKind::Timeout => { // Request timed out } ErrorKind::SerializationError => { // Data serialization failed } ErrorKind::ApplicationError => { // Application-level error from handler } _ => { // Other errors } } } } }
Common Patterns
Health Check Endpoint
#![allow(unused)] fn main() { #[rpc_trait] pub trait HealthService { async fn health(&self) -> Result<HealthStatus>; } #[derive(Serialize, Deserialize)] pub struct HealthStatus { pub healthy: bool, pub version: String, pub uptime_secs: u64, } }
Graceful Shutdown
#![allow(unused)] fn main() { use tokio::signal; async fn run(mut server: Server, cluster: Arc<ClusterMembership>) -> Result<()> { let server_task = tokio::spawn(async move { server.run().await }); signal::ctrl_c().await?; // Leave cluster gracefully cluster.leave().await?; // Wait for in-flight requests server.shutdown().await?; Ok(()) } }
Connection Retry
#![allow(unused)] fn main() { async fn call_with_retry<T>( f: impl Fn() -> Pin<Box<dyn Future<Output = Result<T>>>>, max_retries: usize, ) -> Result<T> { for attempt in 0..max_retries { match f().await { Ok(result) => return Ok(result), Err(e) if attempt < max_retries - 1 => { tokio::time::sleep(Duration::from_millis(100 * 2_u64.pow(attempt as u32))).await; } Err(e) => return Err(e), } } unreachable!() } }
Environment Variables
Common environment variables used in examples:
# Director
DIRECTOR_ADDR=127.0.0.1:61000
RUST_LOG=info
# Worker
WORKER_LABEL=worker-1
WORKER_ADDR=127.0.0.1:62001
DIRECTOR_ADDR=127.0.0.1:61000
# Client
CLIENT_ID=client-1
# Logging
RUST_LOG=rpcnet=debug,my_app=info
Feature Flags
[dependencies]
rpcnet = { version = "0.2", features = ["cluster", "metrics"] }
Available features:
cluster
- Enable cluster features (WorkerRegistry, ClusterClient, etc.)metrics
- Enable Prometheus metricscodegen
- Enable code generation support (always included in v0.2+)
Quick Examples
Simple RPC Server
use rpcnet::prelude::*; #[rpc_trait] pub trait Echo { async fn echo(&self, msg: String) -> Result<String>; } #[rpc_impl] impl Echo for Handler { async fn echo(&self, msg: String) -> Result<String> { Ok(msg) } } #[tokio::main] async fn main() -> Result<()> { let config = ServerConfig::builder() .with_cert_and_key(cert, key)? .build(); let mut server = Server::new(config); server.register_service(Arc::new(Handler)); server.bind("0.0.0.0:8080").await?; server.run().await?; Ok(()) }
Simple RPC Client
#[tokio::main] async fn main() -> Result<()> { let config = ClientConfig::builder() .with_server_cert(cert)? .build(); let client = EchoClient::connect("localhost:8080", config).await?; let response = client.echo("Hello!".to_string()).await?; println!("Response: {}", response); Ok(()) }
Next Steps
- Examples - Complete example programs
- Cluster Tutorial - Build a cluster
- API Documentation - Full API docs
Example Programs
This page indexes all example programs included in the RpcNet repository. Each example demonstrates specific features and can be run locally.
Repository Structure
All examples are located in the examples/
directory:
examples/
βββ cluster/ - Distributed cluster with auto-discovery
βββ (more to come)
Cluster Example
Location: examples/cluster/
Documentation: Cluster Example Chapter
Demonstrates RpcNet's distributed cluster features with automatic service discovery, load balancing, and failure handling.
Components
Director (examples/cluster/src/bin/director.rs
)
- Coordinator node for the cluster
- Uses
WorkerRegistry
for auto-discovery - Implements load-balanced request routing
- Monitors worker pool health
Worker (examples/cluster/src/bin/worker.rs
)
- Processing node that joins cluster automatically
- Tags itself with
role=worker
for discovery - Handles compute tasks
- Supports failure simulation for testing
Client (examples/cluster/src/bin/client.rs
)
- Connects through director
- Establishes direct connections to workers
- Handles worker failover automatically
- Demonstrates streaming requests
Quick Start
# Terminal 1: Start Director
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin director
# Terminal 2: Start Worker A
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
# Terminal 3: Start Worker B
WORKER_LABEL=worker-b \
WORKER_ADDR=127.0.0.1:62002 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
# Terminal 4: Run Client
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin client
Features Demonstrated
- β Automatic Discovery: Workers join via SWIM gossip protocol
- β
Load Balancing: Uses
LeastConnections
strategy - β Health Checking: Phi Accrual failure detection
- β Failover: Client handles worker failures gracefully
- β Streaming: Server-side streaming responses
- β Tag-Based Routing: Filter workers by role
- β Cluster Events: Monitor node joined/left/failed
Testing Scenarios
1. Normal Operation:
- Start director + 2 workers + client
- Observe load distribution across workers
- Watch streaming responses flow
2. Worker Failure:
# Enable failure simulation
WORKER_FAILURE_ENABLED=true cargo run --bin worker
- Worker cycles through failures every ~18 seconds
- Client detects failures and switches workers
- Streaming continues with minimal interruption
3. Hard Kill:
- Press
Ctrl+C
on a worker - Director detects failure via gossip
- Client fails over to remaining workers
4. Worker Restart:
- Restart killed worker
- Automatic re-discovery and re-integration
- Load distribution resumes
Configuration Options
Director:
DIRECTOR_ADDR
- Bind address (default:127.0.0.1:61000
)RUST_LOG
- Log level (e.g.,info
,debug
)
Worker:
WORKER_LABEL
- Worker identifier (default:worker-1
)WORKER_ADDR
- Bind address (default:127.0.0.1:62001
)DIRECTOR_ADDR
- Director address (default:127.0.0.1:61000
)WORKER_FAILURE_ENABLED
- Enable failure simulation (default:false
)RUST_LOG
- Log level
Client:
DIRECTOR_ADDR
- Director address (default:127.0.0.1:61000
)RUST_LOG
- Log level
Code Highlights
Worker Auto-Discovery (worker.rs
):
#![allow(unused)] fn main() { // Join cluster let cluster = server.enable_cluster(cluster_config).await?; cluster.join(vec![director_addr.parse()?]).await?; // Tag for discovery cluster.set_tag("role", "worker"); cluster.set_tag("label", &worker_label); }
Load-Balanced Selection (director.rs
):
#![allow(unused)] fn main() { // Create registry with load balancing let registry = Arc::new(WorkerRegistry::new( cluster, LoadBalancingStrategy::LeastConnections )); // Select worker automatically let worker = registry.select_worker(Some("role=worker")).await?; }
Client Failover (client.rs
):
#![allow(unused)] fn main() { // Try worker match worker_client.generate(request).await { Ok(stream) => { // Process stream } Err(e) => { // Worker failed - return to director for new assignment println!("Worker failed: {}", e); continue; } } }
Running Examples from Repository
Prerequisites
- Clone repository:
git clone https://github.com/yourusername/rpcnet.git
cd rpcnet
- Generate test certificates:
mkdir certs
cd certs
openssl req -x509 -newkey rsa:4096 -nodes \
-keyout test_key.pem -out test_cert.pem \
-days 365 -subj "/CN=localhost"
cd ..
- Install dependencies:
cargo build --examples
Run Specific Example
# Cluster example
cd examples/cluster
cargo run --bin director
cargo run --bin worker
cargo run --bin client
Creating Your Own Examples
Basic Template
#![allow(unused)] fn main() { // examples/my_example/Cargo.toml [package] name = "my_example" version = "0.1.0" edition = "2021" [dependencies] rpcnet = { path = "../..", features = ["cluster"] } tokio = { version = "1", features = ["full"] } anyhow = "1" [[bin]] name = "server" path = "src/bin/server.rs" [[bin]] name = "client" path = "src/bin/client.rs" }
Example Structure
examples/my_example/
βββ Cargo.toml
βββ README.md
βββ my_service.rpc.rs # RPC trait definition
βββ src/
β βββ lib.rs
β βββ generated/ # Generated code
β β βββ my_service.rs
β βββ bin/
β βββ server.rs
β βββ client.rs
βββ tests/
βββ integration_tests.rs
Generate Code
cd examples/my_example
rpcnet-gen --input my_service.rpc.rs --output src/generated
Document Your Example
Create examples/my_example/README.md
:
# My Example
Brief description of what this example demonstrates.
## Features
- Feature 1
- Feature 2
## Running
Terminal 1:
\`\`\`bash
cargo run --bin server
\`\`\`
Terminal 2:
\`\`\`bash
cargo run --bin client
\`\`\`
## Expected Output
...
Testing Examples
Manual Testing
# Run example
cd examples/cluster
cargo run --bin director &
cargo run --bin worker &
cargo run --bin client
# Verify output
# Clean up
killall director worker
Integration Tests
# Run example's tests
cd examples/cluster
cargo test
# Run all example tests
cargo test --examples
Example Comparison
Example | Complexity | Features | Best For |
---|---|---|---|
cluster | Intermediate | Discovery, Load Balancing, Failover, Streaming | Understanding distributed systems |
Common Issues
Certificate Errors
Error: Certificate verification failed
Solution: Ensure certificates exist in certs/
:
ls certs/test_cert.pem certs/test_key.pem
Port Already in Use
Error: Address already in use (os error 48)
Solution: Kill existing processes or change port:
lsof -ti:61000 | xargs kill
# or
DIRECTOR_ADDR=127.0.0.1:61001 cargo run --bin director
Workers Not Discovered
Error: No workers available
Solution:
- Start director first (seed node)
- Wait 2-3 seconds for gossip propagation
- Check firewall allows UDP port 7946
Contributing Examples
Want to contribute an example? Great! Here's how:
- Create example directory:
examples/your_example/
- Write code: Follow structure above
- Test thoroughly: Include integration tests
- Document well: Clear README with running instructions
- Submit PR: Include example in this index
Good example ideas:
- Basic client-server RPC
- Bidirectional streaming
- Multi-region deployment
- Custom load balancing strategy
- Monitoring and metrics integration
Next Steps
- Cluster Tutorial - Build cluster from scratch
- API Reference - API documentation
- GitHub Repository - Browse all examples
Video Walkthroughs
Coming soon! Video walkthroughs demonstrating:
- Running the cluster example
- Testing failure scenarios
- Building your own example