Topic Registry and Entity Model
P2.A introduces a gateway-side in-memory topic registry plus the rafka-identity crate for the canonical id/slug/RRL entity model. This page is the engineering reference.
crates/rafka-identity
Provides the three identity forms for all rafka entities. Topic-specific for now; broker/authz crates will consume it in later phases.
TopicId
pub struct TopicId(String);
impl TopicId {
pub fn mint() -> Self; // "top_" + ulid::Ulid::new().to_lowercase()
pub fn as_str(&self) -> &str;
pub fn parse(s: &str) -> Option<Self>;
}
Format: top_<lowercase-ULID>. Length: 30 chars (4 prefix + 26 ULID). The top_ prefix brands the id so ids from different entity types are never accidentally interchangeable.
Slug
pub struct Slug(String);
impl Slug {
pub fn validate(s: &str) -> Result<Self, SlugError>;
pub fn sanitize(name: &str) -> Self;
pub fn as_str(&self) -> &str;
}
Invariants: lowercase ASCII alphanumeric and hyphens only; length 1–63; no leading/trailing hyphen; no consecutive hyphens.
sanitize(name):
- Lowercase.
- Replace any char outside
[a-z0-9-]with-. - Collapse consecutive hyphens.
- Trim leading/trailing hyphens.
- Clamp to 63 chars (trim any trailing hyphen left by clamp).
Example: My.Topic → my-topic. Hello World → hello-world.
Rrl
pub struct Rrl(String);
impl Rrl {
pub fn topic(org: &str, env: &str, cluster: &str, topic_slug: &str) -> Self;
pub fn as_str(&self) -> &str;
}
Format: <org>/<env>/<cluster>/topics/<topic-slug>.
Example: rafka/prod/east/topics/my-topic.
RRLs contain slashes, not colons. They must never appear in URL path segments. Use
?rrl=query param or request body field.
OrgContext (POC)
pub fn default_org_context() -> OrgContext {
OrgContext { org_id: 1, org: "rafka", env: "prod", cluster: "east" }
}
Single-tenant POC constant. Phase 5 replaces this with real tenant store resolution.
Gateway topic registry (gateway/src/topic_registry.rs)
Process-global DashMap<(org_id: u64, name: String), TopicEntity>. Single gateway instance; gossip-sync to peers is a P5/multi-node concern.
TopicEntity
pub struct TopicEntity {
pub id: TopicId,
pub slug: Slug,
pub rrl: Rrl,
pub name: String, // raw Kafka name, verbatim
pub org_id: u64,
pub partition_count: i32,
pub replication_factor: i16,
pub configs: HashMap<String, String>,
}
Public API
| Function | Error codes | Notes |
|---|---|---|
create(org_id, name, partitions, rf, configs) | 3, 17, 36, 37, 38, 40 | Validates + mints id/slug/RRL |
delete(org_id, name) | — (Option) | Returns the removed entity or None |
lookup(org_id, name) | — (Option) | Clone of the entry |
list(org_id) | — | All topics for the org |
ensure_auto(org_id, name) | — | Register-if-absent (Metadata flag=true path); 1 partition / rf=1 |
set_configs(org_id, name, configs) | 3 | Full-replace |
Span emit sites
| Span name | Location | Attributes |
|---|---|---|
rafka.gateway.topic.create.via-wire | registry::create (#[instrument]) | topic, topic_id, partition_count |
rafka.gateway.topic.create.from-metadata | registry::ensure_auto (#[instrument]) | topic, topic_id |
Config validation
validate_create_configs and validate_alter_configs are ported from v1 topic_config_validation.rs.
validate_alter_configs rules:
rafka.failover.enabledis immutable via AlterConfigs (I2 rule, ported from v1). Any change from the current value — including the initialfalse→true— returns error 40. The flag can only be set at CreateTopics time via the topic's config map.cleanup.policycontainingcompactwhilerafka.failover.enabled=true(as recorded in the registry) returns error 40 (Gotcha 5: compact bitmap breaks Op 36 offset translation).
KAFKA_OP_DELETE_TOPIC = 0x05 (mesh-ops)
New mesh operation dispatched by the gateway's DeleteTopics handler to purge WAL state on the broker.
pub const KAFKA_OP_DELETE_TOPIC: u8 = 0x05;
pub struct DeleteTopicOp {
pub topic: String,
pub partitions: i32, // org_id rides the KafkaOp carrier
}
pub struct DeleteTopicOpResp { pub error_code: i16 }
Postcard-encoded, same pattern as ProduceOp/FetchOp/ListOffsetsOp.
Broker handler (broker/src/produce.rs::handle_delete_topic): for each partition in 0..partitions, builds make_vt_key(org_id, topic, p) and calls wal.delete_topic(&vt_key). Span: rafka.broker.topic.delete.
Gateway wire handlers
Admin handler span emit sites
| Span name | Handler | Notes |
|---|---|---|
rafka.gateway.topic.create.via-wire | handle_create_topics_request (via registry::create) | api_key 19 |
rafka.gateway.topic.delete.via-wire | handle_delete_topics_request | api_key 20; #[instrument] on the handler |
rafka.gateway.topic.describe-configs | handle_describe_configs_request | api_key 32; #[instrument] on the handler |
rafka.gateway.topic.alter-configs | handle_alter_configs_request | api_key 33; #[instrument] on the handler |
rafka.gateway.kafka.metadata | handle_metadata_request | api_key 3; rewritten for P2.A; auto_create attribute added |
rafka.broker.topic.delete | broker handle_delete_topic | api_key 20 broker-side; #[instrument] |
Defect-A: Metadata allow_auto_topic_creation flag
The MetadataRequest carries allow_auto_topic_creation: bool (present at API version 4+). P1 ignored this flag (every topic request auto-created). P2.A decodes it and dispatches:
topic registered → return registry entry (partition_count from registry)
topic absent + flag=true → ensure_auto (1 partition); return entry error=0
topic absent + flag=false → error_code=3 UNKNOWN_TOPIC_OR_PARTITION
wildcard (topics=None) → list all registered org topics
allow_auto_topic_creation defaults to true when the decoded request is absent (pre-v4 clients, which don't send the flag, get producer-like behavior).
This fix is P1-regression safe: librdkafka producers send flag=true, so all existing P1 tests + the run.ps1 conformance test remain green untouched.
Version caps (below flexible-encoding threshold)
| API | Cap | Flexible at |
|---|---|---|
| CreateTopics (19) | v4 | v5 |
| DeleteTopics (20) | v3 | v4 |
| DescribeConfigs (32) | v3 | v4 |
| AlterConfigs (33) | v1 | v2 |