Practical Examples

Real-world code patterns for common use cases. All examples assume basic familiarity with the Getting Started guide.

Table of Contents

  1. Practical Examples
    1. Basic Patterns
      1. Simple Event Log
      2. Structured Data with Serialization
      3. Multiple Topics
    2. High-Throughput Patterns
      1. Batched Writes
      2. Batched Reads
    3. Concurrency Patterns
      1. Multi-Threaded Producer
      2. Producer-Consumer Pattern
    4. Durability Patterns
      1. Critical Transactions (Maximum Durability)
      2. Analytics Pipeline (High Throughput)
    5. Recovery Patterns
      1. Crash Recovery with Replay
      2. Checkpoint-Based State Management
    6. Advanced Patterns
      1. Topic-Per-User Isolation
      2. Namespaced Instances for Multi-Tenancy
      3. Retry with Exponential Backoff
    7. Testing Patterns
      1. Mock for Unit Tests
    8. Performance Patterns
      1. Measure Throughput
    9. Error Handling Patterns
      1. Graceful Degradation
    10. More Examples

Basic Patterns

Simple Event Log

use walrus_rust::{Walrus, ReadConsistency};
use std::io;

/// Appends three events to the `user-events` topic, then reads the topic
/// back and prints every entry it returns.
fn main() -> io::Result<()> {
    const TOPIC: &str = "user-events";

    let wal = Walrus::new()?;

    // Append events
    let events: [&[u8]; 3] = [
        b"user:123 logged in",
        b"user:123 viewed page /home",
        b"user:123 clicked button",
    ];
    for event in events {
        wal.append_for_topic(TOPIC, event)?;
    }

    // Read them back until `read_next` reports nothing further
    while let Some(entry) = wal.read_next(TOPIC, true)? {
        let text = String::from_utf8_lossy(&entry.data);
        println!("Event: {}", text);
    }

    Ok(())
}

Output:

Event: user:123 logged in
Event: user:123 viewed page /home
Event: user:123 clicked button

Structured Data with Serialization

use walrus_rust::Walrus;
use serde::{Serialize, Deserialize};
use bincode;

/// Example order record. It is encoded with `bincode` before being appended
/// to the WAL and decoded again after it is read back.
#[derive(Serialize, Deserialize, Debug)]
struct OrderEvent {
    /// Identifier of the order.
    order_id: u64,
    /// Identifier of the user the order belongs to.
    user_id: u64,
    /// Order amount (units/currency are not specified by this example).
    amount: f64,
    /// Free-form status text; the example uses `"completed"`.
    status: String,
}

/// Round-trips a structured `OrderEvent` through the `orders` topic.
///
/// # Errors
/// Returns any I/O error from the WAL, and `ErrorKind::InvalidData` when the
/// event cannot be encoded or the stored bytes cannot be decoded.
fn main() -> std::io::Result<()> {
    // bincode failures are not `io::Error`; wrap them so `?` can propagate
    // them instead of panicking on corrupt or unexpected data.
    fn codec_error<E: std::fmt::Display>(err: E) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidData, err.to_string())
    }

    let wal = Walrus::new()?;

    // Write structured event
    let event = OrderEvent {
        order_id: 12345,
        user_id: 999,
        amount: 149.99,
        status: "completed".to_string(),
    };

    let bytes = bincode::serialize(&event).map_err(codec_error)?;
    wal.append_for_topic("orders", &bytes)?;

    // Read it back
    if let Some(entry) = wal.read_next("orders", true)? {
        let decoded: OrderEvent = bincode::deserialize(&entry.data).map_err(codec_error)?;
        println!("Order: {:?}", decoded);
    }

    Ok(())
}

Multiple Topics

use walrus_rust::Walrus;
use std::sync::Arc;
use std::thread;

/// Writes 1000 messages to each of three topics from one thread per topic,
/// then counts how many entries can be read back from every topic.
fn main() -> std::io::Result<()> {
    const TOPICS: [&str; 3] = ["metrics", "logs", "events"];

    let wal = Arc::new(Walrus::new()?);

    // Writer threads for different topics (all spawned before any is joined)
    let writers: Vec<_> = TOPICS
        .iter()
        .map(|&name| {
            let shared = Arc::clone(&wal);
            thread::spawn(move || {
                for seq in 0..1000 {
                    let payload = format!("{}: message {}", name, seq);
                    shared.append_for_topic(name, payload.as_bytes()).unwrap();
                }
            })
        })
        .collect();

    // Wait for writers
    for writer in writers {
        writer.join().unwrap();
    }

    // Read from each topic
    for name in TOPICS {
        println!("\n=== {} ===", name);
        let mut seen = 0;
        while wal.read_next(name, true)?.is_some() {
            seen += 1;
        }
        println!("Read {} entries from {}", seen, name);
    }

    Ok(())
}

High-Throughput Patterns

Batched Writes

use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};
use std::io;

/// Builds 1000 small payloads and appends them to the `high-volume` topic
/// with a single batch call.
fn main() -> io::Result<()> {
    // Configure for high throughput
    let consistency = ReadConsistency::AtLeastOnce { persist_every: 10_000 };
    let schedule = FsyncSchedule::Milliseconds(5_000);
    let wal = Walrus::with_consistency_and_schedule(consistency, schedule)?;

    // Collect entries to batch; the owned buffers must outlive the borrowed
    // slices handed to the batch call below.
    let mut payloads: Vec<Vec<u8>> = Vec::with_capacity(1000);
    for i in 0..1000 {
        payloads.push(format!("entry {}", i).into_bytes());
    }

    // Convert to slice of slices
    let slices: Vec<&[u8]> = payloads.iter().map(Vec::as_slice).collect();

    // Atomic batch append
    wal.batch_append_for_topic("high-volume", &slices)?;

    println!("Wrote {} entries atomically", slices.len());

    Ok(())
}

Batched Reads

use walrus_rust::Walrus;

/// Drains the `events` topic in batches of up to 10 MB, handing each entry's
/// payload to `process_entry`, and stops at the first empty batch.
fn main() -> std::io::Result<()> {
    let wal = Walrus::new()?;

    // ... write some data ...

    // Read up to 10 MB at a time
    let byte_budget = 10 * 1024 * 1024;

    loop {
        let batch = wal.batch_read_for_topic("events", byte_budget, true)?;

        // An empty batch means there is no more data
        if batch.is_empty() {
            break;
        }

        println!("Processing batch of {} entries", batch.len());

        batch
            .into_iter()
            .for_each(|entry| process_entry(&entry.data));
    }

    Ok(())
}

/// Placeholder handler for one entry's payload; it only reports the size.
fn process_entry(data: &[u8]) {
    // Your processing logic here
    let size = data.len();
    println!("  Entry: {} bytes", size);
}

Concurrency Patterns

Multi-Threaded Producer

use walrus_rust::Walrus;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawns eight producer threads that each append 10,000 messages to the
/// `messages` topic, then prints the elapsed time and message rate.
fn main() -> std::io::Result<()> {
    let wal = Arc::new(Walrus::new()?);
    let producer_count = 8;
    let per_producer = 10_000;

    let started = std::time::Instant::now();

    let workers: Vec<_> = (0..producer_count)
        .map(|id| {
            let shared = Arc::clone(&wal);
            thread::spawn(move || {
                for seq in 0..per_producer {
                    let payload = format!("producer-{}: msg-{}", id, seq);
                    shared.append_for_topic("messages", payload.as_bytes()).unwrap();
                }
            })
        })
        .collect();

    // Wait for all producers
    for worker in workers {
        worker.join().unwrap();
    }

    let elapsed = started.elapsed();
    let total = producer_count * per_producer;
    let rate = total as f64 / elapsed.as_secs_f64();

    println!("Wrote {} messages in {:?}", total, elapsed);
    println!("Throughput: {:.0} msgs/sec", rate);

    Ok(())
}

Producer-Consumer Pattern

use walrus_rust::{Walrus, ReadConsistency};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::thread;
use std::time::Duration;

/// Runs one producer and one consumer thread against the `jobs` topic for
/// five seconds, then signals both to stop through a shared flag.
fn main() -> std::io::Result<()> {
    let wal = Arc::new(Walrus::with_consistency(
        ReadConsistency::StrictlyAtOnce,
    )?);

    // Shared stop flag: both loops check it once per iteration.
    let keep_going = Arc::new(AtomicBool::new(true));

    // Producer thread: one job every 10 ms
    let producer = {
        let log = Arc::clone(&wal);
        let flag = Arc::clone(&keep_going);
        thread::spawn(move || {
            let mut written = 0;
            while flag.load(Ordering::Relaxed) {
                let job = format!("job-{}", written);
                log.append_for_topic("jobs", job.as_bytes()).unwrap();
                written += 1;
                thread::sleep(Duration::from_millis(10));
            }
            println!("Producer: wrote {} jobs", written);
        })
    };

    // Consumer thread
    let consumer = {
        let log = Arc::clone(&wal);
        let flag = Arc::clone(&keep_going);
        thread::spawn(move || {
            let mut handled = 0;
            while flag.load(Ordering::Relaxed) {
                let pause_ms = match log.read_next("jobs", true).unwrap() {
                    Some(entry) => {
                        println!("Processing: {}", String::from_utf8_lossy(&entry.data));
                        handled += 1;
                        20 // Simulate work
                    }
                    None => 10, // Wait for data
                };
                thread::sleep(Duration::from_millis(pause_ms));
            }
            println!("Consumer: processed {} jobs", handled);
        })
    };

    // Run for 5 seconds
    thread::sleep(Duration::from_secs(5));
    keep_going.store(false, Ordering::Relaxed);

    // Wait for threads
    producer.join().unwrap();
    consumer.join().unwrap();

    Ok(())
}

Durability Patterns

Critical Transactions (Maximum Durability)

use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};

/// Opens the WAL instance used for payment transactions.
///
/// Call this once at startup and pass the result to `process_payment`;
/// opening a fresh instance for every payment repeats the setup work and
/// leaves the caller's instance unused.
fn open_transaction_wal() -> std::io::Result<Walrus> {
    // Configure for zero data loss
    Walrus::with_consistency_and_schedule_for_key(
        "transactions",
        ReadConsistency::StrictlyAtOnce,
        FsyncSchedule::SyncEach,  // Fsync every write
    )
}

/// Appends one payment record to the `payments` topic of `wal`.
///
/// `wal` should be the instance returned by `open_transaction_wal` (or one
/// configured the same way): the durability statement printed below relies
/// on the instance syncing every write.
///
/// # Errors
/// Returns the I/O error from the append; nothing is printed in that case.
fn process_payment(wal: &Walrus, payment_data: &[u8]) -> std::io::Result<()> {
    // Write transaction
    wal.append_for_topic("payments", payment_data)?;

    // With a sync-each-write instance the append has been synced by now.
    println!("Payment persisted to disk");

    Ok(())
}

Analytics Pipeline (High Throughput)

use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};

/// Ingests every batch from `get_event_batches` into the `events` topic of
/// the `analytics` WAL instance, one batch append per batch.
fn main() -> std::io::Result<()> {
    // Optimize for throughput, tolerate 5s data loss
    let consistency = ReadConsistency::AtLeastOnce { persist_every: 100_000 };
    let schedule = FsyncSchedule::Milliseconds(5_000);
    let wal = Walrus::with_consistency_and_schedule_for_key("analytics", consistency, schedule)?;

    // High-volume ingestion
    let batches = get_event_batches();
    for batch in &batches {
        let slices: Vec<&[u8]> = batch.iter().map(Vec::as_slice).collect();
        wal.batch_append_for_topic("events", &slices)?;
    }

    Ok(())
}

/// Placeholder data source: returns no batches until replaced with a real one.
fn get_event_batches() -> Vec<Vec<Vec<u8>>> {
    // Your data source here
    Vec::new()
}

Recovery Patterns

Crash Recovery with Replay

use walrus_rust::Walrus;
use std::collections::HashMap;

/// In-memory application state that is rebuilt by replaying transactions.
#[derive(Default)]
struct AppState {
    /// Running balance per user id.
    user_balances: HashMap<u64, f64>,
}

impl AppState {
    /// Adds `amount` to the balance of `user_id`; a user seen for the first
    /// time starts from a balance of zero.
    fn apply_transaction(&mut self, user_id: u64, amount: f64) {
        let balance = self.user_balances.entry(user_id).or_default();
        *balance += amount;
    }
}

/// Decodes one logged transaction of the form `user_id,amount`.
///
/// Fields after the second one are ignored.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` when the bytes are not UTF-8, a field is
/// missing, or a field does not parse. A damaged log entry is reported to the
/// caller instead of aborting the process with a panic.
fn parse_transaction(raw: &[u8]) -> std::io::Result<(u64, f64)> {
    use std::io::{Error, ErrorKind};

    let text = std::str::from_utf8(raw).map_err(|e| Error::new(ErrorKind::InvalidData, e))?;

    let mut fields = text.split(',');
    let (user, amount) = match (fields.next(), fields.next()) {
        (Some(user), Some(amount)) => (user, amount),
        _ => {
            return Err(Error::new(
                ErrorKind::InvalidData,
                format!("malformed transaction {:?}: expected `user_id,amount`", text),
            ));
        }
    };

    let user_id = user
        .parse::<u64>()
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
    let amount = amount
        .parse::<f64>()
        .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;

    Ok((user_id, amount))
}

/// Rebuilds `AppState` by replaying the `transactions` topic, prints the
/// resulting balances, then records and applies one new transaction.
fn main() -> std::io::Result<()> {
    let wal = Walrus::new()?;
    let mut state = AppState::default();

    // Replay WAL to rebuild state
    // NOTE(review): `read_next(.., true)` is called with the checkpoint flag
    // set; confirm that replayed entries are still returned after a later
    // restart, otherwise a full rebuild also needs a saved snapshot.
    println!("Replaying WAL...");
    while let Some(entry) = wal.read_next("transactions", true)? {
        let (user_id, amount) = parse_transaction(&entry.data)?;
        state.apply_transaction(user_id, amount);
    }

    println!("State rebuilt from WAL:");
    for (user_id, balance) in &state.user_balances {
        println!("  User {}: ${:.2}", user_id, balance);
    }

    // Continue normal operation: log first, then mutate. If the append fails
    // the in-memory state must not contain a change the log never recorded.
    wal.append_for_topic("transactions", b"123,50.0")?;
    state.apply_transaction(123, 50.0);

    Ok(())
}

Checkpoint-Based State Management

use walrus_rust::Walrus;
use serde::{Serialize, Deserialize};
use std::fs;

/// Checkpointed application state, stored on disk as `snapshot.bin` via
/// `bincode`. `Default` provides the empty starting state used when no
/// snapshot can be loaded.
#[derive(Serialize, Deserialize, Default)]
struct Snapshot {
    /// Incremented each time a checkpoint is saved.
    version: u64,
    /// Not read or written by the example code shown here.
    /// NOTE(review): presumably meant to record the WAL position covered by
    /// this snapshot — confirm before relying on it.
    last_offset: u64,
    /// Entries read from the `data` topic, decoded lossily as UTF-8.
    data: Vec<String>,
}

fn main() -> std::io::Result<()> {
    let wal = Walrus::new()?;

    // Load latest snapshot
    let mut snapshot = load_snapshot().unwrap_or_default();

    // Replay from snapshot point
    // (In practice, you'd track offset per-topic and resume)
    println!("Resuming from version {}", snapshot.version);

    // Process new entries
    let mut count = 0;
    while let Some(entry) = wal.read_next("data", true)? {
        let item = String::from_utf8_lossy(&entry.data).to_string();
        snapshot.data.push(item);
        count += 1;

        // Checkpoint every 1000 entries
        if count % 1000 == 0 {
            snapshot.version += 1;
            save_snapshot(&snapshot)?;
            println!("Checkpoint saved at version {}", snapshot.version);
        }
    }

    Ok(())
}

/// Reads and decodes `snapshot.bin`; yields `None` when the file cannot be
/// read or its contents cannot be decoded.
fn load_snapshot() -> Option<Snapshot> {
    fs::read("snapshot.bin")
        .ok()
        .and_then(|bytes| bincode::deserialize(&bytes).ok())
}

/// Persists `snapshot` to `snapshot.bin`, replacing any previous snapshot.
///
/// The bytes are written to `snapshot.bin.tmp`, synced to disk, and only then
/// renamed over the final name. Without the sync, a crash shortly after the
/// rename could leave `snapshot.bin` pointing at data that never reached disk.
///
/// # Errors
/// Returns `ErrorKind::InvalidData` when the snapshot cannot be encoded, and
/// any I/O error from creating, writing, syncing or renaming the file.
fn save_snapshot(snapshot: &Snapshot) -> std::io::Result<()> {
    let data = bincode::serialize(snapshot)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;

    {
        let mut tmp = fs::File::create("snapshot.bin.tmp")?;
        std::io::Write::write_all(&mut tmp, &data)?;
        // Flush file contents and metadata before the rename publishes it.
        tmp.sync_all()?;
    }

    fs::rename("snapshot.bin.tmp", "snapshot.bin")?;
    Ok(())
}

Advanced Patterns

Topic-Per-User Isolation

use walrus_rust::Walrus;
use std::sync::Arc;

/// Event log that keeps each user's events in a dedicated topic named
/// `user-<id>` inside one shared WAL instance.
struct UserEventLog {
    wal: Arc<Walrus>,
}

impl UserEventLog {
    /// Opens the underlying WAL.
    fn new() -> std::io::Result<Self> {
        let wal = Arc::new(Walrus::new()?);
        Ok(Self { wal })
    }

    /// Topic name used for `user_id`.
    fn topic_for(user_id: u64) -> String {
        format!("user-{}", user_id)
    }

    /// Appends `event` to the topic of `user_id`.
    fn log_event(&self, user_id: u64, event: &str) -> std::io::Result<()> {
        self.wal
            .append_for_topic(&Self::topic_for(user_id), event.as_bytes())
    }

    /// Reads entries from the topic of `user_id` until none is returned and
    /// collects them as (lossily decoded) strings.
    fn get_user_events(&self, user_id: u64) -> std::io::Result<Vec<String>> {
        let topic = Self::topic_for(user_id);
        let mut collected = Vec::new();

        while let Some(entry) = self.wal.read_next(&topic, true)? {
            collected.push(String::from_utf8_lossy(&entry.data).into_owned());
        }

        Ok(collected)
    }
}

/// Logs three events for two users, then prints each user's events.
fn main() -> std::io::Result<()> {
    let log = UserEventLog::new()?;

    // Log events for different users
    let events = [
        (123, "logged in"),
        (456, "viewed dashboard"),
        (123, "clicked button"),
    ];
    for (user_id, event) in events {
        log.log_event(user_id, event)?;
    }

    // Retrieve per-user
    for user_id in [123, 456] {
        let user_events = log.get_user_events(user_id)?;
        println!("User {}: {:?}", user_id, user_events);
    }

    Ok(())
}

Output:

User 123: ["logged in", "clicked button"]
User 456: ["viewed dashboard"]

Namespaced Instances for Multi-Tenancy

use walrus_rust::{Walrus, ReadConsistency};
use std::collections::HashMap;

/// Owns one WAL instance per tenant, created lazily on first use.
struct TenantManager {
    /// WAL instances keyed by tenant id.
    wals: HashMap<String, Walrus>,
}

impl TenantManager {
    /// Creates a manager with no tenants.
    fn new() -> Self {
        Self {
            wals: HashMap::new(),
        }
    }

    /// Returns the WAL of `tenant_id`, opening it on first use.
    ///
    /// Uses the map's entry API so the lookup happens once and there is no
    /// `unwrap` that could panic; the cost is one key allocation per call.
    ///
    /// # Errors
    /// Returns the I/O error from opening the tenant's WAL; nothing is
    /// inserted into the map in that case.
    fn get_or_create_wal(&mut self, tenant_id: &str) -> std::io::Result<&Walrus> {
        use std::collections::hash_map::Entry;

        match self.wals.entry(tenant_id.to_string()) {
            Entry::Occupied(slot) => {
                let wal: &Walrus = slot.into_mut();
                Ok(wal)
            }
            Entry::Vacant(slot) => {
                // Each tenant gets isolated WAL instance
                let created = Walrus::with_consistency_for_key(
                    tenant_id,
                    ReadConsistency::StrictlyAtOnce,
                )?;
                let wal: &Walrus = slot.insert(created);
                Ok(wal)
            }
        }
    }

    /// Appends `data` to `topic` in the WAL belonging to `tenant_id`.
    fn log_event(&mut self, tenant_id: &str, topic: &str, data: &[u8])
        -> std::io::Result<()>
    {
        let wal = self.get_or_create_wal(tenant_id)?;
        wal.append_for_topic(topic, data)
    }
}

/// Logs three events across two tenants through a `TenantManager`.
fn main() -> std::io::Result<()> {
    let mut manager = TenantManager::new();

    // Tenants have completely isolated WALs
    let events = [
        ("acme-corp", "events", "user signup"),
        ("widgets-inc", "events", "purchase"),
        ("acme-corp", "metrics", "cpu:45%"),
    ];
    for (tenant, topic, payload) in events {
        manager.log_event(tenant, topic, payload.as_bytes())?;
    }

    // Files stored separately:
    // wal_files/acme-corp/
    // wal_files/widgets-inc/

    Ok(())
}

Retry with Exponential Backoff

use walrus_rust::{Walrus, ReadConsistency};
use std::io::{Error, ErrorKind};
use std::time::Duration;
use std::thread;

/// Appends `data` to `topic`, retrying while the append fails with
/// `ErrorKind::WouldBlock`.
///
/// The wait before each retry starts at 100 ms and doubles every time. Once
/// `max_retries` failures have been seen the function gives up with
/// `ErrorKind::TimedOut`. Any other error is returned immediately.
fn append_with_retry(
    wal: &Walrus,
    topic: &str,
    data: &[u8],
    max_retries: u32,
) -> std::io::Result<()> {
    let mut failures = 0;
    let mut delay_ms = 100;

    loop {
        let err = match wal.append_for_topic(topic, data) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };

        // Other errors, fail immediately
        if err.kind() != ErrorKind::WouldBlock {
            return Err(err);
        }

        // Batch write in progress, retry
        failures += 1;
        if failures >= max_retries {
            return Err(Error::new(
                ErrorKind::TimedOut,
                "Max retries exceeded",
            ));
        }

        thread::sleep(Duration::from_millis(delay_ms));
        delay_ms *= 2;  // Exponential backoff
    }
}

/// Appends one entry to `events`, allowing up to five attempts.
fn main() -> std::io::Result<()> {
    const MAX_RETRIES: u32 = 5;

    let wal = Walrus::new()?;

    // Try appending with automatic retry
    append_with_retry(&wal, "events", b"important data", MAX_RETRIES)?;

    println!("Write succeeded (possibly after retries)");

    Ok(())
}

Testing Patterns

Mock for Unit Tests

use walrus_rust::Walrus;
use std::collections::HashMap;

/// Minimal abstraction over the two WAL operations the business logic uses,
/// so that a real WAL or a test double can be passed interchangeably.
trait WalInterface {
    /// Appends `data` to `topic`.
    fn append(&self, topic: &str, data: &[u8]) -> std::io::Result<()>;
    /// Returns the payload of an entry from `topic`, or `None` when the
    /// implementation has nothing to return for that topic.
    fn read(&self, topic: &str) -> std::io::Result<Option<Vec<u8>>>;
}

// Real implementation
/// `WalInterface` implementation that forwards to an actual `Walrus` instance.
struct RealWal {
    wal: Walrus,
}

impl WalInterface for RealWal {
    /// Forwards to `Walrus::append_for_topic`.
    fn append(&self, topic: &str, data: &[u8]) -> std::io::Result<()> {
        self.wal.append_for_topic(topic, data)
    }

    /// Forwards to `Walrus::read_next` and keeps only the entry's payload.
    fn read(&self, topic: &str) -> std::io::Result<Option<Vec<u8>>> {
        let next = self.wal.read_next(topic, true)?;
        Ok(next.map(|entry| entry.data))
    }
}

// Mock for testing
/// In-memory stand-in for a WAL, intended for single-threaded unit tests.
///
/// Each topic is a FIFO queue: `append` pushes to the back and `read` pops
/// from the front, so every appended payload is returned exactly once and in
/// append order.
struct MockWal {
    /// Per-topic queues. `RefCell` provides the interior mutability needed
    /// because the `WalInterface` methods take `&self`.
    data: std::cell::RefCell<HashMap<String, std::collections::VecDeque<Vec<u8>>>>,
}

impl MockWal {
    /// Creates an empty mock with no topics.
    fn new() -> Self {
        Self {
            data: std::cell::RefCell::new(HashMap::new()),
        }
    }
}

impl WalInterface for MockWal {
    /// Stores a copy of `data` at the back of `topic`'s queue. Never fails.
    fn append(&self, topic: &str, data: &[u8]) -> std::io::Result<()> {
        self.data
            .borrow_mut()
            .entry(topic.to_string())
            .or_default()
            .push_back(data.to_vec());
        Ok(())
    }

    /// Removes and returns the oldest payload of `topic`, or `None` when the
    /// topic is unknown or already drained. Never fails.
    fn read(&self, topic: &str) -> std::io::Result<Option<Vec<u8>>> {
        let mut topics = self.data.borrow_mut();
        let next = topics.get_mut(topic).and_then(|queue| queue.pop_front());
        Ok(next)
    }
}

// Your business logic works with trait
/// Example business logic written against `WalInterface`: appends one event
/// to `events`, then reads from `events` once and prints what comes back.
///
/// # Errors
/// Propagates the first error returned by `append` or `read`.
fn process_events<W>(wal: &W) -> std::io::Result<()>
where
    W: WalInterface,
{
    wal.append("events", b"event1")?;

    let next = wal.read("events")?;
    if let Some(payload) = next {
        println!("Processed: {:?}", payload);
    }

    Ok(())
}

// Unit tests that exercise the business logic without touching a real WAL.
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `process_events` against a `MockWal` and requires it to succeed.
    #[test]
    fn test_with_mock() {
        let mock = MockWal::new();
        // `unwrap` fails the test if either WAL call returns an error.
        process_events(&mock).unwrap();
        // Assert on mock state
    }
}

Performance Patterns

Measure Throughput

use walrus_rust::{Walrus, ReadConsistency, FsyncSchedule};
use std::time::Instant;

/// Appends one million 1 KiB entries to the `benchmark` topic and prints the
/// measured operations per second, bandwidth and mean time per append.
fn main() -> std::io::Result<()> {
    let consistency = ReadConsistency::AtLeastOnce { persist_every: 10_000 };
    let schedule = FsyncSchedule::NoFsync; // Max throughput
    let wal = Walrus::with_consistency_and_schedule(consistency, schedule)?;

    let num_entries = 1_000_000;
    let entry_size = 1024;
    let payload = vec![0u8; entry_size];

    // Only the append loop is timed.
    let started = Instant::now();
    for _ in 0..num_entries {
        wal.append_for_topic("benchmark", &payload)?;
    }
    let elapsed = started.elapsed();

    let secs = elapsed.as_secs_f64();
    let total_bytes = num_entries * entry_size;
    let ops_per_sec = num_entries as f64 / secs;
    let mb_per_sec = total_bytes as f64 / secs / 1_048_576.0;
    let micros_per_op = elapsed.as_micros() as f64 / num_entries as f64;

    println!("Throughput: {:.0} ops/sec", ops_per_sec);
    println!("Bandwidth: {:.2} MB/sec", mb_per_sec);
    println!("Latency: {:.2} μs/op", micros_per_op);

    Ok(())
}

Error Handling Patterns

Graceful Degradation

use walrus_rust::Walrus;
use std::io::{Error, ErrorKind};

/// Logger that writes to a WAL when one is available and falls back to
/// stderr otherwise, so logging never stops the application.
struct ResilientLogger {
    /// `None` when the WAL could not be initialized.
    wal: Option<Walrus>,
}

impl ResilientLogger {
    /// Tries to open the WAL; on failure prints a warning and continues
    /// without one.
    fn new() -> Self {
        let wal = Walrus::new()
            .map_err(|e| eprintln!("WARNING: WAL initialization failed: {}", e))
            .ok();

        Self { wal }
    }

    /// Appends `message` to `topic`. When there is no WAL, or the append
    /// fails, the message is written to stderr instead.
    fn log(&self, topic: &str, message: &str) {
        let persisted = match &self.wal {
            Some(wal) => match wal.append_for_topic(topic, message.as_bytes()) {
                Ok(()) => true,
                Err(e) => {
                    eprintln!("WARNING: Failed to log to WAL: {}", e);
                    false
                }
            },
            // WAL unavailable
            None => false,
        };

        // Fall back to stderr
        if !persisted {
            eprintln!("[{}] {}", topic, message);
        }
    }
}

/// Logs two messages through `ResilientLogger` and keeps running regardless
/// of whether the WAL is available.
fn main() {
    let logger = ResilientLogger::new();

    for message in ["Application started", "Processing request"] {
        logger.log("events", message);
    }

    // Even if WAL fails, application continues
    println!("Application running normally");
}

More Examples

For additional examples and patterns:

  • Test suite: tests/ directory in the repo contains 4000+ lines of real-world scenarios
  • Benchmarks: benches/ shows high-performance usage patterns
  • GitHub issues: Search for the “example” label to find community contributions

Contribute your own: If you build something cool with Walrus, share it! We accept PRs for this examples page.