Practical Examples
Real-world code patterns for common use cases. All examples assume basic familiarity with the Getting Started guide.
Table of Contents
- Practical Examples
Basic Patterns
Simple Event Log
use walrus_rust::{Walrus, ReadConsistency};
use std::io;
fn main() -> io::Result<()> {
let wal = Walrus::new()?;
// Append events
wal.append_for_topic("user-events", b"user:123 logged in")?;
wal.append_for_topic("user-events", b"user:123 viewed page /home")?;
wal.append_for_topic("user-events", b"user:123 clicked button")?;
// Read them back
while let Some(entry) = wal.read_next("user-events", true)? {
println!("Event: {}", String::from_utf8_lossy(&entry.data));
}
Ok(())
}
Output:
Event: user:123 logged in
Event: user:123 viewed page /home
Event: user:123 clicked button
Structured Data with Serialization
use walrus_rust::Walrus;
use serde::{Serialize, Deserialize};
use bincode;
#[derive(Serialize, Deserialize, Debug)]
struct OrderEvent {
order_id: u64,
user_id: u64,
amount: f64,
status: String,
}
fn main() -> std::io::Result<()> {
let wal = Walrus::new()?;
// Write structured event
let event = OrderEvent {
order_id: 12345,
user_id: 999,
amount: 149.99,
status: "completed".to_string(),
};
let bytes = bincode::serialize(&event).unwrap();
wal.append_for_topic("orders", &bytes)?;
// Read it back
if let Some(entry) = wal.read_next("orders", true)? {
let decoded: OrderEvent = bincode::deserialize(&entry.data).unwrap();
println!("Order: {:?}", decoded);
}
Ok(())
}
Multiple Topics
use walrus_rust::Walrus;
use std::sync::Arc;
use std::thread;
fn main() -> std::io::Result<()> {
let wal = Arc::new(Walrus::new()?);
// Writer threads for different topics
let mut handles = vec![];
for topic in ["metrics", "logs", "events"] {
let wal_clone = Arc::clone(&wal);
let topic = topic.to_string();
let handle = thread::spawn(move || {
for i in 0..1000 {
let msg = format!("{}: message {}", topic, i);
wal_clone.append_for_topic(&topic, msg.as_bytes()).unwrap();
}
});
handles.push(handle);
}
// Wait for writers
for handle in handles {
handle.join().unwrap();
}
// Read from each topic
for topic in ["metrics", "logs", "events"] {
println!("\n=== {} ===", topic);
let mut count = 0;
while let Some(_) = wal.read_next(topic, true)? {
count += 1;
}
println!("Read {} entries from {}", count, topic);
}
Ok(())
}
High-Throughput Patterns
Batched Writes
use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};
use std::io;
fn main() -> io::Result<()> {
// Configure for high throughput
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::AtLeastOnce { persist_every: 10_000 },
FsyncSchedule::Milliseconds(5_000),
)?;
// Collect entries to batch
let entries: Vec<Vec<u8>> = (0..1000)
.map(|i| format!("entry {}", i).into_bytes())
.collect();
// Convert to slice of slices
let batch: Vec<&[u8]> = entries.iter().map(|v| v.as_slice()).collect();
// Atomic batch append
wal.batch_append_for_topic("high-volume", &batch)?;
println!("Wrote {} entries atomically", batch.len());
Ok(())
}
Batched Reads
use walrus_rust::Walrus;
fn main() -> std::io::Result<()> {
let wal = Walrus::new()?;
// ... write some data ...
// Read up to 10 MB at a time
let max_bytes = 10 * 1024 * 1024;
loop {
let entries = wal.batch_read_for_topic("events", max_bytes, true)?;
if entries.is_empty() {
break; // No more data
}
println!("Processing batch of {} entries", entries.len());
for entry in entries {
process_entry(&entry.data);
}
}
Ok(())
}
fn process_entry(data: &[u8]) {
// Your processing logic here
println!(" Entry: {} bytes", data.len());
}
Concurrency Patterns
Multi-Threaded Producer
use walrus_rust::Walrus;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn main() -> std::io::Result<()> {
let wal = Arc::new(Walrus::new()?);
let num_producers = 8;
let messages_per_producer = 10_000;
let start = std::time::Instant::now();
let mut handles = vec![];
for producer_id in 0..num_producers {
let wal_clone = Arc::clone(&wal);
let handle = thread::spawn(move || {
for i in 0..messages_per_producer {
let msg = format!("producer-{}: msg-{}", producer_id, i);
wal_clone.append_for_topic("messages", msg.as_bytes()).unwrap();
}
});
handles.push(handle);
}
// Wait for all producers
for handle in handles {
handle.join().unwrap();
}
let elapsed = start.elapsed();
let total_msgs = num_producers * messages_per_producer;
let throughput = total_msgs as f64 / elapsed.as_secs_f64();
println!("Wrote {} messages in {:?}", total_msgs, elapsed);
println!("Throughput: {:.0} msgs/sec", throughput);
Ok(())
}
Producer-Consumer Pattern
use walrus_rust::{Walrus, ReadConsistency};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::thread;
use std::time::Duration;
fn main() -> std::io::Result<()> {
let wal = Arc::new(Walrus::with_consistency(
ReadConsistency::StrictlyAtOnce,
)?);
let running = Arc::new(AtomicBool::new(true));
// Producer thread
let producer_wal = Arc::clone(&wal);
let producer_running = Arc::clone(&running);
let producer = thread::spawn(move || {
let mut counter = 0;
while producer_running.load(Ordering::Relaxed) {
let msg = format!("job-{}", counter);
producer_wal.append_for_topic("jobs", msg.as_bytes()).unwrap();
counter += 1;
thread::sleep(Duration::from_millis(10));
}
println!("Producer: wrote {} jobs", counter);
});
// Consumer thread
let consumer_wal = Arc::clone(&wal);
let consumer_running = Arc::clone(&running);
let consumer = thread::spawn(move || {
let mut processed = 0;
while consumer_running.load(Ordering::Relaxed) {
if let Some(entry) = consumer_wal.read_next("jobs", true).unwrap() {
let job = String::from_utf8_lossy(&entry.data);
println!("Processing: {}", job);
processed += 1;
thread::sleep(Duration::from_millis(20)); // Simulate work
} else {
thread::sleep(Duration::from_millis(10)); // Wait for data
}
}
println!("Consumer: processed {} jobs", processed);
});
// Run for 5 seconds
thread::sleep(Duration::from_secs(5));
running.store(false, Ordering::Relaxed);
// Wait for threads
producer.join().unwrap();
consumer.join().unwrap();
Ok(())
}
Durability Patterns
Critical Transactions (Maximum Durability)
use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};
fn process_payment(wal: &Walrus, payment_data: &[u8]) -> std::io::Result<()> {
// Configure for zero data loss
let txn_wal = Walrus::with_consistency_and_schedule_for_key(
"transactions",
ReadConsistency::StrictlyAtOnce,
FsyncSchedule::SyncEach, // Fsync every write
)?;
// Write transaction
txn_wal.append_for_topic("payments", payment_data)?;
// At this point, data is on disk and survives crashes
println!("Payment persisted to disk");
Ok(())
}
Analytics Pipeline (High Throughput)
use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};
fn main() -> std::io::Result<()> {
// Optimize for throughput, tolerate 5s data loss
let wal = Walrus::with_consistency_and_schedule_for_key(
"analytics",
ReadConsistency::AtLeastOnce { persist_every: 100_000 },
FsyncSchedule::Milliseconds(5_000),
)?;
// High-volume ingestion
for batch in get_event_batches() {
let entries: Vec<&[u8]> = batch.iter().map(|e| e.as_slice()).collect();
wal.batch_append_for_topic("events", &entries)?;
}
Ok(())
}
fn get_event_batches() -> Vec<Vec<Vec<u8>>> {
// Your data source here
vec![]
}
Recovery Patterns
Crash Recovery with Replay
use walrus_rust::Walrus;
use std::collections::HashMap;
#[derive(Default)]
struct AppState {
user_balances: HashMap<u64, f64>,
}
impl AppState {
fn apply_transaction(&mut self, user_id: u64, amount: f64) {
*self.user_balances.entry(user_id).or_insert(0.0) += amount;
}
}
fn main() -> std::io::Result<()> {
let wal = Walrus::new()?;
let mut state = AppState::default();
// Replay WAL to rebuild state
println!("Replaying WAL...");
while let Some(entry) = wal.read_next("transactions", true)? {
let parts: Vec<&str> = std::str::from_utf8(&entry.data)
.unwrap()
.split(',')
.collect();
let user_id: u64 = parts[0].parse().unwrap();
let amount: f64 = parts[1].parse().unwrap();
state.apply_transaction(user_id, amount);
}
println!("State rebuilt from WAL:");
for (user_id, balance) in &state.user_balances {
println!(" User {}: ${:.2}", user_id, balance);
}
// Continue normal operation
state.apply_transaction(123, 50.0);
wal.append_for_topic("transactions", b"123,50.0")?;
Ok(())
}
Checkpoint-Based State Management
use walrus_rust::Walrus;
use serde::{Serialize, Deserialize};
use std::fs;
#[derive(Serialize, Deserialize, Default)]
struct Snapshot {
version: u64,
last_offset: u64,
data: Vec<String>,
}
fn main() -> std::io::Result<()> {
let wal = Walrus::new()?;
// Load latest snapshot
let mut snapshot = load_snapshot().unwrap_or_default();
// Replay from snapshot point
// (In practice, you'd track offset per-topic and resume)
println!("Resuming from version {}", snapshot.version);
// Process new entries
let mut count = 0;
while let Some(entry) = wal.read_next("data", true)? {
let item = String::from_utf8_lossy(&entry.data).to_string();
snapshot.data.push(item);
count += 1;
// Checkpoint every 1000 entries
if count % 1000 == 0 {
snapshot.version += 1;
save_snapshot(&snapshot)?;
println!("Checkpoint saved at version {}", snapshot.version);
}
}
Ok(())
}
fn load_snapshot() -> Option<Snapshot> {
let data = fs::read("snapshot.bin").ok()?;
bincode::deserialize(&data).ok()
}
fn save_snapshot(snapshot: &Snapshot) -> std::io::Result<()> {
let data = bincode::serialize(snapshot).unwrap();
fs::write("snapshot.bin.tmp", data)?;
fs::rename("snapshot.bin.tmp", "snapshot.bin")?;
Ok(())
}
Advanced Patterns
Topic-Per-User Isolation
use walrus_rust::Walrus;
use std::sync::Arc;
struct UserEventLog {
wal: Arc<Walrus>,
}
impl UserEventLog {
fn new() -> std::io::Result<Self> {
Ok(Self {
wal: Arc::new(Walrus::new()?),
})
}
fn log_event(&self, user_id: u64, event: &str) -> std::io::Result<()> {
let topic = format!("user-{}", user_id);
self.wal.append_for_topic(&topic, event.as_bytes())
}
fn get_user_events(&self, user_id: u64) -> std::io::Result<Vec<String>> {
let topic = format!("user-{}", user_id);
let mut events = vec![];
while let Some(entry) = self.wal.read_next(&topic, true)? {
events.push(String::from_utf8_lossy(&entry.data).to_string());
}
Ok(events)
}
}
fn main() -> std::io::Result<()> {
let log = UserEventLog::new()?;
// Log events for different users
log.log_event(123, "logged in")?;
log.log_event(456, "viewed dashboard")?;
log.log_event(123, "clicked button")?;
// Retrieve per-user
let user_123_events = log.get_user_events(123)?;
println!("User 123: {:?}", user_123_events);
let user_456_events = log.get_user_events(456)?;
println!("User 456: {:?}", user_456_events);
Ok(())
}
Output:
User 123: ["logged in", "clicked button"]
User 456: ["viewed dashboard"]
Namespaced Instances for Multi-Tenancy
use walrus_rust::{Walrus, ReadConsistency};
use std::collections::HashMap;
struct TenantManager {
wals: HashMap<String, Walrus>,
}
impl TenantManager {
fn new() -> Self {
Self {
wals: HashMap::new(),
}
}
fn get_or_create_wal(&mut self, tenant_id: &str) -> std::io::Result<&Walrus> {
if !self.wals.contains_key(tenant_id) {
// Each tenant gets isolated WAL instance
let wal = Walrus::with_consistency_for_key(
tenant_id,
ReadConsistency::StrictlyAtOnce,
)?;
self.wals.insert(tenant_id.to_string(), wal);
}
Ok(self.wals.get(tenant_id).unwrap())
}
fn log_event(&mut self, tenant_id: &str, topic: &str, data: &[u8])
-> std::io::Result<()>
{
let wal = self.get_or_create_wal(tenant_id)?;
wal.append_for_topic(topic, data)
}
}
fn main() -> std::io::Result<()> {
let mut manager = TenantManager::new();
// Tenants have completely isolated WALs
manager.log_event("acme-corp", "events", b"user signup")?;
manager.log_event("widgets-inc", "events", b"purchase")?;
manager.log_event("acme-corp", "metrics", b"cpu:45%")?;
// Files stored separately:
// wal_files/acme-corp/
// wal_files/widgets-inc/
Ok(())
}
Retry with Exponential Backoff
use walrus_rust::{Walrus, ReadConsistency};
use std::io::{Error, ErrorKind};
use std::time::Duration;
use std::thread;
fn append_with_retry(
wal: &Walrus,
topic: &str,
data: &[u8],
max_retries: u32,
) -> std::io::Result<()> {
let mut attempt = 0;
let mut backoff_ms = 100;
loop {
match wal.append_for_topic(topic, data) {
Ok(()) => return Ok(()),
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// Batch write in progress, retry
attempt += 1;
if attempt >= max_retries {
return Err(Error::new(
ErrorKind::TimedOut,
"Max retries exceeded",
));
}
thread::sleep(Duration::from_millis(backoff_ms));
backoff_ms *= 2; // Exponential backoff
}
Err(e) => return Err(e), // Other errors, fail immediately
}
}
}
fn main() -> std::io::Result<()> {
let wal = Walrus::new()?;
// Try appending with automatic retry
append_with_retry(&wal, "events", b"important data", 5)?;
println!("Write succeeded (possibly after retries)");
Ok(())
}
Testing Patterns
Mock for Unit Tests
use walrus_rust::Walrus;
use std::collections::HashMap;
trait WalInterface {
fn append(&self, topic: &str, data: &[u8]) -> std::io::Result<()>;
fn read(&self, topic: &str) -> std::io::Result<Option<Vec<u8>>>;
}
// Real implementation
struct RealWal {
wal: Walrus,
}
impl WalInterface for RealWal {
fn append(&self, topic: &str, data: &[u8]) -> std::io::Result<()> {
self.wal.append_for_topic(topic, data)
}
fn read(&self, topic: &str) -> std::io::Result<Option<Vec<u8>>> {
Ok(self.wal.read_next(topic, true)?.map(|e| e.data))
}
}
// Mock for testing
struct MockWal {
data: HashMap<String, Vec<Vec<u8>>>,
}
impl MockWal {
fn new() -> Self {
Self { data: HashMap::new() }
}
}
impl WalInterface for MockWal {
fn append(&self, topic: &str, data: &[u8]) -> std::io::Result<()> {
// In real mock, use interior mutability (RefCell, etc.)
Ok(())
}
fn read(&self, topic: &str) -> std::io::Result<Option<Vec<u8>>> {
Ok(self.data.get(topic).and_then(|v| v.first().cloned()))
}
}
// Your business logic works with trait
fn process_events<W: WalInterface>(wal: &W) -> std::io::Result<()> {
wal.append("events", b"event1")?;
if let Some(data) = wal.read("events")? {
println!("Processed: {:?}", data);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_with_mock() {
let mock = MockWal::new();
process_events(&mock).unwrap();
// Assert on mock state
}
}
Performance Patterns
Measure Throughput
use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};
use std::time::Instant;
fn main() -> std::io::Result<()> {
let wal = Walrus::with_consistency_and_schedule(
ReadConsistency::AtLeastOnce { persist_every: 10_000 },
FsyncSchedule::NoFsync, // Max throughput
)?;
let num_entries = 1_000_000;
let entry_size = 1024;
let data = vec![0u8; entry_size];
let start = Instant::now();
for _ in 0..num_entries {
wal.append_for_topic("benchmark", &data)?;
}
let elapsed = start.elapsed();
let ops_per_sec = num_entries as f64 / elapsed.as_secs_f64();
let mb_per_sec = (num_entries * entry_size) as f64 / elapsed.as_secs_f64() / 1_048_576.0;
println!("Throughput: {:.0} ops/sec", ops_per_sec);
println!("Bandwidth: {:.2} MB/sec", mb_per_sec);
println!("Latency: {:.2} μs/op", elapsed.as_micros() as f64 / num_entries as f64);
Ok(())
}
Error Handling Patterns
Graceful Degradation
use walrus_rust::Walrus;
use std::io::{Error, ErrorKind};
struct ResilientLogger {
wal: Option<Walrus>,
}
impl ResilientLogger {
fn new() -> Self {
let wal = match Walrus::new() {
Ok(w) => Some(w),
Err(e) => {
eprintln!("WARNING: WAL initialization failed: {}", e);
None
}
};
Self { wal }
}
fn log(&self, topic: &str, message: &str) {
if let Some(ref wal) = self.wal {
if let Err(e) = wal.append_for_topic(topic, message.as_bytes()) {
eprintln!("WARNING: Failed to log to WAL: {}", e);
// Fall back to stderr
eprintln!("[{}] {}", topic, message);
}
} else {
// WAL unavailable, log to stderr
eprintln!("[{}] {}", topic, message);
}
}
}
fn main() {
let logger = ResilientLogger::new();
logger.log("events", "Application started");
logger.log("events", "Processing request");
// Even if WAL fails, application continues
println!("Application running normally");
}
More Examples
For additional examples and patterns:
- Test suite:
tests/directory in the repo contains 4000+ lines of real-world scenarios - Benchmarks:
benches/shows high-performance usage patterns - GitHub issues: Search for “example” tag for community contributions
Contribute your own: If you build something cool with Walrus, share it! We accept PRs for this examples page.