Fetch

The fetch vertical is Rafka's consumer-side Kafka operation. A standard Kafka client sends a FetchRequest to the gateway on TCP :9092. The gateway decodes it and forwards a FetchOp to the broker over the iroh mesh; the broker reads records from the WAL, and the gateway returns a FetchResponse. Everything the client sends and receives is standard Kafka wire protocol; only the gateway-to-broker hop uses the mesh.

The fetch flow

Kafka client ──TCP :9092──▶ gateway (kafka_ingress.rs)
    ApiVersions negotiation
    Metadata request → self-advertise (gateway is the single broker)
    ListOffsets (optional) → earliest/latest offset from WAL bitset
    FetchRequest decode → FetchOp (postcard-encoded)
      → kafka_op_call(broker_peer, KAFKA_OP_FETCH, org_id, payload)
          ▼ iroh QUIC bi-stream
broker (run_bi_reader dispatch → handle_fetch)
    → wal_store().fetch_virtual(vt_key, fetch_offset, max_bytes)
    ◀ FetchOpResp { records, high_watermark, error_code }
    → gateway re-encodes records into a RecordBatch
    → gateway encodes FetchResponse ──────────────────▶ client

Every step emits an OTLP span. The fetch trace chain is:

gateway.kafka.connect → gateway.kafka.request → rafka.gateway.kafka.fetch → rafka.broker.fetch.read

FetchOp wire types

FetchOp and FetchOpResp are postcard-encoded and live in crates/rafka-mesh-ops:

  • FetchOp { topic, partition, fetch_offset, max_bytes }: what to fetch.
  • FetchOpResp { records: Vec<Vec<u8>>, high_watermark, error_code }: the raw record values, the high-watermark offset of the requested (topic, partition), and a Kafka error code.

The op code is KAFKA_OP_FETCH = 0x03.
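A minimal Rust sketch of the two wire types and the op code follows. The integer widths are assumptions chosen to mirror Kafka's own FetchRequest/FetchResponse fields (INT32 partition and max_bytes, INT64 offsets, INT16 error code), and the u8 type of the op code is likewise assumed. The real definitions in crates/rafka-mesh-ops also need serde's Serialize/Deserialize derives so postcard can encode them; those are left out here so the sketch builds with no dependencies.

```rust
/// Op code sent on the iroh bi-stream for a fetch (value from this page;
/// the u8 type is an assumption).
pub const KAFKA_OP_FETCH: u8 = 0x03;

/// What to fetch. Field types are assumed, mirroring Kafka's FetchRequest.
/// The real type would also derive serde's Serialize/Deserialize for postcard.
#[derive(Debug, Clone, PartialEq)]
pub struct FetchOp {
    pub topic: String,
    pub partition: i32,
    pub fetch_offset: i64,
    pub max_bytes: i32,
}

/// Broker reply: raw record values plus the partition's high-watermark.
#[derive(Debug, Clone, PartialEq)]
pub struct FetchOpResp {
    pub records: Vec<Vec<u8>>,
    pub high_watermark: i64,
    /// Kafka error code; 0 means no error.
    pub error_code: i16,
}
```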

How records are read from the WAL

The broker's handle_fetch handler:

  1. Builds the virtual-topic key "{org_id}-{topic_hash_decimal}-{partition}", where topic_hash is a Blake3 hash of (org_id, topic_name) truncated to a u128 and written in decimal. Hashing org_id together with the topic name salts the hash per organization.
  2. Reads the high-watermark as virtual_index.bitsets[vt_key].len().
  3. Fetches records one-by-one starting at fetch_offset via wal.fetch_virtual(vt_key, offset, max_bytes), stopping when the next record is None (past HWM) or the max_bytes budget is exhausted.
  4. Returns the raw Vec<Vec<u8>> values and the high-watermark in a FetchOpResp; the gateway wraps the values in a Kafka RecordBatch before encoding the FetchResponse.
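The steps above can be sketched as follows. MockWal, its high_watermark and fetch_virtual methods, and build_vt_key are hypothetical stand-ins, not Rafka's code. The sketch takes topic_hash as an already-computed u128 rather than calling Blake3, and the mock's fetch_virtual drops the max_bytes argument so the byte budget is enforced visibly in the loop. It also assumes that a record which would overflow the budget is left for the next fetch, except that the first record is always returned so an oversized record cannot stall the consumer; this page does not say how Rafka handles that edge case.

```rust
use std::collections::HashMap;

/// Hypothetical helper for step 1: "{org_id}-{topic_hash_decimal}-{partition}".
/// `topic_hash` is assumed precomputed (Blake3 over org_id and topic name,
/// truncated to u128); hashing itself is out of scope here.
fn build_vt_key(org_id: u64, topic_hash: u128, partition: i32) -> String {
    format!("{}-{}-{}", org_id, topic_hash, partition)
}

/// Hypothetical in-memory stand-in for the WAL's virtual-topic view:
/// each vt_key maps to its records in virtual-offset order.
struct MockWal {
    topics: HashMap<String, Vec<Vec<u8>>>,
}

impl MockWal {
    /// Step 2: the high-watermark is the record count of the virtual topic.
    fn high_watermark(&self, vt_key: &str) -> u64 {
        self.topics.get(vt_key).map_or(0, |recs| recs.len() as u64)
    }

    /// Returns the record at `offset`, or None once past the high-watermark.
    /// Unlike the real fetch_virtual, this mock takes no max_bytes.
    fn fetch_virtual(&self, vt_key: &str, offset: u64) -> Option<Vec<u8>> {
        self.topics.get(vt_key)?.get(offset as usize).cloned()
    }
}

/// Step 3: read records one at a time from `fetch_offset` until the log ends
/// or the next record would exceed `max_bytes`. The first record is always
/// returned (an assumption) so an oversized record cannot stall the consumer.
fn handle_fetch(
    wal: &MockWal,
    vt_key: &str,
    fetch_offset: u64,
    max_bytes: usize,
) -> (Vec<Vec<u8>>, u64) {
    let high_watermark = wal.high_watermark(vt_key);
    let mut records = Vec::new();
    let mut used = 0usize;
    let mut offset = fetch_offset;
    while let Some(record) = wal.fetch_virtual(vt_key, offset) {
        if !records.is_empty() && used + record.len() > max_bytes {
            break; // budget exhausted: leave this record for the next fetch
        }
        used += record.len();
        records.push(record);
        offset += 1;
    }
    // Step 4: raw values and the high-watermark go back in the FetchOpResp.
    (records, high_watermark)
}
```

Fetching from an offset at or past the high-watermark makes the first fetch_virtual call return None, so the loop never runs and the result is empty, matching the behavior described under offsets.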

Span attributes:

  • virtual_topic — the vt_key string
  • fetch_offset — the starting offset requested
  • returned_count — number of records returned
  • high_watermark — total record count for the topic/partition

Offsets and the high-watermark

  • fetch_offset = 0 returns from the beginning of the log.
  • high_watermark equals the total number of records produced to the (topic, partition) pair. A fetch from fetch_offset >= high_watermark returns an empty record set.
  • The WAL's virtual topic index is a RoaringTreemap bitset; virtual offsets are zero-indexed positions in the bitset.
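To make "zero-indexed positions in the bitset" concrete, the sketch below uses a std BTreeSet<u64> as a stand-in for RoaringTreemap. It assumes the set bits are the WAL positions of the records belonging to one (topic, partition); under that assumption, high_watermark is the number of set bits and virtual offset k resolves to the k-th smallest set bit. VirtualIndex and wal_position are hypothetical names.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for one virtual topic's RoaringTreemap. The set is
/// assumed to hold the WAL positions of records in this (topic, partition).
struct VirtualIndex {
    bits: BTreeSet<u64>,
}

impl VirtualIndex {
    /// high_watermark = number of set bits = records produced so far.
    fn high_watermark(&self) -> u64 {
        self.bits.len() as u64
    }

    /// Maps a zero-indexed virtual offset to the WAL position that holds it.
    /// Returns None when `virtual_offset >= high_watermark`.
    fn wal_position(&self, virtual_offset: u64) -> Option<u64> {
        self.bits.iter().nth(virtual_offset as usize).copied()
    }
}
```

Walking to the k-th element is linear in k, which keeps the sketch short; a production index would want a faster rank/select lookup.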

ListOffsets integration

Before fetching, clients typically send a ListOffsetsRequest to discover the earliest or latest offset:

| timestamp value | Meaning | Resolved offset |
|---|---|---|
| -2 | Earliest | 0 (the WAL always starts at 0) |
| -1 | Latest | high_watermark (bitset len) |
| >= 0 | By timestamp | high_watermark (approximate; exact timestamp indexing is not yet implemented) |

Span name: rafka.broker.list_offsets.read.
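The table reduces to a small match. resolve_list_offset is a hypothetical helper, not Rafka's code: it takes the partition's high_watermark as input and, as an assumption, returns None for negative timestamps the table does not list.

```rust
/// Kafka's special ListOffsets timestamps.
const EARLIEST_TIMESTAMP: i64 = -2;
const LATEST_TIMESTAMP: i64 = -1;

/// Hypothetical helper: resolves a ListOffsets timestamp against one
/// partition's high-watermark, following the table above.
fn resolve_list_offset(timestamp: i64, high_watermark: i64) -> Option<i64> {
    match timestamp {
        EARLIEST_TIMESTAMP => Some(0),          // the WAL always starts at 0
        LATEST_TIMESTAMP => Some(high_watermark),
        t if t >= 0 => Some(high_watermark),     // approximation: no time index yet
        _ => None,                               // unlisted negative value (assumed)
    }
}
```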

Metadata and self-advertisement

When kcat or librdkafka opens a connection, it first sends a MetadataRequest. Rafka's gateway responds as the single broker (node_id=0, pointing at itself). Every requested topic gets a 1-partition entry (partition 0, leader 0) regardless of whether any records have been produced. Topics spring into existence on first produce; there is no explicit topic-creation step in Phase 1.
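A sketch of the self-advertisement logic, using hypothetical simplified structs (Broker, PartitionMeta, TopicMeta) in place of the real MetadataResponse encoding. The host and port passed in are assumed to be whatever address the gateway listens on.

```rust
/// Hypothetical, simplified views of the fields a MetadataResponse carries.
#[derive(Debug, PartialEq)]
struct Broker {
    node_id: i32,
    host: String,
    port: i32,
}

#[derive(Debug, PartialEq)]
struct PartitionMeta {
    partition: i32,
    leader: i32,
}

#[derive(Debug, PartialEq)]
struct TopicMeta {
    name: String,
    partitions: Vec<PartitionMeta>,
}

/// The gateway advertises itself as broker 0 and reports every requested
/// topic with a single partition 0 led by node 0, whether or not it has data.
fn self_advertise(host: &str, port: i32, requested: &[&str]) -> (Vec<Broker>, Vec<TopicMeta>) {
    let brokers = vec![Broker { node_id: 0, host: host.to_string(), port }];
    let topics = requested
        .iter()
        .map(|name| TopicMeta {
            name: name.to_string(),
            partitions: vec![PartitionMeta { partition: 0, leader: 0 }],
        })
        .collect();
    (brokers, topics)
}
```

Because every topic name is answered with partition 0 and leader 0, a client can fetch from a topic that has never been written to; with a high_watermark of 0, any fetch simply comes back empty until records arrive.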

Supported Kafka API versions

| API key | Name | Version range |
|---|---|---|
| 18 | ApiVersions | v0–v3 |
| 3 | Metadata | v0–v5 |
| 2 | ListOffsets | v1–v5 |
| 1 | Fetch | v0–v11 |

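The version caps keep the response header at v0 (no tagged fields) in every supported range. Metadata, ListOffsets, and Fetch are capped below their first flexible versions (v9, v6, and v12 respectively). ApiVersions v3 is itself a flexible version, but Kafka always encodes the ApiVersions response header as v0 so that a client can parse it before versions are negotiated.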
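The version table can be expressed as a lookup that a request handler might consult before decoding. SUPPORTED_APIS and is_supported are hypothetical names; the ranges are copied from the table.

```rust
/// (api_key, min_version, max_version), copied from the table above.
/// Hypothetical constant; Rafka's actual representation is not shown here.
const SUPPORTED_APIS: [(i16, i16, i16); 4] = [
    (18, 0, 3), // ApiVersions
    (3, 0, 5),  // Metadata
    (2, 1, 5),  // ListOffsets
    (1, 0, 11), // Fetch
];

/// Hypothetical check: can the gateway serve `api_key` at `version`?
fn is_supported(api_key: i16, version: i16) -> bool {
    SUPPORTED_APIS
        .iter()
        .any(|&(key, min, max)| key == api_key && (min..=max).contains(&version))
}
```

A Kafka broker answers an out-of-range version with error code 35 (UNSUPPORTED_VERSION); this page does not say how the Rafka gateway responds in that case.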

Current limitations

  • Single-tenant: org_id is hardcoded to 1. Multi-tenancy arrives in Phase 2.
  • Timestamp-exact ListOffsets (timestamp >= 0) returns high_watermark as an approximation; a full time index is a later phase item.
  • Consumer-group coordination (JoinGroup / SyncGroup / OffsetCommit) is not yet implemented (Phase 3).