Streaming Walkthrough
This end-to-end example builds a telemetry service that exercises every streaming mode RpcNet offers: bidirectional chat, server streaming updates, and client streaming uploads. Follow along to scaffold the project, implement the handlers, and drive the flows from a client binary.
Step 0: Prerequisites
- Rust 1.75+ (
rustup show
to confirm) cargo
on yourPATH
- macOS or Linux (TLS support is bundled via
s2n-quic
)
Step 1: Create the project layout
cargo new telemetry-streams --bin
cd telemetry-streams
mkdir -p certs src/bin
rm src/main.rs # we'll rely on explicit binaries instead of the default main
The example uses two binaries: src/bin/server.rs
and src/bin/client.rs
.
Step 2: Declare dependencies
Edit Cargo.toml
to pull in RpcNet and helper crates:
[package]
name = "telemetry-streams"
version = "0.1.0"
edition = "2021"
[dependencies]
rpcnet = "0.2"
serde = { version = "1", features = ["derive"] }
bincode = "1.3"
async-stream = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
rpcnet
provides the client/server runtime.async-stream
andfutures
help produce response streams on the server.serde
/bincode
handle payload serialization.- Tokio is required because RpcNet is async-first.
Step 3: Generate development certificates
RpcNet requires TLS material for QUIC. Create a self-signed pair for local experiments:
openssl req -x509 -newkey rsa:4096 \
-keyout certs/server-key.pem \
-out certs/server-cert.pem \
-days 365 -nodes \
-subj "/CN=localhost"
The client reuses the public certificate file to trust the server.
Step 4: Define shared data types
Expose a library module that both binaries can import. Create src/lib.rs
:
#![allow(unused)] fn main() { // src/lib.rs pub mod telemetry; }
Now add the telemetry definitions in src/telemetry.rs
:
#![allow(unused)] fn main() { // src/telemetry.rs use rpcnet::RpcError; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct MetricReading { pub sensor: String, pub value: f64, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LiveUpdate { pub sensor: String, pub rolling_avg: f64, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ChatMessage { pub from: String, pub body: String, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Ack { pub accepted: usize, } pub fn encode<T: Serialize>(value: &T) -> Result<Vec<u8>, RpcError> { Ok(bincode::serialize(value)?) } pub fn decode<T: for<'de> Deserialize<'de>>(bytes: &[u8]) -> Result<T, RpcError> { Ok(bincode::deserialize(bytes)?) } }
These helpers convert structures to and from the Vec<u8>
payloads that
RpcNet transports.
Step 5: Implement the streaming server
Create src/bin/server.rs
with three handlers—one per streaming pattern:
// src/bin/server.rs use async_stream::stream; use futures::StreamExt; use rpcnet::{RpcConfig, RpcServer}; use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading}; use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:9000") .with_key_path("certs/server-key.pem") .with_server_name("localhost"); let mut server = RpcServer::new(config); // Bidirectional chat: echo each message with a server tag. server .register_streaming("chat", |mut inbound| async move { stream! { while let Some(frame) = inbound.next().await { let msg: ChatMessage = telemetry::decode(&frame)?; let reply = ChatMessage { from: "server".into(), body: format!("ack: {}", msg.body), }; yield telemetry::encode(&reply); } } }) .await; // Server streaming: emit rolling averages for a requested sensor. server .register_streaming("subscribe_metrics", |mut inbound| async move { stream! { if let Some(frame) = inbound.next().await { let req: MetricReading = telemetry::decode(&frame)?; let mut window = vec![req.value]; for step in 1..=5 { sleep(Duration::from_millis(500)).await; window.push(req.value + step as f64); let avg = window.iter().copied().sum::<f64>() / window.len() as f64; let update = LiveUpdate { sensor: req.sensor.clone(), rolling_avg: avg }; yield telemetry::encode(&update); } } } }) .await; // Client streaming: collect readings and acknowledge how many we processed. server .register_streaming("upload_batch", |mut inbound| async move { stream! { let mut readings: Vec<MetricReading> = Vec::new(); while let Some(frame) = inbound.next().await { let reading: MetricReading = telemetry::decode(&frame)?; readings.push(reading); } let ack = Ack { accepted: readings.len() }; yield telemetry::encode(&ack); } }) .await; let quic_server = server.bind()?; println!("Telemetry server listening on 127.0.0.1:9000"); server.start(quic_server).await?; Ok(()) }
Key points:
register_streaming
receives a stream of request frames (Vec<u8>
) and must return a stream ofResult<Vec<u8>, RpcError>
responses.- The bidirectional handler echoes every inbound payload.
- The server-streaming handler reads a single subscription request and then pushes periodic updates without further client input.
- The client-streaming handler drains all incoming frames before returning one acknowledgement.
Step 6: Implement the client
Create src/bin/client.rs
to exercise each streaming helper:
// src/bin/client.rs use futures::{stream, StreamExt}; use rpcnet::{RpcClient, RpcConfig, RpcError}; use telemetry_streams::telemetry::{self, Ack, ChatMessage, LiveUpdate, MetricReading}; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = RpcConfig::new("certs/server-cert.pem", "127.0.0.1:0") .with_server_name("localhost"); let client = RpcClient::connect("127.0.0.1:9000".parse()?, config).await?; chat_demo(&client).await?; server_stream_demo(&client).await?; client_stream_demo(&client).await?; Ok(()) } async fn chat_demo(client: &RpcClient) -> Result<(), RpcError> { println!("\n--- Bidirectional chat ---"); let messages = vec![ ChatMessage { from: "operator".into(), body: "ping".into() }, ChatMessage { from: "operator".into(), body: "status?".into() }, ]; let outbound_frames: Vec<Vec<u8>> = messages .into_iter() .map(|msg| telemetry::encode(&msg).expect("serialize chat message")) .collect(); let outbound = stream::iter(outbound_frames); let mut inbound = client.call_streaming("chat", outbound).await?; while let Some(frame) = inbound.next().await { let bytes = frame?; let reply: ChatMessage = telemetry::decode(&bytes)?; println!("reply: {}", reply.body); } Ok(()) } async fn server_stream_demo(client: &RpcClient) -> Result<(), RpcError> { println!("\n--- Server streaming ---"); let request = telemetry::encode(&MetricReading { sensor: "temp".into(), value: 21.0 })?; let mut updates = client .call_server_streaming("subscribe_metrics", request) .await?; while let Some(frame) = updates.next().await { let bytes = frame?; let update: LiveUpdate = telemetry::decode(&bytes)?; println!("rolling avg: {:.2}", update.rolling_avg); } Ok(()) } async fn client_stream_demo(client: &RpcClient) -> Result<(), RpcError> { println!("\n--- Client streaming ---"); let readings: Vec<Vec<u8>> = vec![ MetricReading { sensor: "temp".into(), value: 21.0 }, MetricReading { sensor: "temp".into(), value: 21.5 }, MetricReading { sensor: "temp".into(), value: 22.0 }, ] .into_iter() .map(|reading| telemetry::encode(&reading).expect("serialize reading")) .collect(); let outbound = stream::iter(readings); let ack_frame = client .call_client_streaming("upload_batch", outbound) .await?; let ack: Ack = telemetry::decode(&ack_frame)?; println!("server accepted {} readings", ack.accepted); Ok(()) }
The client demonstrates:
call_streaming
for true bidirectional messaging.call_server_streaming
when only the server produces a stream of frames.call_client_streaming
to upload many frames and receive one response.
Step 7: Run the scenario
Terminal 1 – start the server:
cargo run --bin server
Terminal 2 – launch the client:
cargo run --bin client
Expected output (trimmed for brevity):
--- Bidirectional chat ---
reply: ack: ping
reply: ack: status?
--- Server streaming ---
rolling avg: 21.00
rolling avg: 21.50
...
--- Client streaming ---
server accepted 3 readings
Where to go next
- Revisit the Concepts chapter for API reference material.
- Combine streaming RPCs with code-generated unary services from the Getting Started tutorial.
- Layer authentication, backpressure, or persistence around these handlers to match your production needs.