Consuming with Consumer Groups

Rafka P3.A ships the full single-member consumer-group protocol (JoinGroup, SyncGroup, Heartbeat, LeaveGroup, OffsetCommit, OffsetFetch, FindCoordinator). Standard clients — confluent-kafka-python, librdkafka, kcat -G — connect with no special configuration.

Prerequisites

pip install confluent-kafka

Rafka gateway running on localhost:9092 (see Quickstart: Produce).

The core pattern — subscribe, consume, commit

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic

# 1. Ensure the topic exists and has records.
# Topic + produce MUST precede subscribe — Metadata returns no partitions
# for unknown topics and the consumer hangs waiting for assignment.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
admin.create_topics([NewTopic("orders", num_partitions=1, replication_factor=1)])

producer = Producer({"bootstrap.servers": "localhost:9092"})
for i in range(10):
    producer.produce("orders", value=f"order-{i}".encode())
producer.flush()

# 2. First consumer session — read 5 records and commit.
c1 = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor",
    "enable.auto.commit": False,      # manual commit is the reliable pattern
    "auto.offset.reset": "earliest",  # start from offset 0 on first join
})
c1.subscribe(["orders"])

consumed = []
while len(consumed) < 5:
    msg = c1.poll(timeout=2.0)
    if msg and not msg.error():
        consumed.append(msg.offset())

c1.commit(asynchronous=False) # commit offset 5 synchronously
print(f"Committed at offset 5; consumed: {consumed}") # [0, 1, 2, 3, 4]
c1.close()

# 3. Second consumer session — a NEW instance, same group.id.
# The join triggers OffsetFetch; the broker returns the committed offset 5.
c2 = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-processor",    # same group: resumes from committed offset
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",  # fallback if no commit exists; 5 exists so unused
})
c2.subscribe(["orders"])

msg = None
while msg is None or msg.error():
    msg = c2.poll(timeout=2.0)

print(f"Resumed at offset: {msg.offset()}") # → 5 (NOT 0, NOT 10)
c2.close()

The discriminating property: c2 is a new Consumer instance. librdkafka tracks fetch position client-side, so re-polling c1 would resume from its own cursor regardless of whether OffsetCommit/OffsetFetch work. Only a second instance forces a fresh JoinGroup → OffsetFetch roundtrip through the broker.
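
The committed offset can also be checked without consuming anything: Consumer.committed() asks the group coordinator for the stored offset. The following is a minimal sketch, assuming the single-partition "orders" topic and the "order-processor" group from the example above. The names committed_offset and demo are illustrative helpers, not part of Rafka or confluent-kafka-python.

```python
def committed_offset(consumer, topic_partition, timeout=10.0):
    """Return the group's committed offset for one partition, or None if none is stored."""
    (result,) = consumer.committed([topic_partition], timeout)
    if result.error is not None:
        raise RuntimeError(f"OffsetFetch failed: {result.error}")
    # librdkafka reports "no committed offset" as a negative sentinel (OFFSET_INVALID).
    return result.offset if result.offset >= 0 else None


def demo():
    """Hypothetical usage; needs confluent-kafka and a gateway on localhost:9092."""
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "order-processor",
        "enable.auto.commit": False,
    })
    try:
        print(committed_offset(consumer, TopicPartition("orders", 0)))
    finally:
        consumer.close()
```

Call demo() after the first session has committed; the helper itself only needs an object with a committed() method, which keeps it easy to exercise in unit tests.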

kcat snippet

# Subscribe to "orders" with group.id=order-processor; runs until interrupted (add -e to exit at end of partition).
kcat -b localhost:9092 -G order-processor orders

kcat -G uses the consumer-group protocol (JoinGroup + SyncGroup + Heartbeat + OffsetFetch). The -G flag initiates a full group-protocol rebalance, not just an assign().

enable.auto.commit vs. manual commit

| Mode | How | When to use |
| --- | --- | --- |
| Auto-commit | enable.auto.commit=True (default) | Simplest; offsets are committed on a timer. May re-process records if the consumer crashes between poll and auto-commit. |
| Manual sync commit | enable.auto.commit=False + commit(asynchronous=False) | Recommended. Commit only after processing is durable. Synchronous: blocks until the broker acknowledges the commit. |
| Manual async commit | enable.auto.commit=False + commit(asynchronous=True) | Returns without waiting for the broker. Register an on_commit callback in the consumer config to see commit errors. |
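
For the async mode, commit results arrive through the on_commit consumer config property, which confluent-kafka-python calls with (err, partitions). Below is a minimal sketch of collecting failed commits; make_on_commit, consumer_config, and demo are illustrative names, and the broker address, group, and topic are carried over from the example above.

```python
def make_on_commit(failures):
    """Build an on_commit callback that appends failed commits to `failures`."""
    def on_commit(err, partitions):
        # err is None on success; otherwise it describes why the commit failed.
        if err is not None:
            failures.append(
                (err, [(p.topic, p.partition, p.offset) for p in partitions])
            )
    return on_commit


def consumer_config(failures):
    """Consumer settings for manual async commits with error collection."""
    return {
        "bootstrap.servers": "localhost:9092",
        "group.id": "order-processor",
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
        "on_commit": make_on_commit(failures),
    }


def demo():
    """Hypothetical usage; needs confluent-kafka and a gateway on localhost:9092."""
    from confluent_kafka import Consumer

    failures = []
    consumer = Consumer(consumer_config(failures))
    consumer.subscribe(["orders"])
    try:
        msg = consumer.poll(timeout=2.0)
        if msg is not None and not msg.error():
            consumer.commit(message=msg, asynchronous=True)
        # Commit callbacks are served from poll(); note that this call may
        # also return the next record, which this sketch simply ignores.
        consumer.poll(timeout=1.0)
    finally:
        consumer.close()
    print(failures)
```

Collecting failures in a list rather than raising inside the callback keeps the error handling in the application's own loop, where it can decide whether to retry with a synchronous commit.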

Setup dependency — produce before subscribe

AdminClient.create_topics → Producer.produce → Consumer.subscribe

The consumer sends a Metadata request on subscribe. If the topic does not exist (or has no partitions), Metadata returns UNKNOWN_TOPIC_OR_PARTITION and the consumer spins in a metadata-retry loop. Always create the topic and produce at least one record before the consumer subscribes.
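
Because create_topics is asynchronous, it can help to confirm the topic is visible in metadata before producing and subscribing. The sketch below polls AdminClient.list_topics until the topic appears with at least one partition. The wait_for_topic helper and its timeout defaults are assumptions for illustration, not part of Rafka or confluent-kafka-python.

```python
import time


def wait_for_topic(admin, topic, timeout_s=10.0, poll_interval_s=0.5):
    """Poll cluster metadata until `topic` has at least one partition.

    Returns the partition count, or raises TimeoutError after `timeout_s`.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        metadata = admin.list_topics(timeout=5.0)
        entry = metadata.topics.get(topic)
        if entry is not None and entry.error is None and entry.partitions:
            return len(entry.partitions)
        if time.monotonic() >= deadline:
            raise TimeoutError(f"topic {topic!r} not visible after {timeout_s}s")
        time.sleep(poll_interval_s)


def demo():
    """Hypothetical usage; needs confluent-kafka and a gateway on localhost:9092."""
    from confluent_kafka import Producer
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": "localhost:9092"})
    futures = admin.create_topics(
        [NewTopic("orders", num_partitions=1, replication_factor=1)]
    )
    for name, future in futures.items():
        try:
            future.result(timeout=10)  # raises if creation failed
        except Exception as exc:       # e.g. the topic already exists
            print(f"create_topics({name}): {exc}")

    wait_for_topic(admin, "orders")

    producer = Producer({"bootstrap.servers": "localhost:9092"})
    producer.produce("orders", value=b"order-0")
    producer.flush()
    # Only now is it safe to call Consumer.subscribe(["orders"]).
```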

Error handling

| Error | Code | Cause |
| --- | --- | --- |
| ILLEGAL_GENERATION | 22 | Heartbeat or SyncGroup sent with a stale generation_id; the group rebalanced while the member was away. Rejoin. |
| UNKNOWN_MEMBER_ID | 25 | Heartbeat from a member_id the group coordinator does not recognise (e.g., after a long pause). Rejoin. |
| REBALANCE_IN_PROGRESS | 27 | SyncGroup sent before the rebalance window closed, or Heartbeat during a rebalance. Back off and retry. |

librdkafka handles all three internally and rebalances automatically. You will rarely see them surfaced in application code.
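
For anyone writing a client by hand or decoding these codes in logs, the recovery rules in the table can be captured as a small lookup. This is an illustrative sketch only: the Action enum and classify_group_error are hypothetical names, and the codes and actions are taken directly from the table above.

```python
from enum import Enum


class Action(Enum):
    REJOIN = "rejoin"        # send a fresh JoinGroup
    BACKOFF = "backoff"      # wait, then retry the same request
    UNHANDLED = "unhandled"  # not one of the group errors listed above


# Error code -> (error name, recovery action), as listed in the table.
GROUP_ERROR_ACTIONS = {
    22: ("ILLEGAL_GENERATION", Action.REJOIN),
    25: ("UNKNOWN_MEMBER_ID", Action.REJOIN),
    27: ("REBALANCE_IN_PROGRESS", Action.BACKOFF),
}


def classify_group_error(code):
    """Map a numeric Kafka error code to (name, Action)."""
    return GROUP_ERROR_ACTIONS.get(code, (f"UNKNOWN_CODE_{code}", Action.UNHANDLED))


if __name__ == "__main__":
    for code in (22, 25, 27, 3):
        name, action = classify_group_error(code)
        print(f"{code}: {name} -> {action.value}")
```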