Production Deployment
This guide covers best practices for deploying RpcNet clusters in production environments, including security, monitoring, high availability, and operational procedures.
Architecture Patterns
1. Basic Production Setup
Minimum viable production deployment:
```
              Load Balancer (L4)
                      |
         ┌────────────┼────────────┐
         │            │            │
    ┌────▼───┐   ┌────▼───┐   ┌────▼───┐
    │Director│   │Director│   │Director│   (3+ for HA)
    │  (HA)  │   │  (HA)  │   │  (HA)  │
    └────┬───┘   └────┬───┘   └────┬───┘
         │            │            │
    ┌────┴────────────┴────────────┴─────┐
    │                                    │
┌───▼────┐  ┌────────┐  ┌────────┐  ┌────▼───┐
│Worker 1│  │Worker 2│  │Worker 3│  │Worker N│
└────────┘  └────────┘  └────────┘  └────────┘
```
Components:
- Load Balancer: Routes clients to healthy directors
- Directors (3+): Coordinator nodes in HA configuration
- Workers (N): Processing nodes, scale horizontally
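In this topology, clients only need the load balancer's address, while each worker joins by seeding the cluster with the directors. A minimal worker-side sketch, assuming the `ClusterConfig`/`enable_cluster` API shown in the Director HA section later in this guide (addresses are placeholders):

```rust
use std::net::SocketAddr;

// Seed the worker with the three directors sitting behind the load balancer.
let seeds: Vec<SocketAddr> = vec![
    "10.0.1.10:8080".parse()?,
    "10.0.1.11:8080".parse()?,
    "10.0.1.12:8080".parse()?,
];

let cluster_config = ClusterConfig::default()
    .with_bind_addr("0.0.0.0:8081".parse()?)
    .with_seeds(seeds);

let cluster = server.enable_cluster(cluster_config).await?;
cluster.set_tag("role", "worker");
```

Because every director operates identically, the load balancer can route clients to any of them without session affinity.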
2. Multi-Region Setup
For global deployments:
```
       Region US-EAST                      Region EU-WEST
┌──────────────────────────┐       ┌──────────────────────────┐
│   Director Cluster (3)   │       │   Director Cluster (3)   │
│   Worker Pool (10+)      │       │   Worker Pool (10+)      │
└──────────┬───────────────┘       └───────────┬──────────────┘
           │                                   │
           └─────────────────┬─────────────────┘
                             │
                       Cross-region
                      Gossip Protocol
                  (optional coordination)
```
Benefits:
- Lower latency for regional clients
- Fault isolation (region failure doesn't affect others)
- Regulatory compliance (data locality)
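A sketch of how region awareness can be expressed with the tag API from the Director HA section below; the `region` tag name is a convention for this example, not a built-in:

```rust
// Each node advertises its region so routing can prefer local peers.
// The "region" tag name is an assumption for this sketch.
cluster.set_tag("role", "worker");
cluster.set_tag("region", "us-east");
```

Directors can then filter or weight candidate workers by the `region` tag so requests stay inside the client's region, falling back to the other region only when local capacity is exhausted.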
3. Hybrid Edge Deployment
For edge computing scenarios:
```
          Cloud (Central)
       ┌─────────────────────┐
       │  Director Cluster   │
       │  Worker Pool        │
       └──────────┬──────────┘
                  │
       ┌──────────┼──────────┐
       │          │          │
  ┌────▼───┐ ┌────▼───┐ ┌────▼───┐
  │ Edge 1 │ │ Edge 2 │ │ Edge 3 │
  │Workers │ │Workers │ │Workers │
  └────────┘ └────────┘ └────────┘
```
Use cases:
- IoT workloads
- Low-latency requirements
- Bandwidth optimization
Security
TLS Configuration
Production Certificates
```rust
// ❌ Bad: self-signed certificates
let cert = std::fs::read("self_signed.pem")?;

// ✅ Good: proper CA-signed certificates
let cert = std::fs::read("/etc/rpcnet/certs/server.crt")?;
let key = std::fs::read("/etc/rpcnet/certs/server.key")?;
let ca = std::fs::read("/etc/rpcnet/certs/ca.crt")?;

let config = ServerConfig::builder()
    .with_cert_and_key(cert, key)?
    .with_ca_cert(ca)?  // verify clients
    .build();
```
Certificate Rotation
```rust
use std::sync::Arc;
use tokio::time::{interval, Duration};

async fn rotate_certificates(server: Arc<Server>) -> anyhow::Result<()> {
    let mut check_interval = interval(Duration::from_secs(3600)); // check hourly

    loop {
        check_interval.tick().await;

        // Check certificate expiry (rotate if it expires within 30 days)
        if certificate_expires_soon("/etc/rpcnet/certs/server.crt", 30).await? {
            log::warn!("Certificate expiring soon, rotating...");

            // Load the new certificate and key
            let new_cert = std::fs::read("/etc/rpcnet/certs/server.crt.new")?;
            let new_key = std::fs::read("/etc/rpcnet/certs/server.key.new")?;

            // Hot-reload without downtime
            server.reload_certificate(new_cert, new_key).await?;

            log::info!("Certificate rotated successfully");
        }
    }
}
```
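The `certificate_expires_soon` helper above is application code, not part of RpcNet. A minimal sketch, assuming the `x509-parser` crate (any X.509 library works):

```rust
use std::time::{SystemTime, UNIX_EPOCH};
use x509_parser::pem::parse_x509_pem;

// Returns true if the certificate at `path` expires within `within_days` days.
async fn certificate_expires_soon(path: &str, within_days: i64) -> anyhow::Result<bool> {
    let pem_bytes = tokio::fs::read(path).await?;

    let (_, pem) = parse_x509_pem(&pem_bytes)
        .map_err(|e| anyhow::anyhow!("invalid PEM: {e:?}"))?;
    let cert = pem
        .parse_x509()
        .map_err(|e| anyhow::anyhow!("invalid X.509 certificate: {e:?}"))?;

    let not_after = cert.validity().not_after.timestamp(); // seconds since epoch
    let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;

    Ok(not_after - now < within_days * 86_400)
}
```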
Authentication & Authorization
```rust
#[rpc_trait]
pub trait SecureService {
    async fn process(&self, auth_token: String, data: Vec<u8>) -> Result<Response>;
}

#[rpc_impl]
impl SecureService for Handler {
    async fn process(&self, auth_token: String, data: Vec<u8>) -> Result<Response> {
        // Verify the token
        let claims = verify_jwt(&auth_token)?;

        // Check permissions
        if !claims.has_permission("compute:execute") {
            return Err(anyhow::anyhow!("Insufficient permissions"));
        }

        // Process the request
        Ok(self.do_process(data).await?)
    }
}
```
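`verify_jwt` and the claims type are likewise application code. A sketch using the `jsonwebtoken` crate with an HMAC secret; the `permissions` claim layout and the `RPCNET_JWT_SECRET` variable are assumptions for illustration:

```rust
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Claims {
    sub: String,
    exp: usize,
    permissions: Vec<String>,
}

impl Claims {
    fn has_permission(&self, perm: &str) -> bool {
        self.permissions.iter().any(|p| p == perm)
    }
}

fn verify_jwt(token: &str) -> anyhow::Result<Claims> {
    // In production, load the key from your secret manager (see Secret Management below).
    let secret = std::env::var("RPCNET_JWT_SECRET")?;

    let data = decode::<Claims>(
        token,
        &DecodingKey::from_secret(secret.as_bytes()),
        &Validation::new(Algorithm::HS256),
    )?;

    Ok(data.claims)
}
```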
Network Segmentation
```
┌─────────────────────────────────────────────────────┐
│                   Public Network                    │
│              (Clients, Load Balancer)               │
└────────────────────┬────────────────────────────────┘
                     │ Firewall
┌────────────────────▼────────────────────────────────┐
│                 Management Network                  │
│          (Directors, Monitoring, Logging)           │
└────────────────────┬────────────────────────────────┘
                     │ Firewall
┌────────────────────▼────────────────────────────────┐
│                   Worker Network                    │
│          (Workers, Internal Communication)          │
└─────────────────────────────────────────────────────┘
```
Firewall Rules:
```bash
# Public → Management: only load balancer ports
iptables -A FORWARD -i public -o management -p tcp --dport 8080 -j ACCEPT

# Management → Workers: full access
iptables -A FORWARD -i management -o workers -j ACCEPT

# Workers → Workers: gossip protocol
iptables -A FORWARD -i workers -o workers -p udp --dport 7946 -j ACCEPT
```
Monitoring
Essential Metrics
```rust
use prometheus::{register_counter, register_counter_vec, register_gauge, register_histogram};

// Throughput
let request_counter = register_counter!("rpc_requests_total", "Total RPC requests")?;
request_counter.inc();

// Latency
let latency_histogram = register_histogram!(
    "rpc_latency_seconds",
    "RPC latency distribution",
    vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)?;
latency_histogram.observe(duration.as_secs_f64());

// Health
let healthy_workers = register_gauge!("cluster_healthy_workers", "Number of healthy workers")?;
healthy_workers.set(registry.healthy_count().await as f64);

// Errors (labelled by type, so this must be a counter vec)
let error_counter = register_counter_vec!("rpc_errors_total", "Total RPC errors", &["type"])?;
error_counter.with_label_values(&["timeout"]).inc();
```
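The snippet above shows the individual metric calls; in a service you would typically register each metric once (for example in `once_cell` statics, assumed here) and record them around the actual RPC:

```rust
use once_cell::sync::Lazy;
use prometheus::{
    register_counter, register_counter_vec, register_histogram, Counter, CounterVec, Histogram,
};
use std::time::Instant;

static REQUESTS: Lazy<Counter> =
    Lazy::new(|| register_counter!("rpc_requests_total", "Total RPC requests").unwrap());
static ERRORS: Lazy<CounterVec> = Lazy::new(|| {
    register_counter_vec!("rpc_errors_total", "Total RPC errors", &["type"]).unwrap()
});
static LATENCY: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "rpc_latency_seconds",
        "RPC latency distribution",
        vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
    )
    .unwrap()
});

// Wrap any RPC future to record throughput, latency, and errors.
async fn observe_rpc<T, F>(fut: F) -> anyhow::Result<T>
where
    F: std::future::Future<Output = anyhow::Result<T>>,
{
    REQUESTS.inc();
    let start = Instant::now();

    let result = fut.await;

    LATENCY.observe(start.elapsed().as_secs_f64());
    if result.is_err() {
        ERRORS.with_label_values(&["call_failed"]).inc();
    }
    result
}
```

Usage: `let response = observe_rpc(worker.call("compute", payload)).await?;`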
Prometheus Integration
```rust
use prometheus::{Encoder, TextEncoder};
use warp::Filter;

async fn start_metrics_server() {
    let metrics_route = warp::path!("metrics").map(|| {
        let encoder = TextEncoder::new();
        let metric_families = prometheus::gather();
        let mut buffer = vec![];
        encoder.encode(&metric_families, &mut buffer).unwrap();

        warp::reply::with_header(buffer, "Content-Type", "text/plain; charset=utf-8")
    });

    warp::serve(metrics_route).run(([0, 0, 0, 0], 9090)).await;
}
```
Prometheus config (`prometheus.yml`):
```yaml
scrape_configs:
  - job_name: 'rpcnet_directors'
    static_configs:
      - targets: ['director-1:9090', 'director-2:9090', 'director-3:9090']

  - job_name: 'rpcnet_workers'
    static_configs:
      - targets: ['worker-1:9090', 'worker-2:9090', 'worker-3:9090']
```
Grafana Dashboards
Key panels:
- Throughput: `rate(rpc_requests_total[1m])`
- Latency P99: `histogram_quantile(0.99, rate(rpc_latency_seconds_bucket[5m]))`
- Error Rate: `rate(rpc_errors_total[1m])`
- Worker Health: `cluster_healthy_workers`
Alerting
```yaml
# alerts.yml
groups:
  - name: rpcnet
    interval: 30s
    rules:
      - alert: HighErrorRate
        expr: rate(rpc_errors_total[5m]) > 0.05
        for: 2m
        annotations:
          summary: "High RPC error rate detected"

      - alert: LowWorkerCount
        expr: cluster_healthy_workers < 3
        for: 1m
        annotations:
          summary: "Less than 3 healthy workers available"

      - alert: HighLatency
        expr: histogram_quantile(0.99, rate(rpc_latency_seconds_bucket[5m])) > 0.1
        for: 5m
        annotations:
          summary: "P99 latency above 100ms"
```
Logging
Structured Logging
```rust
use tracing::{error, info, instrument};
use uuid::Uuid;

#[instrument(skip(worker, data))]
async fn process_request(
    // `worker` is the handle used to call the selected worker; the type name is illustrative.
    worker: &WorkerConnection,
    request_id: Uuid,
    worker_id: Uuid,
    data: Vec<u8>,
) -> Result<Response> {
    info!(
        request_id = %request_id,
        worker_id = %worker_id,
        data_size = data.len(),
        "Processing request"
    );

    match worker.call("compute", data).await {
        Ok(response) => {
            info!(
                request_id = %request_id,
                worker_id = %worker_id,
                response_size = response.len(),
                "Request completed"
            );
            Ok(response)
        }
        Err(e) => {
            error!(
                request_id = %request_id,
                worker_id = %worker_id,
                error = %e,
                "Request failed"
            );
            Err(e)
        }
    }
}
```
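To make these events consumable by the log aggregation pipeline below, emit them as JSON. A minimal initialization sketch using `tracing-subscriber` (assumes its `json` and `env-filter` features are enabled):

```rust
use tracing_subscriber::{fmt, EnvFilter};

fn init_logging() {
    fmt()
        .json()                                         // structured output for Fluentd/Elasticsearch
        .with_env_filter(EnvFilter::from_default_env()) // honours RUST_LOG
        .with_current_span(true)                        // include span fields such as request_id
        .init();
}
```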
Log Aggregation
Fluentd config (`fluent.conf`):
```
<source>
  @type forward
  port 24224
</source>

<match rpcnet.**>
  @type elasticsearch
  host elasticsearch.example.com
  port 9200
  index_name rpcnet
  type_name logs
</match>
```
High Availability
Director HA Setup
```rust
use std::env;
use uuid::Uuid;

// Each director is identical and is configured via environment variables.
let director_id = Uuid::new_v4();

let cluster_config = ClusterConfig::default()
    .with_bind_addr(env::var("BIND_ADDR")?.parse()?)
    .with_seeds(parse_seeds(&env::var("SEED_NODES")?)?);

let cluster = server.enable_cluster(cluster_config).await?;

// Tag as director
cluster.set_tag("role", "director");
cluster.set_tag("id", &director_id.to_string());

// All directors operate identically, so clients can use any of them.
```
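`parse_seeds` is a small helper rather than an RpcNet API; a sketch that assumes `SEED_NODES` is a comma-separated list such as `10.0.1.10:8080,10.0.1.11:8080`:

```rust
use std::net::SocketAddr;

fn parse_seeds(raw: &str) -> anyhow::Result<Vec<SocketAddr>> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|s| Ok(s.parse()?))
        .collect()
}
```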
Graceful Shutdown
```rust
use std::sync::Arc;
use tokio::signal;
use tokio::time::Duration;

// Assumes Server is shareable (run/stop/shutdown take &self) and that the cluster
// handle from enable_cluster() is passed in alongside the server.
async fn run_server(server: Arc<Server>, cluster: Arc<ClusterMembership>) -> Result<()> {
    // Spawn the server task on a clone so shutdown can still be driven from here
    let server_task = {
        let server = server.clone();
        tokio::spawn(async move { server.run().await })
    };

    // Wait for the shutdown signal
    signal::ctrl_c().await?;
    log::info!("Shutdown signal received, gracefully shutting down...");

    // 1. Stop accepting new connections
    server.stop_accepting().await;

    // 2. Wait for in-flight requests (with timeout)
    tokio::time::timeout(
        Duration::from_secs(30),
        server.wait_for_in_flight()
    ).await?;

    // 3. Leave the cluster gracefully
    cluster.leave().await?;

    // 4. Close connections
    server.shutdown().await?;
    let _ = server_task.await;

    log::info!("Shutdown complete");
    Ok(())
}
```
Health Checks
```rust
use serde::{Deserialize, Serialize};

#[rpc_trait]
pub trait HealthService {
    async fn health(&self) -> Result<HealthStatus>;
    async fn ready(&self) -> Result<ReadyStatus>;
}

#[derive(Serialize, Deserialize)]
pub struct HealthStatus {
    pub healthy: bool,
    pub version: String,
    pub uptime_secs: u64,
}

#[derive(Serialize, Deserialize)]
pub struct ReadyStatus {
    pub ready: bool,
    pub workers_available: usize,
    pub cluster_size: usize,
}

#[rpc_impl]
impl HealthService for Handler {
    async fn health(&self) -> Result<HealthStatus> {
        Ok(HealthStatus {
            healthy: true,
            version: env!("CARGO_PKG_VERSION").to_string(),
            uptime_secs: self.start_time.elapsed().as_secs(),
        })
    }

    async fn ready(&self) -> Result<ReadyStatus> {
        let workers = self.registry.worker_count().await;
        let cluster_size = self.cluster.node_count().await;

        Ok(ReadyStatus {
            ready: workers > 0,
            workers_available: workers,
            cluster_size,
        })
    }
}
```
Kubernetes probes:
```yaml
livenessProbe:
  exec:
    command:
      - /usr/local/bin/health-check
      - --endpoint=health
  initialDelaySeconds: 10
  periodSeconds: 10

readinessProbe:
  exec:
    command:
      - /usr/local/bin/health-check
      - --endpoint=ready
  initialDelaySeconds: 5
  periodSeconds: 5
```
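The `/usr/local/bin/health-check` binary referenced by the probes is application-specific. A hypothetical sketch of such a probe; the `HealthServiceClient` type and its `connect` call are illustrative placeholders for whatever client RpcNet generates for the `HealthService` trait above:

```rust
use std::process::exit;

#[tokio::main]
async fn main() {
    // Parse --endpoint=health|ready (defaults to health)
    let endpoint = std::env::args()
        .find_map(|arg| arg.strip_prefix("--endpoint=").map(str::to_owned))
        .unwrap_or_else(|| "health".to_string());

    // Illustrative client type; substitute the generated HealthService client.
    let client = match HealthServiceClient::connect("127.0.0.1:8080").await {
        Ok(client) => client,
        Err(_) => exit(1),
    };

    let ok = match endpoint.as_str() {
        "ready" => client.ready().await.map(|r| r.ready).unwrap_or(false),
        _ => client.health().await.map(|h| h.healthy).unwrap_or(false),
    };

    exit(if ok { 0 } else { 1 });
}
```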
Deployment
Docker
Dockerfile:
```dockerfile
FROM rust:1.75 AS builder

WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release

FROM debian:bookworm-slim

RUN apt-get update && apt-get install -y \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/target/release/director /usr/local/bin/
COPY --from=builder /app/target/release/worker /usr/local/bin/

# Expose ports
EXPOSE 8080 7946/udp

CMD ["director"]
```
Docker Compose (`docker-compose.yml`):
```yaml
version: '3.8'

services:
  director-1:
    image: rpcnet:latest
    command: director
    environment:
      - DIRECTOR_ADDR=0.0.0.0:8080
      - RUST_LOG=info
    ports:
      - "8080:8080"
      - "7946:7946/udp"

  worker-1:
    image: rpcnet:latest
    command: worker
    environment:
      - WORKER_LABEL=worker-1
      - WORKER_ADDR=0.0.0.0:8081
      - DIRECTOR_ADDR=director-1:8080
      - RUST_LOG=info
    depends_on:
      - director-1
```
Kubernetes
Deployment (`director-deployment.yaml`):
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rpcnet-director
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rpcnet-director
  template:
    metadata:
      labels:
        app: rpcnet-director
    spec:
      containers:
        - name: director
          image: rpcnet:latest
          command: ["director"]
          env:
            - name: DIRECTOR_ADDR
              value: "0.0.0.0:8080"
            - name: RUST_LOG
              value: "info"
          ports:
            - containerPort: 8080
              name: rpc
            - containerPort: 7946
              name: gossip
              protocol: UDP
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
```
Service (`director-service.yaml`):
```yaml
apiVersion: v1
kind: Service
metadata:
  name: rpcnet-director
spec:
  type: LoadBalancer
  selector:
    app: rpcnet-director
  ports:
    - name: rpc
      port: 8080
      targetPort: 8080
    - name: gossip
      port: 7946
      targetPort: 7946
      protocol: UDP
```
HorizontalPodAutoscaler:
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rpcnet-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rpcnet-worker
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```
Configuration Management
Environment-Based Config
```rust
use config::{Config, Environment, File};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Settings {
    server: ServerSettings,
    cluster: ClusterSettings,
    monitoring: MonitoringSettings,
}

#[derive(Debug, Deserialize)]
struct ServerSettings {
    bind_addr: String,
    cert_path: String,
    key_path: String,
}

fn load_config() -> Result<Settings> {
    let settings = Config::builder()
        // Default config
        .add_source(File::with_name("config/default"))
        // Environment-specific config (optional), selected at runtime via ENV
        .add_source(File::with_name(&format!("config/{}", std::env::var("ENV")?)).required(false))
        // Environment variables (override everything else)
        .add_source(Environment::with_prefix("RPCNET"))
        .build()?;

    Ok(settings.try_deserialize()?)
}
```
Secret Management
```rust
use aws_sdk_secretsmanager::Client as SecretsClient;

async fn load_tls_certs_from_secrets() -> Result<(Vec<u8>, Vec<u8>)> {
    let config = aws_config::load_from_env().await;
    let client = SecretsClient::new(&config);

    // Load certificate
    let cert_secret = client
        .get_secret_value()
        .secret_id("rpcnet/production/tls_cert")
        .send()
        .await?;
    let cert = cert_secret.secret_binary().unwrap().as_ref().to_vec();

    // Load key
    let key_secret = client
        .get_secret_value()
        .secret_id("rpcnet/production/tls_key")
        .send()
        .await?;
    let key = key_secret.secret_binary().unwrap().as_ref().to_vec();

    Ok((cert, key))
}
```
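Usage sketch: feed the fetched material into the same `ServerConfig` builder used in the TLS Configuration section above.

```rust
let (cert, key) = load_tls_certs_from_secrets().await?;

let config = ServerConfig::builder()
    .with_cert_and_key(cert, key)?
    .build();
```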
Operational Procedures
Rolling Updates
```bash
#!/bin/bash
# Rolling update script for workers

WORKERS=("worker-1" "worker-2" "worker-3" "worker-4")

for worker in "${WORKERS[@]}"; do
    echo "Updating $worker..."

    # Gracefully shut down the worker
    kubectl exec $worker -- kill -SIGTERM 1

    # Wait for the worker to leave the cluster
    sleep 10

    # Update the image
    kubectl set image deployment/rpcnet-worker worker=rpcnet:new-version

    # Wait for the new pod to be ready
    kubectl wait --for=condition=ready pod -l app=$worker --timeout=60s

    # Verify the worker rejoined the cluster
    kubectl exec director-1 -- check-worker-registered $worker

    echo "$worker updated successfully"
done
```
Backup and Restore
```rust
use std::sync::Arc;
use std::time::SystemTime;

// Backup cluster state (metadata only, not data)
async fn backup_cluster_state(cluster: Arc<ClusterMembership>) -> Result<()> {
    let state = ClusterState {
        nodes: cluster.nodes().await,
        timestamp: SystemTime::now(),
    };

    let backup = serde_json::to_vec(&state)?;
    std::fs::write("/backup/cluster_state.json", backup)?;

    Ok(())
}

// Restore from backup (for disaster recovery)
async fn restore_cluster_state(path: &str) -> Result<ClusterState> {
    let backup = std::fs::read(path)?;
    let state: ClusterState = serde_json::from_slice(&backup)?;
    Ok(state)
}
```
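A small scheduling sketch so the backup runs continuously rather than ad hoc; the 15-minute interval is an assumption, tune it to how often membership changes:

```rust
use std::sync::Arc;
use tokio::time::{interval, Duration};

async fn run_periodic_backup(cluster: Arc<ClusterMembership>) {
    let mut ticker = interval(Duration::from_secs(15 * 60)); // every 15 minutes

    loop {
        ticker.tick().await;
        if let Err(e) = backup_cluster_state(cluster.clone()).await {
            log::warn!("cluster state backup failed: {e}");
        }
    }
}
```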
Runbooks
Worker Node Failure:
- Verify failure: `kubectl get pods | grep worker`
- Check logs: `kubectl logs <worker-pod>`
- If recoverable: `kubectl delete pod <worker-pod>` (auto-restarts)
- If not: investigate root cause, fix, and redeploy
- Verify cluster health: `kubectl exec director-1 -- cluster-health`
High Latency:
- Check Grafana: identify which nodes have high latency
- SSH to affected nodes: `ssh worker-5`
- Check CPU/memory: `top`, `free -h`
- Check network: `netstat -s`, `iftop`
- Review logs: `journalctl -u rpcnet-worker -n 1000`
- If needed: scale up workers or restart affected nodes
Cost Optimization
Resource Sizing
```rust
// Right-size based on actual usage
async fn recommend_sizing(metrics: &Metrics) -> Recommendation {
    let avg_cpu = metrics.avg_cpu_usage();
    let p99_cpu = metrics.p99_cpu_usage();
    let _avg_memory = metrics.avg_memory_usage(); // collected but unused by this CPU-based heuristic

    if avg_cpu < 30.0 && p99_cpu < 60.0 {
        Recommendation::DownsizeWorkers
    } else if p99_cpu > 80.0 {
        Recommendation::UpsizeWorkers
    } else {
        Recommendation::CurrentSizingOptimal
    }
}
```
Auto-Scaling
```yaml
# Scale workers based on request rate
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rpcnet-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rpcnet-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: rpc_requests_per_second
        target:
          type: AverageValue
          averageValue: "5000"   # scale out when > 5K RPS per worker
```
Checklist
Pre-Deployment
- TLS certificates from trusted CA
- Secrets stored in secret manager (not env vars)
- Monitoring and alerting configured
- Log aggregation set up
- Health checks implemented
- Graceful shutdown handling
- Resource limits configured
- Auto-scaling rules defined
- Backup procedures tested
- Runbooks documented
Post-Deployment
- Verify all nodes healthy
- Check metrics dashboards
- Test failover scenarios
- Validate performance (latency, throughput)
- Review logs for errors
- Test rolling updates
- Verify backups working
- Update documentation
Next Steps
- Performance Tuning - Optimize for production load
- Failure Handling - Handle production incidents
- Migration Guide - Migrate existing systems
References
- Kubernetes Best Practices - K8s configuration
- Prometheus Monitoring - Metrics best practices
- AWS Well-Architected - Cloud architecture patterns