
WAL Durability

Rafka's broker stores produced records in a write-ahead log (WAL) called SingleWal. Records are written to disk and survive broker restarts. When the broker starts, it recovers the full virtual topic index from the WAL without any external coordination.

Storage layout

The WAL stores records in fixed-size 1 GB segment files. Each segment has a companion memory-mapped Dense Offset Index (<segment_id>.oidx) that maps global message IDs to physical byte locations.

data/
  0.log    segment 0: raw record bytes, length-prefixed
  0.oidx   memory-mapped offset index, 40 bytes per entry
  1.log    segment 1, created when segment 0 fills
  1.oidx
  ...
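The sketch below makes the segment layout concrete. It appends length-prefixed records in memory and rolls over to a new segment. Several details are assumptions for illustration:

- the 4-byte little-endian length prefix;
- the rollover rule, which starts a new segment when the next framed record would not fit;
- the Segments type and the segment_paths helper, both hypothetical.

The demo also uses a tiny capacity in place of 1 GB.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: paths of the log and index files for one segment.
fn segment_paths(data_dir: &Path, segment_id: u32) -> (PathBuf, PathBuf) {
    (
        data_dir.join(format!("{}.log", segment_id)),
        data_dir.join(format!("{}.oidx", segment_id)),
    )
}

/// In-memory stand-in for the .log files (hypothetical; the real WAL writes to disk).
struct Segments {
    capacity: u64,      // bytes per segment (1 GB in the real layout)
    logs: Vec<Vec<u8>>, // logs[i] holds the bytes of segment i
}

impl Segments {
    fn new(capacity: u64) -> Self {
        Segments { capacity, logs: vec![Vec::new()] }
    }

    /// Appends one record as an assumed 4-byte little-endian length prefix
    /// followed by the record bytes. Starts a new segment when the current,
    /// non-empty segment cannot hold the framed record.
    /// Returns (segment_id, physical_offset, length), the values an index entry needs.
    fn append(&mut self, record: &[u8]) -> (u32, u32, u32) {
        let framed = 4 + record.len() as u64;
        let current_len = self.logs.last().unwrap().len() as u64;
        if current_len > 0 && current_len + framed > self.capacity {
            self.logs.push(Vec::new());
        }
        let segment_id = (self.logs.len() - 1) as u32;
        let log = self.logs.last_mut().unwrap();
        let physical_offset = log.len() as u32; // points at the length prefix
        log.extend_from_slice(&(record.len() as u32).to_le_bytes());
        log.extend_from_slice(record);
        (segment_id, physical_offset, record.len() as u32)
    }
}
```

With a 16-byte capacity, appending "hello" (9 framed bytes) and then "abc" (7 framed bytes) fills segment 0 exactly. A third record therefore lands at offset 0 of segment 1.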

Each OffsetIndexEntry is 40 bytes (4 + 4 + 4 + 8 + 16 + 4) and stores:

| Field | Type | Purpose |
|---|---|---|
| segment_id | u32 | Which .log file the record is in |
| physical_offset | u32 | Byte offset within the segment |
| length | u32 | Record byte length |
| org_id | u64 | Tenant org identifier |
| topic_hash | u128 | Blake3 org-salted topic hash |
| partition | u32 | Partition index |

The (org_id, topic_hash, partition) tuple in every index entry is what makes the virtual topic index recoverable after a restart — the broker can rebuild it by scanning the index without reading the record data.
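The sketch below shows one way to serialize the 40-byte entry. It assumes the fields are packed in table order, little-endian, with no padding; the on-disk byte order is not specified on this page. Writing the fields out explicitly matters because a plain #[repr(C)] struct containing a u128 is typically padded beyond 40 bytes by alignment.

```rust
use std::convert::TryInto;

/// Mirrors the documented OffsetIndexEntry fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct OffsetIndexEntry {
    segment_id: u32,
    physical_offset: u32,
    length: u32,
    org_id: u64,
    topic_hash: u128,
    partition: u32,
}

const ENTRY_SIZE: usize = 40; // 4 + 4 + 4 + 8 + 16 + 4

impl OffsetIndexEntry {
    /// Assumed encoding: table order, little-endian, no padding.
    fn to_bytes(&self) -> [u8; ENTRY_SIZE] {
        let mut buf = [0u8; ENTRY_SIZE];
        buf[0..4].copy_from_slice(&self.segment_id.to_le_bytes());
        buf[4..8].copy_from_slice(&self.physical_offset.to_le_bytes());
        buf[8..12].copy_from_slice(&self.length.to_le_bytes());
        buf[12..20].copy_from_slice(&self.org_id.to_le_bytes());
        buf[20..36].copy_from_slice(&self.topic_hash.to_le_bytes());
        buf[36..40].copy_from_slice(&self.partition.to_le_bytes());
        buf
    }

    /// Inverse of to_bytes under the same assumed layout.
    fn from_bytes(buf: &[u8; ENTRY_SIZE]) -> Self {
        OffsetIndexEntry {
            segment_id: u32::from_le_bytes(buf[0..4].try_into().unwrap()),
            physical_offset: u32::from_le_bytes(buf[4..8].try_into().unwrap()),
            length: u32::from_le_bytes(buf[8..12].try_into().unwrap()),
            org_id: u64::from_le_bytes(buf[12..20].try_into().unwrap()),
            topic_hash: u128::from_le_bytes(buf[20..36].try_into().unwrap()),
            partition: u32::from_le_bytes(buf[36..40].try_into().unwrap()),
        }
    }
}
```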

Virtual topic index

The virtual topic index maps a virtual topic key to a RoaringTreemap bitset of global message IDs.

Virtual topic key format: "{org_id}-{topic_hash_decimal}-{partition}"
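Here is a small sketch of the key format and its inverse. The helper names virtual_topic_key and split_virtual_topic_key are hypothetical; they are not the broker's parse_virtual_topic_tuple. Because all three components are unsigned decimal numbers, none of them can contain '-', so splitting on that character is unambiguous.

```rust
/// Hypothetical helper: builds "{org_id}-{topic_hash_decimal}-{partition}".
fn virtual_topic_key(org_id: u64, topic_hash: u128, partition: u32) -> String {
    format!("{}-{}-{}", org_id, topic_hash, partition)
}

/// Hypothetical inverse: returns None unless the key has exactly three
/// '-'-separated parts that parse as u64, u128 and u32.
fn split_virtual_topic_key(key: &str) -> Option<(u64, u128, u32)> {
    let mut parts = key.split('-');
    let org_id = parts.next()?.parse().ok()?;
    let topic_hash = parts.next()?.parse().ok()?;
    let partition = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // more than three components
    }
    Some((org_id, topic_hash, partition))
}
```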

A record's virtual offset is the rank of its global ID within the bitset, i.e. the number of smaller global IDs in the same bitset. It is the Kafka-visible offset that clients receive in ProduceResponse.base_offset and send in FetchRequest.fetch_offset. This means:

  • Virtual offset 0 = the first record ever produced to this (org, topic, partition).
  • high_watermark = bitset.len() — the total record count.
  • Fetching from fetch_offset = N reads the record whose global ID is at position N (0-based) in the bitset.
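The offset arithmetic can be illustrated with a std-only sketch. BTreeSet<u64> stands in for RoaringTreemap here. Both are ordered sets of u64, but RoaringTreemap is far more compact and answers rank/select without the linear scans used below. The VirtualTopic type and its methods are hypothetical.

```rust
use std::collections::BTreeSet;

/// Stand-in for one virtual topic's RoaringTreemap of global message IDs.
struct VirtualTopic {
    ids: BTreeSet<u64>,
}

impl VirtualTopic {
    /// high_watermark = total number of records in this virtual topic.
    fn high_watermark(&self) -> u64 {
        self.ids.len() as u64
    }

    /// Virtual offset of a global ID = number of smaller IDs in the set (its rank).
    fn virtual_offset(&self, global_id: u64) -> Option<u64> {
        if !self.ids.contains(&global_id) {
            return None; // this ID belongs to some other virtual topic
        }
        Some(self.ids.range(..global_id).count() as u64)
    }

    /// Global ID at a 0-based virtual offset (select); None at or past the HWM.
    fn select(&self, virtual_offset: u64) -> Option<u64> {
        self.ids.iter().nth(virtual_offset as usize).copied()
    }
}
```

Consider a topic whose bitset holds global IDs {3, 10, 11}. Its high watermark is 3, select(1) yields 10, and global ID 11 has virtual offset 2. IDs 4 through 9 went to other virtual topics, so gaps in global IDs never appear as gaps in virtual offsets.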

Append path

store_records(wal, vt_key, records) in broker/src/produce.rs:

  1. Calls wal.append_direct(vt_key, record_bytes, …) once per record.
  2. append_direct writes the record to the current segment and updates the offset index.
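  3. The virtual topic index is updated: global_id is inserted into vt_key's bitset. Because global IDs are assigned in increasing order, the new ID is always the largest in the bitset, so its virtual offset is the bitset length − 1. The virtual offset of the batch's first record is returned as base_offset.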

Span: rafka.broker.produce.store — attributes: virtual_topic, record_count, base_offset.
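The append path can be sketched with an in-memory mock. The following are hypothetical simplifications:

- MockWal and its fields;
- the two-argument append_direct (the real call takes further arguments, elided in the steps above).

The sketch also assumes global IDs come from a single increasing counter shared by all virtual topics. Following Kafka's convention, base_offset is taken to be the virtual offset of the batch's first record.

```rust
use std::collections::{BTreeSet, HashMap};

/// In-memory mock of the append path (hypothetical; not the broker's SingleWal).
#[derive(Default)]
struct MockWal {
    next_global_id: u64,
    records: Vec<Vec<u8>>,                  // record bytes, indexed by global ID
    topics: HashMap<String, BTreeSet<u64>>, // virtual topic index
}

impl MockWal {
    /// Stores one record and returns its virtual offset.
    fn append_direct(&mut self, vt_key: &str, record: &[u8]) -> u64 {
        let global_id = self.next_global_id;
        self.next_global_id += 1;
        self.records.push(record.to_vec());
        let ids = self.topics.entry(vt_key.to_string()).or_default();
        ids.insert(global_id);
        // global_id is the largest ID inserted so far, so its rank is len - 1.
        ids.len() as u64 - 1
    }
}

/// Appends a batch one record at a time; returns the first record's virtual
/// offset as base_offset, or None for an empty batch.
fn store_records(wal: &mut MockWal, vt_key: &str, records: &[&[u8]]) -> Option<u64> {
    let mut base_offset = None;
    for record in records {
        let offset = wal.append_direct(vt_key, record);
        if base_offset.is_none() {
            base_offset = Some(offset);
        }
    }
    base_offset
}
```

Consider this sequence: two records to one topic, one record to another, then one more to the first. The base offsets are 0, 0 and 2, even though the last record has global ID 3.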

Fetch path

handle_fetch in broker/src/produce.rs:

  1. Looks up vt_key's bitset to read high_watermark.
  2. Calls wal.fetch_virtual(vt_key, virtual_offset, max_bytes) in a loop, incrementing virtual_offset each iteration.
  3. fetch_virtual translates a virtual offset → global message ID (bitset select) → OffsetIndexEntry → reads raw bytes from the segment file.
  4. The loop ends when fetch_virtual returns None (the offset is at or past the high watermark) or when max_bytes is exhausted.

Span: rafka.broker.fetch.read — attributes: virtual_topic, fetch_offset, returned_count, high_watermark.
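The fetch loop can be sketched the same way. The mock types are hypothetical, and here the byte budget is checked in the loop rather than inside fetch_virtual. The sketch always returns the first record even if that record alone exceeds max_bytes. Kafka brokers do this so that a consumer cannot stall on an oversized record; this page does not state whether Rafka does the same.

```rust
use std::collections::{BTreeSet, HashMap};

/// Minimal read-side mock (hypothetical; the real WAL reads segment files).
struct MockWal {
    records: Vec<Vec<u8>>,                  // raw bytes, indexed by global ID
    topics: HashMap<String, BTreeSet<u64>>, // virtual topic index
}

impl MockWal {
    /// virtual offset -> global ID (select) -> record bytes; None past the HWM.
    fn fetch_virtual(&self, vt_key: &str, virtual_offset: u64) -> Option<&[u8]> {
        let ids = self.topics.get(vt_key)?;
        let global_id = *ids.iter().nth(virtual_offset as usize)?;
        self.records.get(global_id as usize).map(|r| r.as_slice())
    }
}

/// Reads records starting at fetch_offset and returns them with the high watermark.
fn handle_fetch(wal: &MockWal, vt_key: &str, fetch_offset: u64, max_bytes: usize) -> (Vec<Vec<u8>>, u64) {
    let high_watermark = wal.topics.get(vt_key).map_or(0, |ids| ids.len() as u64);
    let mut out = Vec::new();
    let mut used = 0;
    let mut virtual_offset = fetch_offset;
    while let Some(record) = wal.fetch_virtual(vt_key, virtual_offset) {
        // Stop once the budget would be exceeded, but never return an empty
        // response when a record is available.
        if !out.is_empty() && used + record.len() > max_bytes {
            break;
        }
        used += record.len();
        out.push(record.to_vec());
        virtual_offset += 1;
    }
    (out, high_watermark)
}
```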

Restart recovery

When SingleWal::new(data_dir) runs on startup, it performs a recovery scan:

  1. Reads every OffsetIndexEntry from all existing .oidx files in order.
  2. For each entry, reconstructs the virtual topic key from (org_id, topic_hash, partition) using parse_virtual_topic_tuple.
  3. Inserts the global_id into the corresponding bitset, restoring the full virtual topic index.

After recovery, high_watermark, virtual offsets, and fetch results are identical to the pre-restart state. Records are not re-read from .log files during recovery; only the 40-byte index entries are scanned.
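Here is a sketch of the recovery scan over raw index bytes. It rests on three assumptions:

- the packed little-endian layout from the table (org_id at bytes 12..20, topic_hash at 20..36, partition at 36..40);
- the .oidx contents are concatenated in segment order;
- an entry's position in that sequence is its global message ID.

recover_index is a hypothetical name. It builds the key with format! instead of calling the broker's parse_virtual_topic_tuple.

```rust
use std::collections::{BTreeSet, HashMap};
use std::convert::TryInto;

const ENTRY_SIZE: usize = 40;

/// Rebuilds the virtual topic index from .oidx bytes (assumed layout, see above).
fn recover_index(oidx_bytes: &[u8]) -> HashMap<String, BTreeSet<u64>> {
    let mut index: HashMap<String, BTreeSet<u64>> = HashMap::new();
    // chunks_exact yields whole 40-byte entries and skips any trailing partial one.
    for (global_id, entry) in oidx_bytes.chunks_exact(ENTRY_SIZE).enumerate() {
        // Only the tenant/topic/partition fields are decoded; .log data is never read.
        let org_id = u64::from_le_bytes(entry[12..20].try_into().unwrap());
        let topic_hash = u128::from_le_bytes(entry[20..36].try_into().unwrap());
        let partition = u32::from_le_bytes(entry[36..40].try_into().unwrap());
        let key = format!("{}-{}-{}", org_id, topic_hash, partition);
        index.entry(key).or_default().insert(global_id as u64);
    }
    index
}
```

Because chunks_exact silently drops a trailing partial entry, a tail torn by a crash mid-write would simply be ignored. This page does not say how SingleWal itself treats such a tail.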

Configuration

| Environment variable | Default | Purpose |
|---|---|---|
| RAFKA_DATA_DIR | ./data | Base directory for segment files and index files |

The WAL is initialized once at broker startup via a OnceLock<Arc<SingleWal>>. All produce and fetch handlers share the same WAL instance for the lifetime of the process.
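Here is a minimal sketch of the process-wide handle using std::sync::OnceLock, which is stable since Rust 1.70. It reads RAFKA_DATA_DIR and falls back to the documented ./data default. The SingleWal struct here is a placeholder that holds only the data directory; the real constructor would also run the recovery scan. data_dir_from and wal are hypothetical helper names.

```rust
use std::path::PathBuf;
use std::sync::{Arc, OnceLock};

/// Placeholder for the broker's SingleWal (hypothetical fields).
struct SingleWal {
    data_dir: PathBuf,
}

/// RAFKA_DATA_DIR if set, otherwise the documented default ./data.
fn data_dir_from(var: Option<String>) -> PathBuf {
    PathBuf::from(var.unwrap_or_else(|| "./data".to_string()))
}

static WAL: OnceLock<Arc<SingleWal>> = OnceLock::new();

/// Returns the shared WAL; the closure runs at most once per process.
fn wal() -> Arc<SingleWal> {
    WAL.get_or_init(|| {
        let data_dir = data_dir_from(std::env::var("RAFKA_DATA_DIR").ok());
        Arc::new(SingleWal { data_dir })
    })
    .clone()
}
```

Every produce and fetch handler that calls wal() receives a clone of the same Arc. Initialization cost is paid once, and the handle never changes for the lifetime of the process.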

What is NOT yet implemented

  • Segment compaction / GC (old segments are never reclaimed and accumulate on disk)
  • Replication (records live on a single broker; high availability is planned for Phase 4)
  • Timestamp-exact offset resolution for ListOffsets with timestamp >= 0