Consuming with Consumer Groups
Rafka P3.A ships the full single-member consumer-group protocol (JoinGroup, SyncGroup, Heartbeat, LeaveGroup, OffsetCommit, OffsetFetch, FindCoordinator). Standard clients — confluent-kafka-python, librdkafka, kcat -G — connect with no special configuration.
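For orientation, the sketch below models the order in which a single member exercises these requests. It is a hypothetical in-memory toy: ToyCoordinator and its methods are invented for illustration and are not Rafka's code. Only the API key numbers come from the Kafka protocol. FindCoordinator is assumed to have already pointed the client at this coordinator.

```python
from dataclasses import dataclass, field
from typing import Optional

# Kafka protocol API keys for the group-membership and offset requests.
API_KEYS = {
    "OffsetCommit": 8,
    "OffsetFetch": 9,
    "FindCoordinator": 10,
    "JoinGroup": 11,
    "Heartbeat": 12,
    "LeaveGroup": 13,
    "SyncGroup": 14,
}


@dataclass
class ToyCoordinator:
    """Hypothetical single-member group coordinator, for illustration only."""
    generation: int = 0
    member_id: Optional[str] = None
    assignment: dict = field(default_factory=dict)
    committed: dict = field(default_factory=dict)  # (topic, partition) -> next offset
    log: list = field(default_factory=list)        # request names in arrival order

    def join_group(self):
        self.log.append("JoinGroup")
        self.generation += 1                      # every join starts a new generation
        self.member_id = f"member-{self.generation}"
        return self.member_id, self.generation    # a lone member is also the leader

    def sync_group(self, member_id, generation, assignment):
        self.log.append("SyncGroup")
        assert (member_id, generation) == (self.member_id, self.generation)
        self.assignment = assignment              # the leader's plan is handed back
        return assignment

    def offset_fetch(self, tp):
        self.log.append("OffsetFetch")
        return self.committed.get(tp, -1)         # -1: nothing committed yet

    def heartbeat(self, member_id, generation):
        self.log.append("Heartbeat")
        assert (member_id, generation) == (self.member_id, self.generation)
        return 0                                  # error code 0 = NONE

    def offset_commit(self, tp, offset):
        self.log.append("OffsetCommit")
        self.committed[tp] = offset

    def leave_group(self, member_id):
        self.log.append("LeaveGroup")
        assert member_id == self.member_id
        self.member_id = None                     # committed offsets survive the leave


coord = ToyCoordinator()
tp = ("orders", 0)
member, gen = coord.join_group()
coord.sync_group(member, gen, {"orders": [0]})
start = coord.offset_fetch(tp)   # -1, so auto.offset.reset decides where to start
coord.heartbeat(member, gen)
coord.offset_commit(tp, 5)
coord.leave_group(member)
print([API_KEYS[name] for name in coord.log])  # [11, 14, 9, 12, 8, 13]
```

The committed offset outlives the member, which is what lets a later session with the same group.id resume.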
Prerequisites
```bash
pip install confluent-kafka
```
Rafka gateway running on localhost:9092 (see Quickstart: Produce).
The core pattern — subscribe, consume, commit
```python
from confluent_kafka import Consumer, KafkaException, Producer
from confluent_kafka.admin import AdminClient, NewTopic

# 1. Ensure the topic exists and has records.
# Topic creation and produce MUST precede subscribe: Metadata returns no
# partitions for unknown topics, and the consumer hangs waiting for assignment.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin.create_topics([NewTopic("orders", num_partitions=1, replication_factor=1)])
for topic, fut in futures.items():
    try:
        fut.result()  # create_topics is asynchronous; wait for the broker's answer
    except KafkaException as e:
        # e.g. the topic already exists from an earlier run; note that re-running
        # against an existing topic/group changes the offsets printed below.
        print(f"create_topics({topic}): {e}")

producer = Producer({"bootstrap.servers": "localhost:9092"})
for i in range(10):
    producer.produce("orders", value=f"order-{i}".encode())
producer.flush()

# 2. First consumer session: read 5 records and commit.
c1 = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor",
    "enable.auto.commit": False,      # manual commit is the reliable pattern
    "auto.offset.reset": "earliest",  # start from offset 0 on first join
})
c1.subscribe(["orders"])
consumed = []
while len(consumed) < 5:
    msg = c1.poll(timeout=2.0)
    if msg is not None and not msg.error():
        consumed.append(msg.offset())
c1.commit(asynchronous=False)  # commits the current position (offset 5) synchronously
print(f"Committed at offset 5; consumed: {consumed}")  # [0, 1, 2, 3, 4]
c1.close()

# 3. Second consumer session: a NEW instance, same group.id.
# After the join, the client sends OffsetFetch and the broker returns the committed offset 5.
c2 = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor",    # same group: resumes from the committed offset
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",  # fallback if no commit exists; unused here because 5 was committed
})
c2.subscribe(["orders"])
msg = None
while msg is None or msg.error():
    msg = c2.poll(timeout=2.0)
print(f"Resumed at offset: {msg.offset()}")  # 5 (not 0, not 10)
c2.close()
```
The discriminating property: c2 is a new Consumer instance. librdkafka tracks the fetch position client-side, so continuing to poll c1 (instead of closing it) would resume from its own cursor whether or not OffsetCommit/OffsetFetch work. Only a second instance forces a fresh JoinGroup → OffsetFetch round trip through the broker.
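The same argument can be checked without a broker. In the toy model below (ToyBroker, ToyConsumer, and resume_offsets are hypothetical and heavily simplified), polling the original instance returns offset 5 whether or not the commit reached the broker, while a fresh instance returns 5 only if the commit really round-trips through OffsetCommit and OffsetFetch.

```python
class ToyBroker:
    """Hypothetical broker-side offset store: group_id -> committed next offset."""

    def __init__(self, commits_work=True):
        self.committed = {}
        self.commits_work = commits_work   # False simulates a broken OffsetCommit

    def offset_commit(self, group, offset):
        if self.commits_work:
            self.committed[group] = offset

    def offset_fetch(self, group):
        return self.committed.get(group)   # None: no committed offset


class ToyConsumer:
    """Hypothetical client that, like librdkafka, keeps its fetch position locally."""

    def __init__(self, broker, group):
        self.broker, self.group = broker, group
        committed = broker.offset_fetch(group)                  # only on join
        self.position = 0 if committed is None else committed   # 'earliest' fallback

    def poll(self):
        offset = self.position
        self.position += 1                 # cursor advances client-side only
        return offset

    def commit(self):
        self.broker.offset_commit(self.group, self.position)


def resume_offsets(commits_work):
    broker = ToyBroker(commits_work)
    c1 = ToyConsumer(broker, "order-processor")
    for _ in range(5):
        c1.poll()                          # offsets 0..4
    c1.commit()                            # tries to commit 5
    same_instance = c1.poll()              # read from c1's local cursor
    new_instance = ToyConsumer(broker, "order-processor").poll()  # read via OffsetFetch
    return same_instance, new_instance


print(resume_offsets(commits_work=True))   # (5, 5)
print(resume_offsets(commits_work=False))  # (5, 0): only the new instance exposes the bug
```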
kcat snippet
```bash
# Join group "order-processor" and consume "orders"; keeps running until interrupted (Ctrl-C).
kcat -b localhost:9092 -G order-processor orders
```
With -G, kcat uses the consumer-group protocol: FindCoordinator, then JoinGroup + SyncGroup, periodic Heartbeat, and OffsetFetch for the assigned partitions. The -G flag triggers a full group-protocol rebalance rather than a static assign().
enable.auto.commit vs. manual commit
| Mode | How | When to use |
|---|---|---|
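| Auto-commit | enable.auto.commit=True (default) | Simplest; offsets are committed on a timer (auto.commit.interval.ms, default 5000 ms). Records are re-processed if the consumer crashes after processing but before the next auto-commit. Because offsets are stored as soon as poll() returns a message, a record still being processed when the timer fires can also be skipped after a crash. |
| Manual sync commit | enable.auto.commit=False + commit(asynchronous=False) | Recommended. Commit only after processing is durable. Synchronous: blocks until the broker acknowledges the commit. |
| Manual async commit | enable.auto.commit=False + commit(asynchronous=True) | Does not block. The result arrives later through the on_commit callback (served by poll()), so check for errors there. |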
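The manual sync row generalizes to a process-then-commit loop. Below is a minimal sketch under stated assumptions: consume_at_least_once, handle, and the Fake* classes are hypothetical names, while Consumer.commit(message=..., asynchronous=...) and the on_commit config callback are confluent-kafka-python API. The fakes let the loop run without a broker. Committing synchronously after every message is the simplest at-least-once form but costs one broker round trip per record; committing in batches shortens that cost at the price of a larger replay window after a crash.

```python
def consume_at_least_once(consumer, handle, max_messages, timeout=1.0):
    """Process each message, then synchronously commit offset + 1.

    `consumer` is anything with confluent_kafka.Consumer's poll()/commit();
    `handle` is a hypothetical callback that must finish durable work first.
    """
    done = 0
    while done < max_messages:
        msg = consumer.poll(timeout)
        if msg is None:
            continue                 # nothing arrived within `timeout`
        if msg.error():
            continue                 # simplified: real code should inspect/log the error
        handle(msg)                  # 1. process (e.g. write to a database)
        consumer.commit(message=msg, asynchronous=False)  # 2. then commit msg.offset() + 1
        done += 1
    return done


def on_commit(err, partitions):
    """Result callback for async commits; confluent-kafka serves it from poll()."""
    if err is not None:
        print(f"commit failed: {err}")
    for p in partitions:
        if p.error is not None:
            print(f"commit failed for {p.topic}[{p.partition}]: {p.error}")


# Config for the async mode: pass this dict to confluent_kafka.Consumer(...).
ASYNC_COMMIT_CONF = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor",
    "enable.auto.commit": False,
    "on_commit": on_commit,
}


# Broker-free demo with minimal stand-ins for Message and Consumer.
class FakeMessage:
    def __init__(self, offset):
        self._offset = offset

    def error(self):
        return None

    def offset(self):
        return self._offset


class FakeConsumer:
    def __init__(self, n):
        self.pending = [FakeMessage(i) for i in range(n)]
        self.commits = []            # (committed offset, asynchronous) pairs

    def poll(self, timeout):
        return self.pending.pop(0) if self.pending else None

    def commit(self, message=None, asynchronous=True):
        self.commits.append((message.offset() + 1, asynchronous))


processed = []
fake = FakeConsumer(10)
consume_at_least_once(fake, lambda m: processed.append(m.offset()), max_messages=5)
print(processed)      # [0, 1, 2, 3, 4]
print(fake.commits)   # [(1, False), (2, False), (3, False), (4, False), (5, False)]
```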
Setup dependency — produce before subscribe
AdminClient.create_topics → Producer.produce → Consumer.subscribe
The consumer sends a Metadata request on subscribe. If the topic does not exist (or has no partitions), Metadata returns UNKNOWN_TOPIC_OR_PARTITION and the consumer spins in a metadata-retry loop. Always create the topic (waiting on the futures that create_topics returns) and produce at least one record before the consumer subscribes.
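A fixed sleep between creating the topic and subscribing is fragile. One option, sketched here under assumptions, is to poll Metadata until the topic reports partitions. wait_for_topic and FakeAdmin are hypothetical names; the helper relies on AdminClient.list_topics(topic=..., timeout=...) returning ClusterMetadata, whose topics dict maps names to TopicMetadata with error and partitions fields. list_topics can itself raise KafkaException if the broker does not answer in time, and the sketch does not catch that. It only checks that the topic exists; producing the first record is still a separate step.

```python
import time
from types import SimpleNamespace


def wait_for_topic(admin, topic, timeout_s=10.0, interval_s=0.5):
    """Poll Metadata until `topic` exists with at least one partition.

    `admin` is expected to behave like confluent_kafka.admin.AdminClient.
    Returns the partition count, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        md = admin.list_topics(topic=topic, timeout=interval_s)
        tmd = md.topics.get(topic)
        # An unknown topic shows up with an error (or not at all) and no partitions.
        if tmd is not None and tmd.error is None and tmd.partitions:
            return len(tmd.partitions)
        time.sleep(interval_s)
    raise TimeoutError(f"topic {topic!r} has no partitions after {timeout_s}s")


class FakeAdmin:
    """Stand-in for AdminClient: the topic appears on the third Metadata call."""

    def __init__(self):
        self.calls = 0

    def list_topics(self, topic=None, timeout=-1):
        self.calls += 1
        topics = {}
        if self.calls >= 3:
            topics[topic] = SimpleNamespace(error=None, partitions={0: None})
        return SimpleNamespace(topics=topics)


# With a real broker: wait_for_topic(AdminClient({...}), "orders") before subscribe().
admin = FakeAdmin()
partitions = wait_for_topic(admin, "orders", timeout_s=5.0, interval_s=0.01)
print(partitions, admin.calls)  # 1 3
```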
Error handling
| Error | Code | Cause | Client action |
|---|---|---|---|
| ILLEGAL_GENERATION | 22 | Heartbeat, SyncGroup, or OffsetCommit sent with a stale generation_id: the group has rebalanced since this member last joined. | Rejoin. |
| UNKNOWN_MEMBER_ID | 25 | The group coordinator does not recognise the member_id (e.g., the member was removed after a pause longer than session.timeout.ms). | Rejoin with a fresh member_id. |
| REBALANCE_IN_PROGRESS | 27 | Heartbeat or SyncGroup received while the coordinator is preparing a new rebalance. | Rejoin (send a new JoinGroup). |
librdkafka handles all three internally and rebalances automatically. You will rarely see them surfaced in application code.
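The coordinator-side checks behind these codes, and a client's reaction to each, can be sketched as a toy. GroupState, handle_heartbeat, and client_reaction are hypothetical names; the check order is a simplification, and the reactions summarize typical client behavior rather than librdkafka's exact code paths.

```python
from dataclasses import dataclass, field

# Kafka protocol error codes from the error table.
NONE = 0
ILLEGAL_GENERATION = 22
UNKNOWN_MEMBER_ID = 25
REBALANCE_IN_PROGRESS = 27


@dataclass
class GroupState:
    """Hypothetical coordinator-side view of one group (illustration only)."""
    generation: int
    members: set = field(default_factory=set)
    rebalancing: bool = False   # True while the coordinator prepares a new generation


def handle_heartbeat(state, member_id, generation):
    """Simplified coordinator check; the order of checks is an assumption."""
    if member_id not in state.members:
        return UNKNOWN_MEMBER_ID       # e.g. removed after a session timeout
    if generation != state.generation:
        return ILLEGAL_GENERATION      # member is still on an old generation
    if state.rebalancing:
        return REBALANCE_IN_PROGRESS   # tells the member to rejoin
    return NONE


def client_reaction(code):
    """Roughly what a client such as librdkafka does with the response."""
    if code == NONE:
        return "keep consuming"
    if code == UNKNOWN_MEMBER_ID:
        return "reset member_id, then rejoin"
    if code in (ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS):
        return "rejoin"
    return "retry later"


state = GroupState(generation=3, members={"m1"})
for member, gen in [("m1", 3), ("m1", 2), ("m2", 3)]:
    code = handle_heartbeat(state, member, gen)
    print(member, gen, code, client_reaction(code))
state.rebalancing = True
code = handle_heartbeat(state, "m1", 3)
print("m1", 3, code, client_reaction(code))   # m1 3 27 rejoin
```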