GroupCoordinator Architecture (P3.A)
Design principle — broker owns ALL group state
Mirroring the produce/fetch architecture exactly: the broker owns all group state; the gateway is a stateless wire translator. The gateway decodes the Kafka wire protocol for each consumer-group API, builds a postcard-encoded Op struct, sends it over the KafkaOp iroh bi-stream to the broker, and re-encodes the response. No group state lives in the gateway.
The single exception is FindCoordinator (api_key 10): the gateway always returns itself as the coordinator (node_id=0, RAFKA_KAFKA_CLIENT_HOST:RAFKA_KAFKA_CLIENT_PORT) without a broker call, because in rafka-v2's single-gateway topology the gateway IS the coordinator endpoint.
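Because FindCoordinator never leaves the gateway, the handler only has to fill in fixed fields. This minimal sketch assumes the host and port have already been read from RAFKA_KAFKA_CLIENT_HOST / RAFKA_KAFKA_CLIENT_PORT and are passed in. `FindCoordinatorAnswer` and `find_coordinator_local` are hypothetical names, and Kafka wire encoding is omitted.

```rust
/// Fields of a FindCoordinator answer before Kafka wire encoding (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub struct FindCoordinatorAnswer {
    pub error_code: i16,
    pub node_id: i32,
    pub host: String,
    pub port: i32,
}

/// Gateway-local FindCoordinator with no broker call. The requested key (group id)
/// is ignored because the single gateway is the coordinator endpoint for every group.
pub fn find_coordinator_local(_key: &str, client_host: &str, client_port: i32) -> FindCoordinatorAnswer {
    FindCoordinatorAnswer {
        error_code: 0,
        node_id: 0,
        host: client_host.to_string(),
        port: client_port,
    }
}
```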
KafkaOp constants (0x06–0x0B)
| Constant | Value | Op struct | Description |
|---|---|---|---|
| `KAFKA_OP_JOIN_GROUP` | 0x06 | `JoinGroupOp` / `JoinGroupOpResp` | JoinGroup: enter or rejoin a consumer group |
| `KAFKA_OP_SYNC_GROUP` | 0x07 | `SyncGroupOp` / `SyncGroupOpResp` | SyncGroup: distribute partition assignments |
| `KAFKA_OP_HEARTBEAT` | 0x08 | `HeartbeatOp` / `HeartbeatOpResp` | Heartbeat: keep-alive from a stable member |
| `KAFKA_OP_LEAVE_GROUP` | 0x09 | `LeaveGroupOp` / `LeaveGroupOpResp` | LeaveGroup: graceful member departure |
| `KAFKA_OP_OFFSET_COMMIT` | 0x0A | `OffsetCommitOp` / `OffsetCommitOpResp` | OffsetCommit: persist committed offsets |
| `KAFKA_OP_OFFSET_FETCH` | 0x0B | `OffsetFetchOp` / `OffsetFetchOpResp` | OffsetFetch: retrieve committed offsets on join |
All Op structs are serialised with postcard via `.encode()` / `.decode()`. `org_id` rides on the `KafkaOp` carrier, not in the struct payload.
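A minimal sketch of how these discriminants might be dispatched. The constant values come from the table; the `u8` type, the `KafkaOpCarrier` struct and `group_op_name` are assumptions for illustration. What it shows is that `org_id` sits on the carrier beside the postcard bytes rather than inside any Op struct.

```rust
// Consumer-group KafkaOp discriminants (values from the constants table;
// the u8 type is an assumption).
pub const KAFKA_OP_JOIN_GROUP: u8 = 0x06;
pub const KAFKA_OP_SYNC_GROUP: u8 = 0x07;
pub const KAFKA_OP_HEARTBEAT: u8 = 0x08;
pub const KAFKA_OP_LEAVE_GROUP: u8 = 0x09;
pub const KAFKA_OP_OFFSET_COMMIT: u8 = 0x0A;
pub const KAFKA_OP_OFFSET_FETCH: u8 = 0x0B;

/// Hypothetical carrier: org_id travels next to the postcard-encoded Op bytes,
/// never inside the Op struct itself.
pub struct KafkaOpCarrier {
    pub op: u8,
    pub org_id: u64,
    pub payload: Vec<u8>,
}

/// Maps a discriminant to the consumer-group API it carries (None for anything else).
pub fn group_op_name(op: u8) -> Option<&'static str> {
    match op {
        KAFKA_OP_JOIN_GROUP => Some("JoinGroup"),
        KAFKA_OP_SYNC_GROUP => Some("SyncGroup"),
        KAFKA_OP_HEARTBEAT => Some("Heartbeat"),
        KAFKA_OP_LEAVE_GROUP => Some("LeaveGroup"),
        KAFKA_OP_OFFSET_COMMIT => Some("OffsetCommit"),
        KAFKA_OP_OFFSET_FETCH => Some("OffsetFetch"),
        _ => None,
    }
}
```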
GroupCoordinator — process-global (OnceLock)
```rust
pub struct GroupCoordinator {
    // (org_id, group_id_string) → Group.
    // Groups persist across member leaves (including last-member → Empty)
    // so offsets survive for the rejoin guarantee.
    groups: RwLock<HashMap<(u64, String), Arc<RwLock<Group>>>>,
    // In-memory offset store: (org_id, group_id, topic, partition) → (offset, metadata).
    // Durable WAL persistence is P3.B.
    offsets: RwLock<HashMap<(u64, String, String, i32), (i64, Option<String>)>>,
}
```
The coordinator is process-global, initialised once via OnceLock. Each test that exercises the coordinator directly uses a fresh_coord() helper that creates a new Arc<GroupCoordinator>, bypassing the global to avoid cross-test contamination.
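The split between the process-global instance and the test helper could look like this. The sketch assumes `std::sync::RwLock` (the document does not say which `RwLock` is used), stubs `Group` down to one field, and adds hypothetical `group_entry` / `group_count` accessors so the isolation between `fresh_coord()` instances can be observed.

```rust
use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};

/// Stub; the real Group carries members, state and waiters.
#[derive(Default)]
pub struct Group {
    pub generation_id: i32,
}

#[derive(Default)]
pub struct GroupCoordinator {
    groups: RwLock<HashMap<(u64, String), Arc<RwLock<Group>>>>,
    offsets: RwLock<HashMap<(u64, String, String, i32), (i64, Option<String>)>>,
}

static COORDINATOR: OnceLock<Arc<GroupCoordinator>> = OnceLock::new();

/// Process-global accessor: the first caller initialises it, every later caller shares it.
pub fn coordinator() -> Arc<GroupCoordinator> {
    COORDINATOR.get_or_init(|| Arc::new(GroupCoordinator::default())).clone()
}

/// Test helper: a brand-new coordinator that never touches the global.
pub fn fresh_coord() -> Arc<GroupCoordinator> {
    Arc::new(GroupCoordinator::default())
}

impl GroupCoordinator {
    /// Hypothetical accessor: returns the group for (org_id, group_id), creating it if absent.
    pub fn group_entry(&self, org_id: u64, group_id: &str) -> Arc<RwLock<Group>> {
        let mut groups = self.groups.write().unwrap();
        groups.entry((org_id, group_id.to_string())).or_default().clone()
    }

    pub fn group_count(&self) -> usize {
        self.groups.read().unwrap().len()
    }

    pub fn offset_count(&self) -> usize {
        self.offsets.read().unwrap().len()
    }
}
```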
Rebalance — events, not timeouts
The rebalance completion is signalled via a Tokio oneshot channel, never a tokio::time::timeout or a polling loop.
Flow for a single-member join
```text
handle_join_group (broker handler, #[instrument])
│
├── coordinator.join_group(org_id, group_id, member_id, ...)
│   ├── create group entry if absent (mint cgp_id ONCE; rejoin re-uses)
│   ├── insert member
│   ├── state → PreparingRebalance
│   ├── push (member_id, tx) to join_waiters
│   └── if waiters.len() == members.len():
│           complete_join_phase(g)   ← SYNCHRONOUS for single member
│           ├── generation_id += 1
│           ├── state → CompletingRebalance
│           ├── elect leader (lexicographic-first member_id)
│           └── fire tx.send(JoinResult) for each waiter
│
└── rx.await   ← awaits the oneshot; gateway rail has no time cap
    │
    └── encode JoinGroupOpResp and return
```
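The single-member join path can be modelled without an async runtime. In this sketch `std::sync::mpsc::sync_channel(1)` stands in for the Tokio oneshot and the receiver's blocking `recv()` plays the role of `rx.await`. `JoinResult`'s fields, the `BTreeMap` member table and the method signatures are assumptions. Leader election relies on the `BTreeMap`'s sorted keys, so the first key is the lexicographic-first `member_id`.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum GroupState { Empty, PreparingRebalance, CompletingRebalance, Stable }

/// What each waiting JoinGroup caller receives when the join phase completes.
#[derive(Debug, Clone, PartialEq)]
pub struct JoinResult {
    pub generation_id: i32,
    pub leader_id: String,
    pub member_id: String,
}

pub struct Group {
    pub state: GroupState,
    pub generation_id: i32,
    /// member_id -> protocol metadata; BTreeMap keeps keys sorted.
    pub members: BTreeMap<String, Vec<u8>>,
    join_waiters: Vec<(String, SyncSender<JoinResult>)>,
}

impl Group {
    pub fn new() -> Self {
        Group {
            state: GroupState::Empty,
            generation_id: 0,
            members: BTreeMap::new(),
            join_waiters: Vec::new(),
        }
    }

    /// Inserts the member, parks a one-shot sender, and completes the join
    /// phase synchronously once every member is waiting.
    pub fn join_group(&mut self, member_id: &str, metadata: Vec<u8>) -> Receiver<JoinResult> {
        let (tx, rx) = sync_channel(1); // capacity 1: the single send never blocks
        self.members.insert(member_id.to_string(), metadata);
        self.state = GroupState::PreparingRebalance;
        self.join_waiters.push((member_id.to_string(), tx));
        if self.join_waiters.len() == self.members.len() {
            self.complete_join_phase();
        }
        rx
    }

    fn complete_join_phase(&mut self) {
        self.generation_id += 1;
        self.state = GroupState::CompletingRebalance;
        // Sorted keys: the first one is the lexicographic-first member_id.
        let leader_id = self.members.keys().next().cloned().unwrap_or_default();
        let generation_id = self.generation_id;
        for (member_id, tx) in self.join_waiters.drain(..) {
            // A dropped receiver only means that caller went away; ignore the error.
            let _ = tx.send(JoinResult { generation_id, leader_id: leader_id.clone(), member_id });
        }
    }
}
```

For a brand-new member, `join_group` returns a receiver that already holds `generation_id = 1` with that member as leader, because the waiter count equals the member count on the first join.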
For multi-member groups (P3.C), complete_join_phase would be triggered after the debounce window (configurable via RAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS). In P3.A this defaults to 0 (immediate).
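Reading the debounce setting might look like this. The variable name and the default of 0 come from the paragraph above; treating an unparsable value as 0 is an assumption of this sketch, as are the function names.

```rust
use std::time::Duration;

/// Parses a raw RAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS value.
/// Unset or unparsable (assumed behaviour) falls back to 0 ms, i.e. immediate.
pub fn initial_rebalance_delay_from(raw: Option<&str>) -> Duration {
    let ms = raw.and_then(|s| s.trim().parse::<u64>().ok()).unwrap_or(0);
    Duration::from_millis(ms)
}

/// Reads the delay from the process environment.
pub fn initial_rebalance_delay() -> Duration {
    initial_rebalance_delay_from(
        std::env::var("RAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS").ok().as_deref(),
    )
}
```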
SyncGroup flow
```text
handle_sync_group
│
├── if generation_id != g.generation_id → return error_code 22
├── if state != CompletingRebalance   → return error_code 27
├── if is_leader:
│   ├── write assignments into each member's struct
│   ├── fire tx.send(assignment) immediately (leader gets response now)
│   ├── drain sync_waiters → fire each non-leader's oneshot
│   └── state → Stable
└── else (non-leader):
    └── push (member_id, tx) to sync_waiters; rx.await
```
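A synchronous model of the SyncGroup branches, again using `sync_channel(1)` as a stand-in for the Tokio oneshot. Error codes 22 and 27 follow the flow. `SyncOutcome`, `new_completing`, and keeping assignments in one `HashMap` on the group (rather than in each member's struct) are simplifications assumed for this sketch.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

pub const ILLEGAL_GENERATION: i16 = 22;
pub const REBALANCE_IN_PROGRESS: i16 = 27;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum GroupState { PreparingRebalance, CompletingRebalance, Stable }

pub struct Group {
    pub state: GroupState,
    pub generation_id: i32,
    pub leader_id: String,
    /// member_id -> assignment bytes written by the leader.
    pub assignments: HashMap<String, Vec<u8>>,
    sync_waiters: Vec<(String, SyncSender<Vec<u8>>)>,
}

/// Either an immediate error code or a receiver for this member's assignment.
pub enum SyncOutcome {
    Error(i16),
    Pending(Receiver<Vec<u8>>),
}

impl Group {
    /// A group that has just finished its join phase.
    pub fn new_completing(generation_id: i32, leader_id: &str) -> Self {
        Group {
            state: GroupState::CompletingRebalance,
            generation_id,
            leader_id: leader_id.to_string(),
            assignments: HashMap::new(),
            sync_waiters: Vec::new(),
        }
    }

    pub fn sync_group(&mut self, member_id: &str, generation_id: i32, assignments: Vec<(String, Vec<u8>)>) -> SyncOutcome {
        if generation_id != self.generation_id {
            return SyncOutcome::Error(ILLEGAL_GENERATION);
        }
        if self.state != GroupState::CompletingRebalance {
            return SyncOutcome::Error(REBALANCE_IN_PROGRESS);
        }
        let (tx, rx) = sync_channel(1);
        if member_id == self.leader_id {
            // Leader: record every member's assignment and answer the leader now.
            self.assignments.extend(assignments);
            let _ = tx.send(self.assignments.get(member_id).cloned().unwrap_or_default());
            // Release every follower parked in sync_waiters.
            for (m, waiter) in self.sync_waiters.drain(..) {
                let _ = waiter.send(self.assignments.get(&m).cloned().unwrap_or_default());
            }
            self.state = GroupState::Stable;
        } else {
            // Follower: park until the leader's SyncGroup arrives.
            self.sync_waiters.push((member_id.to_string(), tx));
        }
        SyncOutcome::Pending(rx)
    }
}
```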
Offset key shape
In-memory offsets are stored in a HashMap with a four-tuple key:

```text
(org_id: u64, group_id: String, topic: String, partition: i32) → (offset: i64, metadata: Option<String>)
```

- `committed_offset = -1` means "no committed offset for this (group, topic, partition)". This is the sentinel value `OffsetFetch` returns when the key is absent.
- `metadata` is an optional client-supplied string (e.g., a JSON blob). It is stored and returned verbatim.
- An empty `partitions` vector in `OffsetFetchOp` means "fetch ALL committed offsets for this (org_id, group_id)".
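The commit/fetch semantics for this key shape can be sketched with a plain `HashMap`. `OffsetStore`, its methods and `NO_COMMITTED_OFFSET` are hypothetical names. The tuple shapes mirror `OffsetCommitOp` / `OffsetFetchOpResp`, and sorting the fetch-all result is an added assumption that makes the response order deterministic.

```rust
use std::collections::HashMap;

/// (org_id, group_id, topic, partition)
type OffsetKey = (u64, String, String, i32);

/// Sentinel returned by OffsetFetch when nothing has been committed.
pub const NO_COMMITTED_OFFSET: i64 = -1;

#[derive(Default)]
pub struct OffsetStore {
    offsets: HashMap<OffsetKey, (i64, Option<String>)>,
}

impl OffsetStore {
    /// OffsetCommit: last write wins; per-partition error_code is always 0.
    pub fn commit(&mut self, org_id: u64, group_id: &str, offsets: Vec<(String, i32, i64, Option<String>)>) -> Vec<(String, i32, i16)> {
        let mut results = Vec::with_capacity(offsets.len());
        for (topic, partition, offset, metadata) in offsets {
            let key = (org_id, group_id.to_string(), topic.clone(), partition);
            self.offsets.insert(key, (offset, metadata));
            results.push((topic, partition, 0));
        }
        results
    }

    /// OffsetFetch: absent keys yield -1; an empty `partitions` slice means "everything for this group".
    pub fn fetch(&self, org_id: u64, group_id: &str, partitions: &[(String, i32)]) -> Vec<(String, i32, i64, Option<String>, i16)> {
        if partitions.is_empty() {
            let mut all: Vec<(String, i32, i64, Option<String>, i16)> = self
                .offsets
                .iter()
                .filter(|(k, _)| k.0 == org_id && k.1 == group_id)
                .map(|(k, (offset, metadata))| (k.2.clone(), k.3, *offset, metadata.clone(), 0))
                .collect();
            // HashMap iteration order is arbitrary; sort by (topic, partition).
            all.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));
            return all;
        }
        partitions
            .iter()
            .map(|(topic, partition)| {
                let key = (org_id, group_id.to_string(), topic.clone(), *partition);
                match self.offsets.get(&key) {
                    Some((offset, metadata)) => (topic.clone(), *partition, *offset, metadata.clone(), 0),
                    None => (topic.clone(), *partition, NO_COMMITTED_OFFSET, None, 0),
                }
            })
            .collect()
    }
}
```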
Op struct shapes (for reference)
```rust
// JoinGroup
struct JoinGroupOp {
    group_id: String, member_id: String, protocol_type: String,
    session_timeout_ms: i32, protocols: Vec<(String, Vec<u8>)>
}
struct JoinGroupOpResp {
    error_code: i16, generation_id: i32, protocol_name: String,
    leader_id: String, member_id: String, members: Vec<(String, Vec<u8>)>
}
// SyncGroup
struct SyncGroupOp {
    group_id: String, member_id: String, generation_id: i32,
    assignments: Vec<(String, Vec<u8>)>
}
struct SyncGroupOpResp { error_code: i16, assignment: Vec<u8> }
// Heartbeat
struct HeartbeatOp { group_id: String, member_id: String, generation_id: i32 }
struct HeartbeatOpResp { error_code: i16 }
// LeaveGroup
struct LeaveGroupOp { group_id: String, member_id: String }
struct LeaveGroupOpResp { error_code: i16 }
// OffsetCommit
struct OffsetCommitOp {
    group_id: String,
    offsets: Vec<(String /*topic*/, i32 /*partition*/, i64 /*offset*/, Option<String> /*metadata*/)>
}
struct OffsetCommitOpResp { results: Vec<(String, i32, i16 /*error_code*/)> }
// OffsetFetch
struct OffsetFetchOp { group_id: String, partitions: Vec<(String, i32)> }
struct OffsetFetchOpResp {
    offsets: Vec<(String, i32, i64 /*offset; -1 if none*/, Option<String>, i16 /*error_code*/)>
}
```
Error code table
| Code | Name | When emitted |
|---|---|---|
| 0 | None | Successful operation |
| 16 | NOT_COORDINATOR | Group not found in coordinator (Heartbeat/SyncGroup on unknown group) |
| 22 | ILLEGAL_GENERATION | Heartbeat or SyncGroup with wrong generation_id; group rebalanced |
| 25 | UNKNOWN_MEMBER_ID | Heartbeat from a member_id the coordinator does not recognise |
| 27 | REBALANCE_IN_PROGRESS | SyncGroup sent when group is not in CompletingRebalance; Heartbeat during rebalance |
| 69 | GROUP_ID_NOT_FOUND | Dead-group sentinel (join to a Dead group); reserved, not triggered in P3.A |
OffsetCommit returns per-partition error_code=0 unconditionally (in-memory write cannot fail in normal operation). OffsetFetch returns per-partition error_code=0 with committed_offset=-1 for absent entries.
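One way a broker-side Heartbeat check could map onto this table. The check order (unknown group, unknown member, generation, state) and answering 27 only in `PreparingRebalance` are assumptions of this sketch. `GroupView` and `heartbeat_error` are hypothetical names. Code 16 appears here under its Kafka protocol name, `NOT_COORDINATOR`.

```rust
use std::collections::HashMap;

pub const NONE: i16 = 0;
pub const NOT_COORDINATOR: i16 = 16;
pub const ILLEGAL_GENERATION: i16 = 22;
pub const UNKNOWN_MEMBER_ID: i16 = 25;
pub const REBALANCE_IN_PROGRESS: i16 = 27;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum GroupState { Empty, PreparingRebalance, CompletingRebalance, Stable }

/// Hypothetical read-only view of a group, enough to validate a heartbeat.
pub struct GroupView {
    pub state: GroupState,
    pub generation_id: i32,
    pub members: Vec<String>,
}

/// Returns the Heartbeat error_code; the order of checks is an assumption.
pub fn heartbeat_error(
    groups: &HashMap<(u64, String), GroupView>,
    org_id: u64,
    group_id: &str,
    member_id: &str,
    generation_id: i32,
) -> i16 {
    let g = match groups.get(&(org_id, group_id.to_string())) {
        Some(g) => g,
        None => return NOT_COORDINATOR, // unknown group
    };
    if !g.members.iter().any(|m| m == member_id) {
        return UNKNOWN_MEMBER_ID;
    }
    if generation_id != g.generation_id {
        return ILLEGAL_GENERATION;
    }
    // Assumption: only PreparingRebalance counts as "rebalance in progress" here.
    if g.state == GroupState::PreparingRebalance {
        return REBALANCE_IN_PROGRESS;
    }
    NONE
}
```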
Spans emitted
| Span name | Handler | Notes |
|---|---|---|
| `rafka.gateway.kafka.find-coordinator` | gateway `handle_find_coordinator` | Gateway-local; no broker call. Returns self. |
| `rafka.gateway.group.join` | gateway `handle_join_group_request` | Wraps the broker round-trip. |
| `rafka.gateway.group.sync` | gateway `handle_sync_group_request` | |
| `rafka.gateway.group.heartbeat` | gateway `handle_heartbeat_request` | |
| `rafka.gateway.group.leave` | gateway `handle_leave_group_request` | |
| `rafka.gateway.group.offset-commit` | gateway `handle_offset_commit_request` | |
| `rafka.gateway.group.offset-fetch` | gateway `handle_offset_fetch_request` | |
| `rafka.broker.group.join` | broker `handle_join_group` | Includes the `rx.await`, so it closes only when the rebalance completes. |
| `rafka.broker.group.sync` | broker `handle_sync_group` | |
| `rafka.broker.group.heartbeat` | broker `handle_heartbeat` | |
| `rafka.broker.group.leave` | broker `handle_leave_group` | |
| `rafka.broker.group.offset-commit` | broker `handle_offset_commit` | |
| `rafka.broker.group.offset-fetch` | broker `handle_offset_fetch` | |
| `rafka.broker.coordinator.rebalance-complete` | broker `handle_join_group` | Nested info span emitted when the rebalance oneshot fires. Attributes: `generation_id`, `leader_id`. |
What is NOT in P3.A
| Concern | Status |
|---|---|
| Durable offsets (WAL) | P3.B |
| Multi-member rebalance | P3.C |
| Session timeout expiry / dead-member eviction | P3.C |
| Gateway-side group registry / authz | P5 |
| Group describe REST endpoint | P5 |