Topic Registry and Entity Model

P2.A introduces a gateway-side in-memory topic registry plus the rafka-identity crate for the canonical id/slug/RRL entity model. This page is the engineering reference.

crates/rafka-identity

Provides the three identity forms (id, slug, RRL) intended for all rafka entities. Only topics are covered for now; broker/authz crates will consume it in later phases.

TopicId

pub struct TopicId(String);
impl TopicId {
    pub fn mint() -> Self; // "top_" + ulid::Ulid::new().to_string().to_lowercase()
    pub fn as_str(&self) -> &str;
    pub fn parse(s: &str) -> Option<Self>;
}

Format: top_<lowercase-ULID>. Length: 30 chars (4 prefix + 26 ULID). The top_ prefix brands the id so ids from different entity types are never accidentally interchangeable.
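A minimal sketch of the shape check that parse could perform, assuming it only verifies the prefix, the length, and the lowercase Crockford base32 alphabet that ULIDs use. The function name looks_like_topic_id is hypothetical, and the crate's real parse may accept or reject more than this.

```rust
/// Hypothetical shape check for a topic id: "top_" followed by a 26-char
/// lowercase ULID. This is not the crate's actual `TopicId::parse`.
pub fn looks_like_topic_id(s: &str) -> bool {
    // Crockford base32 alphabet, lowercased (it omits i, l, o, u).
    const ALPHABET: &str = "0123456789abcdefghjkmnpqrstvwxyz";

    let Some(body) = s.strip_prefix("top_") else {
        return false;
    };
    // 26 ULID characters, so the whole id is 4 + 26 = 30 chars.
    if body.len() != 26 {
        return false;
    }
    // A ULID packs 128 bits into 26 five-bit characters (130 bits), so the
    // first character can be at most '7'.
    if !matches!(body.as_bytes()[0], b'0'..=b'7') {
        return false;
    }
    body.chars().all(|c| ALPHABET.contains(c))
}
```

Because the check requires the top_ prefix, a bare ULID or an id minted for another entity type is rejected before the alphabet is even inspected.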

Slug

pub struct Slug(String);
impl Slug {
    pub fn validate(s: &str) -> Result<Self, SlugError>;
    pub fn sanitize(name: &str) -> Self;
    pub fn as_str(&self) -> &str;
}

Invariants: lowercase ASCII alphanumeric and hyphens only; length 1–63; no leading/trailing hyphen; no consecutive hyphens.
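The invariants can be checked in a few lines. The sketch below is one possible reading of them; the SlugProblem enum and the check_slug function are hypothetical names, since the variants of the real SlugError are not listed on this page.

```rust
/// Hypothetical error type; the crate's real `SlugError` variants are not
/// shown in this reference.
#[derive(Debug, PartialEq, Eq)]
pub enum SlugProblem {
    Empty,
    TooLong,
    BadChar(char),
    EdgeHyphen,
    DoubleHyphen,
}

/// Checks the four slug invariants and reports the first one violated.
pub fn check_slug(s: &str) -> Result<(), SlugProblem> {
    if s.is_empty() {
        return Err(SlugProblem::Empty);
    }
    // Check the character set first, so the byte length below equals the
    // character count.
    if let Some(bad) = s
        .chars()
        .find(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-'))
    {
        return Err(SlugProblem::BadChar(bad));
    }
    if s.len() > 63 {
        return Err(SlugProblem::TooLong);
    }
    if s.starts_with('-') || s.ends_with('-') {
        return Err(SlugProblem::EdgeHyphen);
    }
    if s.contains("--") {
        return Err(SlugProblem::DoubleHyphen);
    }
    Ok(())
}
```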

sanitize(name):

  1. Lowercase.
  2. Replace any char outside [a-z0-9-] with -.
  3. Collapse consecutive hyphens.
  4. Trim leading/trailing hyphens.
  5. Clamp to 63 chars (trim any trailing hyphen left by clamp).

Examples: My.Topic → my-topic; Hello World → hello-world.
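The five steps can be sketched as a single pass plus two trims. This is an illustration rather than the crate's code: sanitize_slug is a hypothetical free function returning a String, it lowercases ASCII only, and it does not decide what happens when the input sanitizes to an empty string (for example "...").

```rust
/// Hypothetical free-function version of the five sanitize steps.
pub fn sanitize_slug(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    for ch in name.chars() {
        // Steps 1 and 2: lowercase, then map anything outside [a-z0-9-] to '-'.
        let c = ch.to_ascii_lowercase();
        let c = if c.is_ascii_lowercase() || c.is_ascii_digit() {
            c
        } else {
            '-'
        };
        // Step 3: collapse runs of hyphens while building the string.
        if c == '-' && out.ends_with('-') {
            continue;
        }
        out.push(c);
    }
    // Step 4: trim leading and trailing hyphens.
    let trimmed = out.trim_matches('-');
    // Step 5: clamp to 63 chars. The string is pure ASCII at this point, so
    // byte slicing is safe. Then drop a trailing hyphen left by the clamp.
    let clamped = &trimmed[..trimmed.len().min(63)];
    clamped.trim_end_matches('-').to_string()
}
```

With this sketch, sanitize_slug("My.Topic") yields my-topic and sanitize_slug("Hello World") yields hello-world, matching the examples above.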

Rrl

pub struct Rrl(String);
impl Rrl {
    pub fn topic(org: &str, env: &str, cluster: &str, topic_slug: &str) -> Self;
    pub fn as_str(&self) -> &str;
}

Format: <org>/<env>/<cluster>/topics/<topic-slug>.

Example: rafka/prod/east/topics/my-topic.

RRLs are delimited by slashes, not colons, so they must never appear in URL path segments, where the slashes would be read as path separators. Pass them in a ?rrl= query parameter or a request body field instead.
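To make the layout concrete, here is a small sketch that builds a topic RRL and splits one back into its parts. Only the string format comes from this page; topic_rrl and split_topic_rrl are hypothetical helpers, and the real Rrl type exposes no parser in the API shown above.

```rust
/// Builds "<org>/<env>/<cluster>/topics/<topic-slug>".
/// Hypothetical helper; only the layout is taken from this reference.
pub fn topic_rrl(org: &str, env: &str, cluster: &str, topic_slug: &str) -> String {
    format!("{org}/{env}/{cluster}/topics/{topic_slug}")
}

/// Splits a topic RRL back into (org, env, cluster, topic_slug).
/// Returns None if the shape does not match or any segment is empty.
pub fn split_topic_rrl(rrl: &str) -> Option<(&str, &str, &str, &str)> {
    let parts: Vec<&str> = rrl.split('/').collect();
    match parts.as_slice() {
        [org, env, cluster, "topics", slug]
            if !org.is_empty() && !env.is_empty() && !cluster.is_empty() && !slug.is_empty() =>
        {
            Some((*org, *env, *cluster, *slug))
        }
        _ => None,
    }
}
```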

OrgContext (POC)

pub fn default_org_context() -> OrgContext {
    OrgContext { org_id: 1, org: "rafka", env: "prod", cluster: "east" }
}

Single-tenant POC constant. Phase 5 replaces this with real tenant store resolution.

Gateway topic registry (gateway/src/topic_registry.rs)

The registry is a process-global DashMap keyed by (org_id: u64, name: String) with TopicEntity values. It is local to a single gateway instance; gossip-sync to peers is a P5/multi-node concern.

TopicEntity

pub struct TopicEntity {
    pub id: TopicId,
    pub slug: Slug,
    pub rrl: Rrl,
    pub name: String, // raw Kafka name, verbatim
    pub org_id: u64,
    pub partition_count: i32,
    pub replication_factor: i16,
    pub configs: HashMap<String, String>,
}

Public API

| Function | Error codes | Notes |
| --- | --- | --- |
| create(org_id, name, partitions, rf, configs) | 3, 17, 36, 37, 38, 40 | Validates + mints id/slug/RRL |
| delete(org_id, name) | none (returns Option) | Returns the removed entity or None |
| lookup(org_id, name) | none (returns Option) | Clone of the entry |
| list(org_id) | | All topics for the org |
| ensure_auto(org_id, name) | | Register-if-absent (Metadata flag=true path); 1 partition / rf=1 |
| set_configs(org_id, name, configs) | 3 | Full-replace |
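The following sketch shows how the six functions fit together. It rests on several assumptions: a std Mutex<HashMap> stands in for the DashMap, Entry is a trimmed-down stand-in for TopicEntity with no id, slug, or RRL, and the mapping of conditions to error codes 36, 37, and 38 follows the standard Kafka meanings (TOPIC_ALREADY_EXISTS, INVALID_PARTITIONS, INVALID_REPLICATION_FACTOR) rather than anything confirmed by the gateway source. Name and config validation (codes 17 and 40) is left out.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Kafka protocol error codes used by this sketch.
pub const UNKNOWN_TOPIC_OR_PARTITION: i16 = 3;
pub const TOPIC_ALREADY_EXISTS: i16 = 36;
pub const INVALID_PARTITIONS: i16 = 37;
pub const INVALID_REPLICATION_FACTOR: i16 = 38;

/// Trimmed-down stand-in for TopicEntity (no id, slug, or RRL).
#[derive(Clone, Debug, PartialEq)]
pub struct Entry {
    pub name: String,
    pub org_id: u64,
    pub partition_count: i32,
    pub replication_factor: i16,
    pub configs: HashMap<String, String>,
}

/// Hypothetical registry; a std Mutex<HashMap> stands in for DashMap.
#[derive(Default)]
pub struct Registry {
    inner: Mutex<HashMap<(u64, String), Entry>>,
}

impl Registry {
    pub fn create(
        &self,
        org_id: u64,
        name: &str,
        partitions: i32,
        rf: i16,
        configs: HashMap<String, String>,
    ) -> Result<Entry, i16> {
        if partitions < 1 {
            return Err(INVALID_PARTITIONS);
        }
        if rf < 1 {
            return Err(INVALID_REPLICATION_FACTOR);
        }
        let mut map = self.inner.lock().unwrap();
        let key = (org_id, name.to_string());
        if map.contains_key(&key) {
            return Err(TOPIC_ALREADY_EXISTS);
        }
        let entry = Entry {
            name: name.to_string(),
            org_id,
            partition_count: partitions,
            replication_factor: rf,
            configs,
        };
        map.insert(key, entry.clone());
        Ok(entry)
    }

    /// Returns a clone of the entry, if present.
    pub fn lookup(&self, org_id: u64, name: &str) -> Option<Entry> {
        self.inner.lock().unwrap().get(&(org_id, name.to_string())).cloned()
    }

    /// Returns the removed entry, or None if it was not registered.
    pub fn delete(&self, org_id: u64, name: &str) -> Option<Entry> {
        self.inner.lock().unwrap().remove(&(org_id, name.to_string()))
    }

    pub fn list(&self, org_id: u64) -> Vec<Entry> {
        let map = self.inner.lock().unwrap();
        map.values().filter(|e| e.org_id == org_id).cloned().collect()
    }

    /// Register-if-absent with 1 partition / rf=1; existing entries win.
    pub fn ensure_auto(&self, org_id: u64, name: &str) -> Entry {
        let mut map = self.inner.lock().unwrap();
        map.entry((org_id, name.to_string()))
            .or_insert_with(|| Entry {
                name: name.to_string(),
                org_id,
                partition_count: 1,
                replication_factor: 1,
                configs: HashMap::new(),
            })
            .clone()
    }

    /// Full-replace of the config map.
    pub fn set_configs(
        &self,
        org_id: u64,
        name: &str,
        configs: HashMap<String, String>,
    ) -> Result<(), i16> {
        let mut map = self.inner.lock().unwrap();
        match map.get_mut(&(org_id, name.to_string())) {
            Some(entry) => {
                entry.configs = configs;
                Ok(())
            }
            None => Err(UNKNOWN_TOPIC_OR_PARTITION),
        }
    }
}
```

Keying on (org_id, name) rather than name alone keeps topics from different orgs separate even when they share a raw Kafka name.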

Span emit sites

| Span name | Location | Attributes |
| --- | --- | --- |
| rafka.gateway.topic.create.via-wire | registry::create (#[instrument]) | topic, topic_id, partition_count |
| rafka.gateway.topic.create.from-metadata | registry::ensure_auto (#[instrument]) | topic, topic_id |

Config validation

validate_create_configs and validate_alter_configs are ported from v1 topic_config_validation.rs.

validate_alter_configs rules:

  1. rafka.failover.enabled is immutable via AlterConfigs (I2 rule, ported from v1). Any change from the current value — including the initial false→true — returns error 40. The flag can only be set at CreateTopics time via the topic's config map.
  2. cleanup.policy containing compact while rafka.failover.enabled=true (as recorded in the registry) returns error 40 (Gotcha 5: compact bitmap breaks Op 36 offset translation).
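The two rules can be sketched as a pure function over the current and proposed config maps. The signature and the name check_alter_configs are hypothetical (the real validate_alter_configs signature is not shown here), and the sketch assumes that an unset rafka.failover.enabled counts as false and that re-sending the current value is not a change.

```rust
use std::collections::HashMap;

pub const INVALID_CONFIG: i16 = 40;
const FAILOVER_KEY: &str = "rafka.failover.enabled";
const CLEANUP_KEY: &str = "cleanup.policy";

/// Hypothetical signature: `current` is the registry's config map for the
/// topic, `proposed` is the AlterConfigs payload.
pub fn check_alter_configs(
    current: &HashMap<String, String>,
    proposed: &HashMap<String, String>,
) -> Result<(), i16> {
    // Assumption: a flag that was never set counts as "false".
    let current_failover = current
        .get(FAILOVER_KEY)
        .map(String::as_str)
        .unwrap_or("false");

    // Rule 1: the failover flag is immutable via AlterConfigs.
    if let Some(new) = proposed.get(FAILOVER_KEY) {
        if new.as_str() != current_failover {
            return Err(INVALID_CONFIG);
        }
    }

    // Rule 2: no compaction while failover is enabled in the registry.
    // cleanup.policy may be a comma-separated list such as "compact,delete".
    let wants_compact = proposed
        .get(CLEANUP_KEY)
        .map_or(false, |p| p.split(',').any(|t| t.trim() == "compact"));
    if wants_compact && current_failover == "true" {
        return Err(INVALID_CONFIG);
    }
    Ok(())
}
```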

KAFKA_OP_DELETE_TOPIC = 0x05 (mesh-ops)

New mesh operation dispatched by the gateway's DeleteTopics handler to purge WAL state on the broker.

pub const KAFKA_OP_DELETE_TOPIC: u8 = 0x05;

pub struct DeleteTopicOp {
    pub topic: String,
    pub partitions: i32, // org_id rides the KafkaOp carrier
}
pub struct DeleteTopicOpResp { pub error_code: i16 }

Postcard-encoded, same pattern as ProduceOp/FetchOp/ListOffsetsOp.

Broker handler (broker/src/produce.rs::handle_delete_topic): for each partition in 0..partitions, builds make_vt_key(org_id, topic, p) and calls wal.delete_topic(&vt_key). Span: rafka.broker.topic.delete.
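The per-partition purge loop can be sketched as follows. Almost everything here is a stand-in: the TopicWal trait models only the delete_topic call, the key layout inside make_vt_key is a placeholder because the real format is not given on this page, and RecordingWal exists only to exercise the loop. Postcard decoding and the tracing span are omitted.

```rust
/// Hypothetical stand-in for the broker WAL; only `delete_topic` is modeled.
pub trait TopicWal {
    fn delete_topic(&mut self, vt_key: &str);
}

/// Placeholder key layout. The real `make_vt_key` format is not given in
/// this reference.
pub fn make_vt_key(org_id: u64, topic: &str, partition: i32) -> String {
    format!("{org_id}:{topic}:{partition}")
}

pub struct DeleteTopicOp {
    pub topic: String,
    pub partitions: i32,
}

/// Purges every partition in 0..partitions and returns the response error
/// code (0 means no error). org_id arrives separately, as it rides the
/// KafkaOp carrier rather than the op itself.
pub fn handle_delete_topic<W: TopicWal>(wal: &mut W, org_id: u64, op: &DeleteTopicOp) -> i16 {
    for p in 0..op.partitions {
        let vt_key = make_vt_key(org_id, &op.topic, p);
        wal.delete_topic(&vt_key);
    }
    0
}

/// In-memory WAL that records deleted keys; used only to exercise the sketch.
#[derive(Default)]
pub struct RecordingWal {
    pub deleted: Vec<String>,
}

impl TopicWal for RecordingWal {
    fn delete_topic(&mut self, vt_key: &str) {
        self.deleted.push(vt_key.to_string());
    }
}
```

Carrying the partition count in the op lets the broker enumerate keys without consulting the gateway registry, which by that point may already have dropped the topic.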

Gateway wire handlers

Admin handler span emit sites

| Span name | Handler | Notes |
| --- | --- | --- |
| rafka.gateway.topic.create.via-wire | handle_create_topics_request (via registry::create) | api_key 19 |
| rafka.gateway.topic.delete.via-wire | handle_delete_topics_request | api_key 20; #[instrument] on the handler |
| rafka.gateway.topic.describe-configs | handle_describe_configs_request | api_key 32; #[instrument] on the handler |
| rafka.gateway.topic.alter-configs | handle_alter_configs_request | api_key 33; #[instrument] on the handler |
| rafka.gateway.kafka.metadata | handle_metadata_request | api_key 3; rewritten for P2.A; auto_create attribute added |
| rafka.broker.topic.delete | broker handle_delete_topic | api_key 20 broker-side; #[instrument] |

Defect-A: Metadata allow_auto_topic_creation flag

The MetadataRequest carries allow_auto_topic_creation: bool (present at API version 4+). P1 ignored this flag, so every requested topic was auto-created. P2.A decodes it and dispatches:

topic registered → return registry entry (partition_count from registry)
topic absent + flag=true → ensure_auto (1 partition); return entry error=0
topic absent + flag=false → error_code=3 UNKNOWN_TOPIC_OR_PARTITION
wildcard (topics=None) → list all registered org topics

allow_auto_topic_creation defaults to true when the flag is absent from the decoded request, so pre-v4 clients, which don't send it, get producer-like behavior.
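The per-topic decision, including the default for an absent flag, reduces to a small match. The sketch below covers only the named-topic cases (not the wildcard listing); TopicOutcome and resolve_topic are hypothetical names, not the handler's actual types.

```rust
pub const UNKNOWN_TOPIC_OR_PARTITION: i16 = 3;

/// Hypothetical outcome of resolving one requested topic name.
#[derive(Debug, PartialEq, Eq)]
pub enum TopicOutcome {
    /// Topic was already registered; answer from the registry entry.
    Existing,
    /// Topic was absent and is registered via ensure_auto (1 partition).
    AutoCreated,
    /// Topic was absent and auto-creation is not allowed.
    Error(i16),
}

/// `flag` is None when the request did not carry the field (pre-v4 clients).
pub fn resolve_topic(registered: bool, flag: Option<bool>) -> TopicOutcome {
    // An absent flag defaults to true.
    let allow_auto = flag.unwrap_or(true);
    match (registered, allow_auto) {
        (true, _) => TopicOutcome::Existing,
        (false, true) => TopicOutcome::AutoCreated,
        (false, false) => TopicOutcome::Error(UNKNOWN_TOPIC_OR_PARTITION),
    }
}
```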

This fix is P1-regression safe: librdkafka producers send flag=true, so all existing P1 tests + the run.ps1 conformance test remain green untouched.

Version caps (below flexible-encoding threshold)

| API | Cap | Flexible at |
| --- | --- | --- |
| CreateTopics (19) | v4 | v5 |
| DeleteTopics (20) | v3 | v4 |
| DescribeConfigs (32) | v3 | v4 |
| AlterConfigs (33) | v1 | v2 |
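A sketch of how the caps could be expressed in code, assuming the gateway rejects any request version above the cap for these four admin APIs. The function names are hypothetical, and APIs outside this table are simply not covered.

```rust
/// Highest request version accepted for the four admin APIs in the caps
/// table. Each cap sits one below the version where flexible encoding starts.
pub fn version_cap(api_key: i16) -> Option<i16> {
    match api_key {
        19 => Some(4), // CreateTopics, flexible at v5
        20 => Some(3), // DeleteTopics, flexible at v4
        32 => Some(3), // DescribeConfigs, flexible at v4
        33 => Some(1), // AlterConfigs, flexible at v2
        _ => None,     // not covered by this sketch
    }
}

/// True when the api_key is in the table and the version is within its cap.
pub fn is_within_cap(api_key: i16, version: i16) -> bool {
    matches!(version_cap(api_key), Some(cap) if (0..=cap).contains(&version))
}
```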