╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ WALRUS BATCH READ/WRITE ARCHITECTURE ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
┌─────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ API LAYER (Public Interface) │
└─────────────────────────────────────────────────────────────────────────────────────────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐
│ append_for_topic │ │batch_append_for_topic│ │batch_read_for_topic │
│ (single write) │ │ (batch write API) │ │ (batch read API) │
└────────┬─────────┘ └──────────┬───────────┘ └──────────┬───────────┘
│ │ │
│ │ │
└───────┬──────────────────┘ │
│ │
▼ ▼
┌─────────────────────────────────────┐ ┌─────────────────────────────────┐
│ WRITER SUBSYSTEM │ │ READER SUBSYSTEM │
│ (src/wal/runtime/writer.rs) │ │ (src/wal/runtime/walrus_read.rs)│
└─────────────────────────────────────┘ └─────────────────────────────────┘
╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ BATCH WRITE FLOW (batch_append_for_topic) ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ PHASE 0: VALIDATION │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ • Check batch size: ≤ MAX_BATCH_ENTRIES (2,000) │
│ • Check total bytes: ≤ MAX_BATCH_BYTES (10GB) │
│ • Acquire atomic flag: is_batch_writing (compare_exchange) │
│ • Create RAII BatchGuard to ensure flag release │ │ │
│ FAIL ─► Return ErrorKind::InvalidInput (size) or WouldBlock (flag busy) │ └───────────────────────────────────────┬──────────────────────────────────────────────┘
│ SUCCESS
▼
┌──────────────────────────────────────────────────────────────────────────────────────┐ │ PHASE 1: PRE-ALLOCATION & PLANNING │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ • Acquire locks: current_block (Mutex) + current_offset (Mutex) │
│ • Save revert info: original_offset, allocated_block_ids[] │
│ • Build write plan: Vec<(Block, offset, batch_index)> │
│ │
│ FOR EACH entry in batch: │
│ need = PREFIX_META_SIZE (64 bytes) + data.len() │
│ IF need fits in current_block: │ │ ├─► Add to write_plan: (current_block, planning_offset, idx) │
│ └─► planning_offset += need │
│ ELSE (need new block): │ │ ├─► Seal current block (set used, flush, append to reader chain) │
│ ├─► BlockAllocator::alloc_block(need) ─► new_block │
│ ├─► Track: allocated_block_ids.push(new_block.id) │
│ ├─► current_block = new_block │
│ └─► planning_offset = 0 │
│ │
│ • Result: Complete write plan with all blocks pre-allocated │ └───────────────────────────────────────┬──────────────────────────────────────────────┘
│ ┌───────────────────────┴────────────────────────┐ │ Backend Selection (Linux + FD backend?) │
└───┬─────────────────────────────────────────┬──┘
│ YES (io_uring) │ NO (mmap)
▼ ▼
┌─────────────────────────────────────┐ ┌─────────────────────────────────────┐ │ PHASE 2+3: IO_URING PATH │ │ FALLBACK: SEQUENTIAL MMAP PATH │
├─────────────────────────────────────┤ ├─────────────────────────────────────┤ │ PHASE 2: Preparation │ │ FOR EACH (block, offset, idx): │ │ • Create io_uring (size = plan.len)│ │ ├─► block.write(offset, data) │
│ • Build buffers[] for each entry: │ │ └─► ON ERROR: zero headers, │
│ ├─► Serialize Metadata: │ │ flush, rollback offset │
│ │ • read_size, checksum (FNV-1a)│ │ │
│ │ • owned_by, next_block_start │ │ Fsync all touched files │
│ ├─► Build combined buffer: │ │ Update writer offset = planning │
│ │ [64B metadata][data payload] │ └─────────────────────────────────────┘
│ └─► Push io_uring::opcode::Write │
│ with file_offset = blk.offset│
│ │
│ PHASE 3: Atomic Submission │
│ • submit_and_wait(plan.len()) │
│ • Check ALL completion queue entries│
│ • Validate: result == expected_bytes│
│ │
│ IF ANY FAILURE: │
│ ├─► Zero all entry headers │
│ ├─► Fsync zeros to disk │
│ ├─► Rollback offset │
│ ├─► Mark allocated blocks unlocked│
│ └─► Return Error │
│ │
│ SUCCESS: │
│ ├─► Fsync all touched files │
│ ├─► Update writer offset = planning│
│ └─► Release locks & RAII guard │
└─────────────────────────────────────┘
╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ BATCH READ FLOW (batch_read_for_topic) ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ PHASE 1: INITIALIZATION & CURSOR HYDRATION │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ • Get or create ColReaderInfo (Arc<RwLock<ColReaderInfo>>) │
│ • Snapshot active writer state: (active_block, written_offset) │
│ │
│ IF NOT hydrated_from_index: │
│ ├─► Read from ReadOffsetIndex │
│ ├─► Check for TAIL_FLAG (bit 63): cursor in active tail vs sealed chain │
│ │ ├─► TAIL: extract block_id, set cur_block_idx = chain.len() │
│ │ └─► SEALED: use cur_block_idx, cur_block_offset from index │
│ └─► Mark hydrated_from_index = true │
│ │
│ IF persisted tail exists AND that block is now sealed: │
│ └─► Fold into sealed chain: find block_id in chain, update cur_block_idx/offset │
└───────────────────────────────────────┬──────────────────────────────────────────────┘
▼
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ PHASE 2: BUILD READ PLAN │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ Initialize: plan = Vec<ReadPlan>, planned_bytes = 0 │
│ │
│ SEALED CHAIN PLANNING (cur_idx < chain.len()): │
│ WHILE cur_idx < chain.len() AND planned_bytes < max_bytes: │
│ block = chain[cur_idx] │
│ IF cur_offset >= block.used: │
│ ├─► Mark block as checkpointed │
│ └─► Advance: cur_idx++, cur_offset = 0, continue │
│ ELSE: │
│ end = min(block.used, cur_offset + (max_bytes - planned_bytes)) │
│ plan.push(ReadPlan { blk, start: cur_offset, end, is_tail: false }) │
│ planned_bytes += (end - cur_offset) │
│ cur_idx++, cur_offset = 0 │
│ │
│ TAIL PLANNING (if cur_idx >= chain.len()): │
│ tail_start = if tail_block_id == active_block.id { tail_offset } else { 0 } │
│ IF tail_start < written: │
│ plan.push(ReadPlan { blk: active_block, start: tail_start, end: written, │
│ is_tail: true }) │
│ │
│ IF plan.is_empty(): Return empty Vec │
└───────────────────────────────────────┬──────────────────────────────────────────────┘
│
┌───────────────────────┴────────────────────────┐
│ Lock Strategy (StrictlyAtOnce vs AtLeastOnce) │
└───┬─────────────────────────────────────────┬──┘
│ StrictlyAtOnce │ AtLeastOnce
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ HOLD LOCK │ │ RELEASE LOCK │
│ during I/O │ │ before I/O │
│ (prevents dup │ │ (allows parallel │
│ consumption) │ │ readers) │
└────────┬─────────┘ └────────┬─────────┘
└───────────────────┬────────────────────┘
▼
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ PHASE 3: EXECUTE READS │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ ┌────────────────────────────┐ ┌────────────────────────────┐ │
│ │ LINUX + FD BACKEND │ │ FALLBACK (mmap) │ │
│ ├────────────────────────────┤ ├────────────────────────────┤ │
│ │ • Create io_uring │ │ FOR EACH read_plan: │ │
│ │ • FOR EACH read_plan: │ │ size = end - start │ │
│ │ ├─► Allocate buffer[size]│ │ buffer = vec![0; size] │ │
│ │ ├─► Get FD from block │ │ offset = blk.offset + │ │
│ │ ├─► Push Read opcode: │ │ read_plan.start │ │
│ │ │ fd, buffer, size, │ │ blk.mmap.read(offset, │ │
│ │ │ offset, user_data │ │ buffer) │ │
│ │ └─► Store buffer │ │ buffers.push(buffer) │ │
│ │ │ └────────────────────────────┘ │
│ │ • submit_and_wait(plan.len)│ │
│ │ • Validate completions: │ │
│ │ ├─► result >= 0 │ │
│ │ └─► result == expected │ │
│ │ FAIL ─► UnexpectedEof │ │
│ └────────────────────────────┘ │
└───────────────────────────────────────┬──────────────────────────────────────────────┘
▼
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ PHASE 4: PARSE ENTRIES & COMMIT PROGRESS │
├──────────────────────────────────────────────────────────────────────────────────────┤
│ Initialize: entries = Vec, total_data_bytes = 0, entries_parsed = 0 │
│ │
│ FOR EACH (plan_idx, read_plan) in plan: │
│ buffer = buffers[plan_idx] │
│ buf_offset = 0 │
│ │
│ WHILE buf_offset < buffer.len() AND entries.len() < MAX_BATCH_ENTRIES: │
│ ┌─ Read 64-byte metadata header ─────────────────────────────────┐ │
│ │ Bytes 0-1: meta_len (little-endian) │ │
│ │ Bytes 2+: rkyv-serialized Metadata │ │
│ │ ├─► read_size (data payload size) │ │
│ │ ├─► checksum (FNV-1a hash) │ │
│ │ ├─► owned_by (topic name) │ │
│ │ └─► next_block_start │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ entry_consumed = PREFIX_META_SIZE (64) + data_size │
│ │
│ ┌─ Enforce Budget ───────────────────────────────────────────────┐ │
│ │ next_total = total_data_bytes + data_size │ │
│ │ IF next_total > max_bytes AND entries is not empty: │ │
│ │ └─► BREAK (budget exceeded, stop parsing) │ │
│ │ ELSE always allow at least 1 entry │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─ Verify Checksum ──────────────────────────────────────────────┐ │
│ │ data_slice = buffer[buf_offset+64 .. buf_offset+64+data_size] │ │
│ │ IF checksum64(data_slice) != meta.checksum: │ │
│ │ └─► Return ErrorKind::InvalidData │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ entries.push(Entry { data: data_slice.to_vec() }) │
│ total_data_bytes = next_total │
│ entries_parsed++ │
│ │
│ ┌─ Track Position ───────────────────────────────────────────────┐ │
│ │ in_block_offset = read_plan.start + buf_offset + entry_consumed│ │
│ │ IF read_plan.is_tail: │ │
│ │ └─► final_tail_block_id, final_tail_offset = ... │ │
│ │ ELSE (sealed chain): │ │
│ │ └─► final_block_idx, final_block_offset = ... │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ buf_offset += entry_consumed │
│ │
│ ┌─ Commit Progress (if checkpoint=true) ─────────────────────────────┐ │
│ │ IF saw_tail: │ │
│ │ ├─► Update: tail_block_id, tail_offset │ │
│ │ └─► Persist to index with TAIL_FLAG │ │
│ │ ELSE (sealed): │ │
│ │ ├─► Update: cur_block_idx, cur_block_offset │ │
│ │ └─► Persist to index │ │
│ │ │ │
│ │ StrictlyAtOnce: ALWAYS persist │ │
│ │ AtLeastOnce: persist every N reads (threshold-based) │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ RETURN entries (Vec<Entry>) │
└──────────────────────────────────────────────────────────────────────────────────────┘
╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ CORE COMPONENTS ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
┌────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ WRITER (per topic) READER (per topic) │
│ ┌──────────────────────────────────────┐ ┌──────────────────────────────────────┐ │
│ │ Writer { │ │ ColReaderInfo { │ │
│ │ allocator: Arc<BlockAllocator> │ │ chain: Vec<Block> // sealed │ │
│ │ current_block: Mutex<Block> │ │ cur_block_idx: usize │ │
│ │ current_offset: Mutex<u64> │ │ cur_block_offset: u64 │ │
│ │ is_batch_writing: AtomicBool ◄─────┼──┐ │ tail_block_id: u64 │ │
│ │ reader: Arc<Reader> │ │ │ tail_offset: u64 │ │
│ │ fsync_schedule: FsyncSchedule │ │ │ reads_since_persist: u32 │ │
│ │ } │ │ │ hydrated_from_index: bool │ │
│ └──────────────────────────────────────┘ │ │ } │ │
│ │ └──────────────────────────────────────┘ │
│ Concurrency Control: │ Read Consistency: │
│ • Mutex locks prevent concurrent writes │ • StrictlyAtOnce: hold lock during I/O │
│ • AtomicBool prevents batch + regular ────┘ • AtLeastOnce: release lock, parallel OK │
│ • Regular write() checks is_batch_writing │
│ returns WouldBlock if batch in progress │
└────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ BLOCK ALLOCATOR READ OFFSET INDEX │
│ ┌──────────────────────────────────────┐ ┌──────────────────────────────────────┐ │
│ │ BlockAllocator { │ │ ReadOffsetIndex { │ │
│ │ // Manages block lifecycle │ │ <topic>_index.db files │ │
│ │ // Pre-allocates 10MB blocks │ │ Stores: │ │
│ │ // Tracks file usage (1GB files) │ │ cur_block_idx | TAIL_FLAG │ │
│ │ // Block states: locked/unlocked │ │ cur_block_offset │ │
│ │ } │ │ TAIL_FLAG = 1 << 63 │ │
│ │ │ │ } │ │
│ │ alloc_block(need: u64) -> Block │ │ │ │
│ │ ├─► Returns pre-allocated block │ │ Persistence Strategy: │ │
│ │ └─► Marks as locked │ │ StrictlyAtOnce: every read │ │
│ │ │ │ AtLeastOnce: threshold-based │ │
│ └──────────────────────────────────────┘ └──────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────────────────────────────────────┘
╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ STORAGE LAYOUT ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
Filesystem Structure:
┌─────────────────────────────────────────────────────────────────────────────────────────┐
│ wal_files/<namespace>/ │
│ ├── <timestamp_1>.wal (1GB file = 100 × 10MB blocks) │
│ ├── <timestamp_2>.wal │
│ ├── ... │
│ ├── topic_A_index.db (persisted read cursor) │
│ ├── topic_B_index.db │
│ └── ... │
└─────────────────────────────────────────────────────────────────────────────────────────┘
Block Structure (10MB logical blocks):
┌──────────────────────────────────────────────────────────────────────────────────────────┐
│ Block { │
│ id: u64 // unique block identifier │
│ offset: u64 // byte offset in file │
│ limit: u64 // 10MB capacity │
│ used: u64 // bytes written (sealed blocks only) │
│ mmap: SharedMmap // storage backend (FD or mmap) │
│ file_path: String │
│ } │
└──────────────────────────────────────────────────────────────────────────────────────────┘
Entry Layout (within block):
┌──────────────────────────────────────────────────────────────────────────────────────────┐
│ ┌──────────────────────────┬────────────────────────────────────────────────┐ │
│ │ METADATA PREFIX (64B) │ DATA PAYLOAD (variable) │ │
│ ├──────────────────────────┼────────────────────────────────────────────────┤ │
│ │ [0-1]: meta_len (u16 LE) │ │ │
│ │ [2+]: rkyv Metadata { │ Raw bytes (max ~10MB) │ │
│ │ read_size │ │ │
│ │ checksum (FNV) │ │ │
│ │ owned_by │ │ │
│ │ next_block_start│ │ │
│ │ } │ │ │
│ └──────────────────────────┴────────────────────────────────────────────────┘ │
│ │
│ Checksum: FNV-1a hash computed over data payload only │
│ Verified on every read before returning to caller │
└──────────────────────────────────────────────────────────────────────────────────────────┘
Storage Backend Selection:
┌──────────────────────────────────────────────────────────────────────────────────────────┐
│ FD Backend (enable_fd_backend) │ Mmap Backend (default/fallback) │
│ ├─► Linux + io_uring support │ ├─► Non-Linux platforms │
│ ├─► Batch writes via io_uring │ ├─► Sequential writes │
│ ├─► Batch reads via io_uring │ ├─► Direct mmap read/write │
│ └─► Better throughput for large batches │ └─► Simpler, universal compatibility │
└──────────────────────────────────────────────────────────────────────────────────────────┘
╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ KEY CONSTRAINTS & LIMITS ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
┌────────────────────────────────────────────────────────────────────────────────────────┐
│ Constant │ Value │ Purpose │
├──────────────────────────┼────────────┼─────────────────────────────────────────────────┤
│ MAX_BATCH_ENTRIES │ 2,000 │ Entry cap for batch read/write │
│ │ │ (stays below io_uring SQ limit of 2,047) │
├──────────────────────────┼────────────┼─────────────────────────────────────────────────┤
│ MAX_BATCH_BYTES │ 10 GB │ Total size limit for batch writes │
│ │ │ (enables pre-computation of block requirements) │
├──────────────────────────┼────────────┼─────────────────────────────────────────────────┤
│ DEFAULT_BLOCK_SIZE │ 10 MB │ Logical block size │
├──────────────────────────┼────────────┼─────────────────────────────────────────────────┤
│ BLOCKS_PER_FILE │ 100 │ Blocks per WAL file │
├──────────────────────────┼────────────┼─────────────────────────────────────────────────┤
│ MAX_FILE_SIZE │ 1 GB │ 10MB × 100 blocks │
├──────────────────────────┼────────────┼─────────────────────────────────────────────────┤
│ PREFIX_META_SIZE │ 64 bytes │ Metadata prefix per entry │
└────────────────────────────────────────────────────────────────────────────────────────┘
Error Handling:
┌────────────────────────────────────────────────────────────────────────────────────────┐
│ Error │ Cause │ Recovery Action │
├──────────────────────────┼────────────────────────────────┼───────────────────────────┤
│ InvalidInput │ Batch > 2000 entries or 10GB │ Immediate return │
├──────────────────────────┼────────────────────────────────┼───────────────────────────┤
│ WouldBlock │ Concurrent batch write │ Client retry with backoff │
├──────────────────────────┼────────────────────────────────┼───────────────────────────┤
│ UnexpectedEof │ Short read from io_uring/mmap │ Data corruption detected │
├──────────────────────────┼────────────────────────────────┼───────────────────────────┤
│ InvalidData │ Checksum mismatch │ Data corruption detected │
├──────────────────────────┼────────────────────────────────┼───────────────────────────┤
│ Unsupported │ FD backend not available │ Fallback to mmap or fail │
├──────────────────────────┼────────────────────────────────┼───────────────────────────┤
│ Write failure (batch) │ io_uring error │ Zero headers, rollback │
│ │ │ offset, unlock blocks │
└────────────────────────────────────────────────────────────────────────────────────────┘
╔═══════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ ATOMICITY GUARANTEES ║
╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝
Batch Write Atomicity:
┌────────────────────────────────────────────────────────────────────────────────────────┐
│ All entries in a batch are written atomically - either ALL or NONE │
│ │
│ Mechanism: │
│ 1. io_uring batched submission → Kernel-level atomicity │
│ 2. Pre-allocation → No mid-batch allocation failures │
│ 3. Held locks → No concurrent modifications during batch │
│ 4. Atomic flag (is_batch_writing) → Prevents concurrent regular writes │
│ 5. Rollback on failure: │
│ ├─► Zero all entry headers (make unreadable) │
│ ├─► Restore original writer offset │
│ └─► Mark allocated blocks as unlocked/reclaimable │
│ │
│ Writer offset is ONLY updated on complete success (Phase 4) │
│ Readers will see either all entries or none │
└────────────────────────────────────────────────────────────────────────────────────────┘
Read Consistency Models:
┌────────────────────────────────────────────────────────────────────────────────────────┐
│ StrictlyAtOnce │ AtLeastOnce │
│ ├─► Hold write lock during I/O │ ├─► Release lock before I/O │
│ ├─► Prevents duplicate consumption │ ├─► Allows parallel readers │
│ ├─► Persist after every read │ ├─► Persist every N reads (threshold) │
│ └─► Single consumer guarantee │ └─► Multiple consumers OK, may re-read │
└────────────────────────────────────────────────────────────────────────────────────────┘