
GroupCoordinator Architecture (P3.A)

Design principle — broker owns ALL group state

Mirroring the produce/fetch architecture exactly: the broker owns all group state; the gateway is a stateless wire translator. The gateway decodes the Kafka wire protocol for each consumer-group API, builds a postcard-encoded Op struct, sends it over the KafkaOp iroh bi-stream to the broker, and re-encodes the response. No group state lives in the gateway.

The single exception is FindCoordinator (api_key 10): the gateway always returns itself as the coordinator (node_id=0, RAFKA_KAFKA_CLIENT_HOST:RAFKA_KAFKA_CLIENT_PORT) without a broker call, because in rafka-v2's single-gateway topology the gateway IS the coordinator endpoint.
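The gateway-local FindCoordinator answer can be sketched as below. This is a minimal sketch under a few assumptions. `FindCoordinatorResp` and both function names are hypothetical. The port is assumed to be parsed from `RAFKA_KAFKA_CLIENT_PORT` as an integer. The environment lookup is passed in as a closure so the logic can be exercised without mutating the process environment.

```rust
/// Hypothetical response shape for the gateway-local FindCoordinator reply.
#[derive(Debug, PartialEq)]
pub struct FindCoordinatorResp {
    pub error_code: i16,
    pub node_id: i32,
    pub host: String,
    pub port: i32,
}

/// Builds the reply without any broker call: the gateway names itself as
/// coordinator with node_id = 0. `lookup` stands in for std::env::var.
pub fn find_coordinator_self<F>(lookup: F) -> Result<FindCoordinatorResp, String>
where
    F: Fn(&str) -> Option<String>,
{
    let host = lookup("RAFKA_KAFKA_CLIENT_HOST")
        .ok_or_else(|| "RAFKA_KAFKA_CLIENT_HOST is not set".to_string())?;
    let port_str = lookup("RAFKA_KAFKA_CLIENT_PORT")
        .ok_or_else(|| "RAFKA_KAFKA_CLIENT_PORT is not set".to_string())?;
    let port: i32 = port_str
        .parse()
        .map_err(|e| format!("invalid RAFKA_KAFKA_CLIENT_PORT {port_str:?}: {e}"))?;
    Ok(FindCoordinatorResp { error_code: 0, node_id: 0, host, port })
}

/// Production wiring (hypothetical): read both variables from the real environment.
pub fn find_coordinator_from_env() -> Result<FindCoordinatorResp, String> {
    find_coordinator_self(|k| std::env::var(k).ok())
}
```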

KafkaOp constants (0x06–0x0B)

| Constant | Value | Op struct | Description |
|---|---|---|---|
| KAFKA_OP_JOIN_GROUP | 0x06 | JoinGroupOp / JoinGroupOpResp | JoinGroup: enter or rejoin a consumer group |
| KAFKA_OP_SYNC_GROUP | 0x07 | SyncGroupOp / SyncGroupOpResp | SyncGroup: distribute partition assignments |
| KAFKA_OP_HEARTBEAT | 0x08 | HeartbeatOp / HeartbeatOpResp | Heartbeat: keep-alive from a stable member |
| KAFKA_OP_LEAVE_GROUP | 0x09 | LeaveGroupOp / LeaveGroupOpResp | LeaveGroup: graceful member departure |
| KAFKA_OP_OFFSET_COMMIT | 0x0A | OffsetCommitOp / OffsetCommitOpResp | OffsetCommit: persist committed offsets |
| KAFKA_OP_OFFSET_FETCH | 0x0B | OffsetFetchOp / OffsetFetchOpResp | OffsetFetch: retrieve committed offsets on join |
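The table implies two small routing functions. The gateway maps a Kafka wire `api_key` to a KafkaOp tag, and the broker maps the tag back to a handler. A sketch follows, assuming the tag is a `u8`. The Kafka api_key values are the standard protocol numbers: 8 OffsetCommit, 9 OffsetFetch, 11 JoinGroup, 12 Heartbeat, 13 LeaveGroup, 14 SyncGroup. The function names are hypothetical. The handler names come from the spans table.

```rust
// KafkaOp tags for the consumer-group ops (u8 width is an assumption).
pub const KAFKA_OP_JOIN_GROUP: u8 = 0x06;
pub const KAFKA_OP_SYNC_GROUP: u8 = 0x07;
pub const KAFKA_OP_HEARTBEAT: u8 = 0x08;
pub const KAFKA_OP_LEAVE_GROUP: u8 = 0x09;
pub const KAFKA_OP_OFFSET_COMMIT: u8 = 0x0A;
pub const KAFKA_OP_OFFSET_FETCH: u8 = 0x0B;

/// Gateway side: Kafka api_key -> KafkaOp tag to forward to the broker.
/// FindCoordinator (api_key 10) yields None because the gateway answers it locally.
pub fn kafka_op_for_api_key(api_key: i16) -> Option<u8> {
    match api_key {
        8 => Some(KAFKA_OP_OFFSET_COMMIT),
        9 => Some(KAFKA_OP_OFFSET_FETCH),
        11 => Some(KAFKA_OP_JOIN_GROUP),
        12 => Some(KAFKA_OP_HEARTBEAT),
        13 => Some(KAFKA_OP_LEAVE_GROUP),
        14 => Some(KAFKA_OP_SYNC_GROUP),
        _ => None,
    }
}

/// Broker side: KafkaOp tag -> name of the handler it dispatches to.
pub fn handler_name(tag: u8) -> Option<&'static str> {
    match tag {
        KAFKA_OP_JOIN_GROUP => Some("handle_join_group"),
        KAFKA_OP_SYNC_GROUP => Some("handle_sync_group"),
        KAFKA_OP_HEARTBEAT => Some("handle_heartbeat"),
        KAFKA_OP_LEAVE_GROUP => Some("handle_leave_group"),
        KAFKA_OP_OFFSET_COMMIT => Some("handle_offset_commit"),
        KAFKA_OP_OFFSET_FETCH => Some("handle_offset_fetch"),
        _ => None,
    }
}
```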

All Op structs are serialized with postcard via .encode() / .decode(). org_id travels on the KafkaOp carrier, not in the struct payload.
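Keeping org_id on the carrier means every Op payload stays tenant-agnostic, and the broker reads the tenant before it decodes anything else. The sketch below shows that separation. The byte layout (1-byte tag, 8-byte big-endian org_id, then the opaque payload) is entirely hypothetical and is not postcard's encoding. `KafkaOpFrame` is an illustrative name.

```rust
/// Hypothetical carrier frame: org_id lives here, never inside the Op payload.
#[derive(Debug, PartialEq)]
pub struct KafkaOpFrame {
    pub op: u8,          // KafkaOp tag, e.g. 0x08 for Heartbeat
    pub org_id: u64,     // tenant, carried outside the struct payload
    pub payload: Vec<u8>, // postcard-encoded Op struct, treated as opaque bytes
}

impl KafkaOpFrame {
    /// Serializes as [tag][org_id big-endian, 8 bytes][payload...].
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(1 + 8 + self.payload.len());
        out.push(self.op);
        out.extend_from_slice(&self.org_id.to_be_bytes());
        out.extend_from_slice(&self.payload);
        out
    }

    /// Parses the layout above; returns None if the header is truncated.
    pub fn from_bytes(buf: &[u8]) -> Option<Self> {
        if buf.len() < 9 {
            return None;
        }
        let org_id = u64::from_be_bytes(buf[1..9].try_into().ok()?);
        Some(KafkaOpFrame { op: buf[0], org_id, payload: buf[9..].to_vec() })
    }
}
```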

GroupCoordinator — process-global (OnceLock)

pub struct GroupCoordinator {
    // (org_id, group_id_string) → Group.
    // Groups persist across member leaves (including last-member → Empty)
    // so offsets survive for the rejoin guarantee.
    groups: RwLock<HashMap<(u64, String), Arc<RwLock<Group>>>>,

    // In-memory offset store: (org_id, group_id, topic, partition) → (offset, metadata).
    // Durable WAL persistence is P3.B.
    offsets: RwLock<HashMap<(u64, String, String, i32), (i64, Option<String>)>>,
}

The coordinator is process-global, initialised once via OnceLock. Each test that exercises the coordinator directly uses a fresh_coord() helper that creates a new Arc<GroupCoordinator>, bypassing the global to avoid cross-test contamination.
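The global-versus-fresh split can be sketched with `std::sync::OnceLock`. In this sketch, `Group` is a placeholder with only a `generation_id`. The accessor `coordinator()` and the method `group()` are hypothetical names, and only the `groups` map is modelled. Entries are created on first access and reused afterwards, which matches the "create group entry if absent" step of the join flow.

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};

/// Placeholder for the real Group (members, waiters, state, ...).
#[derive(Default)]
pub struct Group {
    pub generation_id: i32,
}

#[derive(Default)]
pub struct GroupCoordinator {
    groups: RwLock<HashMap<(u64, String), Arc<RwLock<Group>>>>,
}

impl GroupCoordinator {
    /// Returns the group for (org_id, group_id), creating it if absent.
    /// Later calls for the same key reuse the existing entry.
    pub fn group(&self, org_id: u64, group_id: &str) -> Arc<RwLock<Group>> {
        let mut groups = self.groups.write().expect("groups lock poisoned");
        groups.entry((org_id, group_id.to_string())).or_default().clone()
    }
}

// Process-global instance, initialised on first use.
static COORDINATOR: OnceLock<Arc<GroupCoordinator>> = OnceLock::new();

/// Hypothetical accessor used by the broker handlers.
pub fn coordinator() -> Arc<GroupCoordinator> {
    Arc::clone(COORDINATOR.get_or_init(|| Arc::new(GroupCoordinator::default())))
}

/// Test helper: an isolated coordinator that never touches the global.
pub fn fresh_coord() -> Arc<GroupCoordinator> {
    Arc::new(GroupCoordinator::default())
}
```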

Rebalance — events, not timeouts

The rebalance completion is signalled via a Tokio oneshot channel, never a tokio::time::timeout or a polling loop.

Flow for a single-member join

handle_join_group (broker handler, #[instrument])
│
├── coordinator.join_group(org_id, group_id, member_id, ...)
│   ├── create group entry if absent (mint cgp_id ONCE; a rejoin reuses it)
│   ├── insert member
│   ├── state → PreparingRebalance
│   ├── push (member_id, tx) to join_waiters
│   └── if waiters.len() == members.len():
│           complete_join_phase(g)   ← SYNCHRONOUS for a single member
│           ├── generation_id += 1
│           ├── state → CompletingRebalance
│           ├── elect leader (lexicographic-first member_id)
│           └── fire tx.send(JoinResult) for each waiter
│
└── rx.await   ← awaits the oneshot; the gateway rail has no time cap
    │
    └── encode JoinGroupOpResp and return

For multi-member groups (P3.C), complete_join_phase would be triggered after the debounce window (configurable via RAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS). In P3.A this defaults to 0 (immediate).
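The join flow above can be modelled without Tokio. In this minimal, std-only sketch, `std::sync::mpsc::sync_channel(1)` stands in for `tokio::sync::oneshot`, and a blocking `recv()` stands in for `rx.await`. A `BTreeMap` keeps member_ids sorted, so its first key is the elected leader. `Group`, `JoinResult` and `join` are hypothetical names. Completion is driven by the last expected waiter, not by a timer.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupState {
    Empty,
    PreparingRebalance,
    CompletingRebalance,
}

#[derive(Debug, Clone, PartialEq)]
pub struct JoinResult {
    pub generation_id: i32,
    pub leader_id: String,
    pub member_id: String,
}

pub struct Group {
    pub state: GroupState,
    pub generation_id: i32,
    pub leader_id: Option<String>,
    // member_id -> protocol metadata; BTreeMap iterates in sorted key order.
    pub members: BTreeMap<String, Vec<u8>>,
    join_waiters: Vec<(String, SyncSender<JoinResult>)>,
}

impl Group {
    pub fn new() -> Self {
        Group {
            state: GroupState::Empty,
            generation_id: 0,
            leader_id: None,
            members: BTreeMap::new(),
            join_waiters: Vec::new(),
        }
    }

    /// Registers (or re-registers) a member and returns the receiver its handler awaits.
    pub fn join(&mut self, member_id: &str, metadata: Vec<u8>) -> Receiver<JoinResult> {
        self.members.insert(member_id.to_string(), metadata);
        self.state = GroupState::PreparingRebalance;
        // Capacity 1: each waiter receives exactly one send, so send never blocks.
        let (tx, rx) = sync_channel(1);
        self.join_waiters.push((member_id.to_string(), tx));
        if self.join_waiters.len() == self.members.len() {
            self.complete_join_phase(); // synchronous, event-driven completion
        }
        rx
    }

    fn complete_join_phase(&mut self) {
        self.generation_id += 1;
        self.state = GroupState::CompletingRebalance;
        // Lexicographic-first member_id becomes the leader.
        let leader = self.members.keys().next().cloned().expect("no members");
        self.leader_id = Some(leader.clone());
        let generation_id = self.generation_id;
        for (member_id, tx) in self.join_waiters.drain(..) {
            // A dropped receiver only means that caller went away; ignore it.
            let _ = tx.send(JoinResult { generation_id, leader_id: leader.clone(), member_id });
        }
    }
}
```

In this sketch, a member that joins after a round has already completed (consumer-b in the test) waits until every existing member rejoins. The P3.A design leaves multi-member timing to P3.C.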

SyncGroup flow

handle_sync_group
│
├── if generation_id != g.generation_id → return error_code 22
├── if state != CompletingRebalance → return error_code 27
├── if is_leader:
│   ├── write assignments into each member's struct
│   ├── fire tx.send(assignment) immediately (leader gets its response now)
│   ├── drain sync_waiters → fire each non-leader's oneshot
│   └── state → Stable
└── else (non-leader):
    └── push (member_id, tx) to sync_waiters; rx.await
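The SyncGroup branches can be sketched in the same std-only style. As before, `sync_channel(1)` stands in for a Tokio oneshot. Assignments are kept in a single hypothetical `HashMap` rather than in per-member structs, and `Group`, `SyncResult` and `new_completing` are illustrative names. The checks follow the order in the tree above. Note that a non-leader arriving after the group is Stable therefore gets error 27.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

pub const NONE: i16 = 0;
pub const ILLEGAL_GENERATION: i16 = 22;
pub const REBALANCE_IN_PROGRESS: i16 = 27;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupState {
    PreparingRebalance,
    CompletingRebalance,
    Stable,
}

#[derive(Debug, Clone, PartialEq)]
pub struct SyncResult {
    pub error_code: i16,
    pub assignment: Vec<u8>,
}

pub struct Group {
    pub state: GroupState,
    pub generation_id: i32,
    pub leader_id: String,
    pub assignments: HashMap<String, Vec<u8>>, // member_id -> assignment bytes
    sync_waiters: Vec<(String, SyncSender<SyncResult>)>,
}

impl Group {
    /// A group that has just finished its join phase.
    pub fn new_completing(generation_id: i32, leader_id: &str) -> Self {
        Group {
            state: GroupState::CompletingRebalance,
            generation_id,
            leader_id: leader_id.to_string(),
            assignments: HashMap::new(),
            sync_waiters: Vec::new(),
        }
    }

    pub fn sync(&mut self, member_id: &str, generation_id: i32,
                assignments: Vec<(String, Vec<u8>)>) -> Receiver<SyncResult> {
        let (tx, rx) = sync_channel(1);
        let error = |code: i16| SyncResult { error_code: code, assignment: Vec::new() };
        if generation_id != self.generation_id {
            let _ = tx.send(error(ILLEGAL_GENERATION));
        } else if self.state != GroupState::CompletingRebalance {
            let _ = tx.send(error(REBALANCE_IN_PROGRESS));
        } else if member_id == self.leader_id {
            self.assignments = assignments.into_iter().collect();
            // Leader is answered immediately.
            let mine = self.assignments.get(member_id).cloned().unwrap_or_default();
            let _ = tx.send(SyncResult { error_code: NONE, assignment: mine });
            // Then every parked non-leader is released with its own assignment.
            for (waiter, wtx) in self.sync_waiters.drain(..) {
                let a = self.assignments.get(&waiter).cloned().unwrap_or_default();
                let _ = wtx.send(SyncResult { error_code: NONE, assignment: a });
            }
            self.state = GroupState::Stable;
        } else {
            // Non-leader parks until the leader's SyncGroup arrives.
            self.sync_waiters.push((member_id.to_string(), tx));
        }
        rx
    }
}
```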

Offset key shape

In-memory offsets are stored in a HashMap with a four-tuple key:

(org_id: u64, group_id: String, topic: String, partition: i32) → (offset: i64, metadata: Option<String>)
  • committed_offset=-1 means "no committed offset for this (group, topic, partition)". This is the sentinel value OffsetFetch returns when the key is absent.
  • metadata is an optional client-supplied string (e.g., a JSON blob). It is stored and returned verbatim.
  • An empty partitions vector in OffsetFetchOp means "fetch ALL committed offsets for this (org_id, group_id)".
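The three rules above fit in a small store. The sketch below uses the four-tuple key and the OffsetCommitOp / OffsetFetchOp tuple shapes from this document. `OffsetStore`, `commit` and `fetch` are hypothetical names. Results for the "fetch ALL" case are sorted here only to make the output deterministic.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

type OffsetKey = (u64, String, String, i32); // (org_id, group_id, topic, partition)

#[derive(Default)]
pub struct OffsetStore {
    offsets: RwLock<HashMap<OffsetKey, (i64, Option<String>)>>,
}

impl OffsetStore {
    /// Stores each (topic, partition, offset, metadata); per-partition error_code is always 0.
    pub fn commit(&self, org_id: u64, group_id: &str,
                  entries: Vec<(String, i32, i64, Option<String>)>) -> Vec<(String, i32, i16)> {
        let mut map = self.offsets.write().expect("offset lock poisoned");
        entries
            .into_iter()
            .map(|(topic, partition, offset, metadata)| {
                map.insert((org_id, group_id.to_string(), topic.clone(), partition), (offset, metadata));
                (topic, partition, 0i16)
            })
            .collect()
    }

    /// Empty `partitions` returns every committed offset for (org_id, group_id);
    /// otherwise absent keys come back as offset -1 with no metadata.
    pub fn fetch(&self, org_id: u64, group_id: &str, partitions: &[(String, i32)])
                 -> Vec<(String, i32, i64, Option<String>, i16)> {
        let map = self.offsets.read().expect("offset lock poisoned");
        if partitions.is_empty() {
            let mut all: Vec<(String, i32, i64, Option<String>, i16)> = map
                .iter()
                .filter(|((o, g, _, _), _)| *o == org_id && g == group_id)
                .map(|((_, _, t, p), (off, md))| (t.clone(), *p, *off, md.clone(), 0i16))
                .collect();
            all.sort();
            return all;
        }
        partitions
            .iter()
            .map(|(t, p)| match map.get(&(org_id, group_id.to_string(), t.clone(), *p)) {
                Some((off, md)) => (t.clone(), *p, *off, md.clone(), 0i16),
                None => (t.clone(), *p, -1, None, 0i16), // sentinel: nothing committed
            })
            .collect()
    }
}
```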

Op struct shapes (for reference)

// JoinGroup
struct JoinGroupOp {
    group_id: String,
    member_id: String,
    protocol_type: String,
    session_timeout_ms: i32,
    protocols: Vec<(String, Vec<u8>)>,
}
struct JoinGroupOpResp {
    error_code: i16,
    generation_id: i32,
    protocol_name: String,
    leader_id: String,
    member_id: String,
    members: Vec<(String, Vec<u8>)>,
}

// SyncGroup
struct SyncGroupOp {
    group_id: String,
    member_id: String,
    generation_id: i32,
    assignments: Vec<(String, Vec<u8>)>,
}
struct SyncGroupOpResp { error_code: i16, assignment: Vec<u8> }

// Heartbeat
struct HeartbeatOp { group_id: String, member_id: String, generation_id: i32 }
struct HeartbeatOpResp { error_code: i16 }

// LeaveGroup
struct LeaveGroupOp { group_id: String, member_id: String }
struct LeaveGroupOpResp { error_code: i16 }

// OffsetCommit
struct OffsetCommitOp {
    group_id: String,
    offsets: Vec<(String /*topic*/, i32 /*partition*/, i64 /*offset*/, Option<String> /*metadata*/)>,
}
struct OffsetCommitOpResp { results: Vec<(String, i32, i16 /*error_code*/)> }

// OffsetFetch
struct OffsetFetchOp { group_id: String, partitions: Vec<(String, i32)> }
struct OffsetFetchOpResp {
    offsets: Vec<(String, i32, i64 /*offset; -1 if none*/, Option<String>, i16 /*error_code*/)>,
}

Error code table

| Code | Name | When emitted |
|---|---|---|
| 0 | NONE | Successful operation |
| 16 | NOT_COORDINATOR | Group not found in coordinator (Heartbeat/SyncGroup on an unknown group) |
| 22 | ILLEGAL_GENERATION | Heartbeat or SyncGroup with the wrong generation_id; the group has rebalanced |
| 25 | UNKNOWN_MEMBER_ID | Heartbeat from a member_id the coordinator does not recognise |
| 27 | REBALANCE_IN_PROGRESS | SyncGroup sent when the group is not in CompletingRebalance; Heartbeat during a rebalance |
| 69 | GROUP_ID_NOT_FOUND | Dead group sentinel (join to a Dead group); reserved, not triggered in P3.A |

OffsetCommit returns per-partition error_code=0 unconditionally (in-memory write cannot fail in normal operation). OffsetFetch returns per-partition error_code=0 with committed_offset=-1 for absent entries.
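The Heartbeat rows of the table can be read as a single validation function. In this sketch, the check order (group, then member, then generation, then state) is an assumption, as are the names `GroupView` and `heartbeat_error`. Following the table, any state other than Stable counts as "during rebalance". Code 16 is named neutrally here as the table's "group not found" case.

```rust
use std::collections::HashMap;

pub const NONE: i16 = 0;
pub const ERR_GROUP_NOT_FOUND: i16 = 16; // the table's "group not found in coordinator" row
pub const ILLEGAL_GENERATION: i16 = 22;
pub const UNKNOWN_MEMBER_ID: i16 = 25;
pub const REBALANCE_IN_PROGRESS: i16 = 27;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupState {
    PreparingRebalance,
    CompletingRebalance,
    Stable,
}

/// Hypothetical read-only view of a group, enough to answer a Heartbeat.
pub struct GroupView {
    pub state: GroupState,
    pub generation_id: i32,
    pub members: Vec<String>,
}

/// Returns the Heartbeat error_code for (org_id, group_id, member_id, generation_id).
pub fn heartbeat_error(groups: &HashMap<(u64, String), GroupView>, org_id: u64,
                       group_id: &str, member_id: &str, generation_id: i32) -> i16 {
    let Some(g) = groups.get(&(org_id, group_id.to_string())) else {
        return ERR_GROUP_NOT_FOUND;
    };
    if !g.members.iter().any(|m| m == member_id) {
        return UNKNOWN_MEMBER_ID;
    }
    if generation_id != g.generation_id {
        return ILLEGAL_GENERATION;
    }
    if g.state != GroupState::Stable {
        return REBALANCE_IN_PROGRESS;
    }
    NONE
}
```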

Spans emitted

| Span name | Handler | Notes |
|---|---|---|
| rafka.gateway.kafka.find-coordinator | gateway handle_find_coordinator | Gateway-local; no broker call. Returns self. |
| rafka.gateway.group.join | gateway handle_join_group_request | Wraps the broker round-trip. |
| rafka.gateway.group.sync | gateway handle_sync_group_request | |
| rafka.gateway.group.heartbeat | gateway handle_heartbeat_request | |
| rafka.gateway.group.leave | gateway handle_leave_group_request | |
| rafka.gateway.group.offset-commit | gateway handle_offset_commit_request | |
| rafka.gateway.group.offset-fetch | gateway handle_offset_fetch_request | |
| rafka.broker.group.join | broker handle_join_group | Includes the rx.await; closes when the rebalance completes. |
| rafka.broker.group.sync | broker handle_sync_group | |
| rafka.broker.group.heartbeat | broker handle_heartbeat | |
| rafka.broker.group.leave | broker handle_leave_group | |
| rafka.broker.group.offset-commit | broker handle_offset_commit | |
| rafka.broker.group.offset-fetch | broker handle_offset_fetch | |
| rafka.broker.coordinator.rebalance-complete | broker handle_join_group | Nested info span emitted when the rebalance oneshot fires. Attributes: generation_id, leader_id. |

What is NOT in P3.A

| Concern | Status |
|---|---|
| Durable offsets (WAL) | P3.B |
| Multi-member rebalance | P3.C |
| Session timeout expiry / dead-member eviction | P3.C |
| Gateway-side group registry / authz | P5 |
| Group describe REST endpoint | P5 |