Kafka Schema Registry: Enforcing Structure on Your Event Streams Before They Hit Delta
Your Kafka producer is sending orders. Your consumer is writing them to Delta Lake. Everything works in the dev environment where all the producers are your own Python scripts. Then someone onboards a new upstream service, it starts sending messages with a field renamed, a type changed, a required field dropped — and your consumer crashes at 3am because it tried to parse a string as a decimal.
Kafka Schema Registry exists to prevent this. It's a type system for your event streams.
What Schema Registry Does
Schema Registry is a service that stores Avro (or JSON Schema, or Protobuf) schemas for your Kafka topics. Producers that use a registry-aware serializer validate each message against the registered schema before it is sent. In a standard setup that check runs in the client serializer rather than on the broker, so it only protects topics whose producers actually use it. Consumers retrieve the schema and use it to deserialize messages correctly. If such a producer tries to send a message that violates the schema, it fails at produce time, not at consume time.
This moves schema violations from a runtime crash in your pipeline to a produce-time failure in the upstream service. That's a dramatically better place to catch them.
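To make the consumer side concrete: Confluent's serializers put a small header in front of every message, one magic byte (0) followed by a four-byte big-endian schema ID, and the deserializer uses that ID to fetch the right schema from the registry. The sketch below only shows how that header is read. The function name `split_confluent_frame` is made up for illustration, and in practice the client library's deserializer does this for you.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes):
    """Return (schema_id, payload) from a Confluent-framed Kafka message value."""
    if len(message) < 5:
        raise ValueError("message too short to carry a schema header")
    # ">bI" = big-endian signed byte followed by an unsigned 32-bit int
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not a framed message")
    return schema_id, message[5:]


# Example frame: schema ID 42 followed by an opaque Avro payload
frame = b"\x00" + (42).to_bytes(4, "big") + b"avro-bytes"
schema_id, payload = split_confluent_frame(frame)
```

A message that fails this check was almost certainly not produced through a Schema Registry serializer, which is a useful thing to detect before attempting to decode it.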
Registering a Schema
import json

import requests

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

# Avro record schema for the value of the orders.raw topic
order_schema = {
    "type": "record",
    "name": "Order",
    "namespace": "com.company.orders",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "order_amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 2}},
        {"name": "order_date", "type": {"type": "int", "logicalType": "date"}},
        {"name": "region_code", "type": "string"},
        {"name": "status", "type": {"type": "enum", "name": "OrderStatus",
                                    "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "CANCELLED"]}}
    ]
}

# The subject "orders.raw-value" follows the default <topic>-value naming.
# The schema itself is sent as a JSON-encoded string inside the request body.
resp = requests.post(
    f"{SCHEMA_REGISTRY_URL}/subjects/orders.raw-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(order_schema)}
)
resp.raise_for_status()
print(f"Registered schema ID: {resp.json()['id']}")
Producing with Schema Validation
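import json
from decimal import Decimal

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer  # legacy API; newer client versions also offer AvroSerializer

# Reuses order_schema and SCHEMA_REGISTRY_URL from the registration step
order_schema_str = json.dumps(order_schema)
loaded_schema = avro.loads(order_schema_str)

producer = AvroProducer(
    {
        'bootstrap.servers': 'kafka-broker:9092',
        'schema.registry.url': SCHEMA_REGISTRY_URL
    },
    default_value_schema=loaded_schema
)

order_record = {
    'order_id': 'ORD-00042',
    'customer_id': 'CUST-9981',
    # Pass a Decimal, not an int. This assumes a logical-type-aware Avro
    # backend (e.g. fastavro), which encodes it as decimal bytes.
    'order_amount': Decimal('149.99'),
    'order_date': 18900,  # days since epoch (2021-09-30)
    'region_code': 'WEST',
    'status': 'PENDING'
}

# Serialization raises here if the record does not match the schema
producer.produce(topic='orders.raw', value=order_record)
producer.flush()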
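The two logical types in that record are easy to get wrong, so here is a sketch of what they look like on the wire, following the Avro specification: a decimal is the unscaled integer in big-endian two's-complement bytes, and a date is the number of days since 1970-01-01. The helper names are hypothetical, and a logical-type-aware Avro library would normally do this conversion for you.

```python
from datetime import date
from decimal import Decimal

EPOCH = date(1970, 1, 1)


def decimal_to_avro_bytes(value: Decimal, scale: int) -> bytes:
    """Encode a Decimal as the big-endian two's-complement unscaled integer."""
    scaled = value.scaleb(scale)  # shift the decimal point right by `scale`
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{value} has more than {scale} decimal places")
    unscaled = int(scaled)
    # Always leave room for the sign bit (may use one byte more than minimal)
    length = (unscaled.bit_length() + 8) // 8
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def date_to_avro_days(d: date) -> int:
    """Encode a date as days since the Unix epoch."""
    return (d - EPOCH).days


amount_bytes = decimal_to_avro_bytes(Decimal("149.99"), scale=2)  # hex '3a97'
order_days = date_to_avro_days(date(2021, 9, 30))                 # 18900
```

Note that the scale lives in the schema, not in the bytes: a consumer reading `3a97` with the wrong scale would silently get a different amount, which is one more reason to resolve the schema through the registry rather than hard-coding it.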
Schema Evolution Rules
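Schema Registry enforces compatibility rules. The default is BACKWARD compatibility: consumers using the new schema version must be able to read data written with the previous version. (FORWARD is the reverse: data written with the new version must be readable by consumers still on the previous one.) In practice:
- Add optional fields (with defaults): safe
- Remove fields: safe under BACKWARD; under FORWARD only if the removed field had a default
- Rename fields: not safe under any backward/forward compatibility mode
- Change field types: not safe, apart from the promotions Avro allows (for example int to long)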
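The rules above can be sketched as a small check over two record schemas. This is a deliberately simplified illustration, not the registry's actual algorithm: it compares top-level fields only, treats any type difference as a break, and ignores Avro type promotions, aliases, unions, and nested records. The function name `backward_problems` is hypothetical.

```python
def backward_problems(old_schema, new_schema):
    """List reasons the new schema could not read data written with the old one."""
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        previous = old_fields.get(field["name"])
        if previous is None:
            # A field that old data never contained needs a default to fall back on
            if "default" not in field:
                problems.append(f"new field '{field['name']}' has no default")
        elif previous["type"] != field["type"]:
            problems.append(f"field '{field['name']}' changed type")
    # Fields present only in the old schema are ignored by the new reader,
    # which is why removing a field is allowed under BACKWARD.
    return problems


v1 = {"type": "record", "name": "Order", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "region_code", "type": "string"},
]}
v2_ok = {"type": "record", "name": "Order", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "promo_code", "type": ["null", "string"], "default": None},
]}
v2_bad = {"type": "record", "name": "Order", "fields": [
    {"name": "order_id", "type": "int"},
    {"name": "channel", "type": "string"},
]}

ok_result = backward_problems(v1, v2_ok)    # no problems: one field dropped, one optional field added
bad_result = backward_problems(v1, v2_bad)  # two problems: type change and a new field without default
```

A rename shows up here as one field removed plus one new field without a default, which is why it fails the check.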
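# Check current compatibility setting for a subject.
# defaultToGlobal makes the registry fall back to the global setting when
# the subject has no override of its own; without it that case returns a 404.
resp = requests.get(
    f"{SCHEMA_REGISTRY_URL}/config/orders.raw-value",
    params={"defaultToGlobal": "true"}
)
resp.raise_for_status()
print(resp.json())  # e.g. {'compatibilityLevel': 'BACKWARD'}

# Add an optional field safely
order_schema_v2 = {
    "type": "record",
    "name": "Order",
    "namespace": "com.company.orders",
    "fields": [
        # ... existing fields ...
        {
            "name": "promo_code",
            "type": ["null", "string"],  # union with null = optional
            "default": None
        }
    ]
}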
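Before registering a new version, you can ask the registry whether it would be accepted. The sketch below uses the registry's compatibility endpoint, which responds with an `is_compatible` flag. It uses only the standard library so it stands alone (the snippets above use `requests`, which works just as well), and the helper names are made up for illustration.

```python
import json
import urllib.request


def build_compatibility_request(base_url, subject, schema, version="latest"):
    """Build the POST request that tests a candidate schema against a registered version."""
    url = f"{base_url}/compatibility/subjects/{subject}/versions/{version}"
    # As with registration, the schema travels as a JSON-encoded string
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def is_compatible(base_url, subject, schema):
    """Return True if the registry would accept the schema under the subject's rules."""
    request = build_compatibility_request(base_url, subject, schema)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)["is_compatible"]


# Build (but do not send) a request for a candidate schema
candidate = {"type": "record", "name": "Order", "namespace": "com.company.orders",
             "fields": [{"name": "order_id", "type": "string"}]}
request = build_compatibility_request(
    "http://schema-registry:8081", "orders.raw-value", candidate
)
```

Running `is_compatible(...)` in CI for the producing service is one way to catch a breaking schema change before it is ever deployed.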
What to Do When You Can't Use Schema Registry
Sometimes you're consuming from a Kafka topic you don't control and the producer isn't using Schema Registry. In that case your fallback is defensive deserialization: try/except around JSON parsing, explicit type coercion, and validation before writing to Delta. It's not as clean, but it's how you handle third-party producers you can't change. The DLQ pattern from the previous post handles the messages that fail.
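A minimal sketch of that fallback, assuming the third-party producer sends JSON objects with the same fields as the Order schema and ISO-formatted dates (both assumptions, as are the function names). Anything that fails parsing or validation is set aside with its error so it can be routed to the DLQ instead of crashing the consumer.

```python
import json
from datetime import date
from decimal import Decimal, InvalidOperation

VALID_STATUSES = {"PENDING", "CONFIRMED", "SHIPPED", "CANCELLED"}
REQUIRED_FIELDS = ("order_id", "customer_id", "order_amount",
                   "order_date", "region_code", "status")


def parse_order(raw: bytes) -> dict:
    """Parse, coerce, and validate one message; raise ValueError on any problem."""
    data = json.loads(raw)  # JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    try:
        amount = Decimal(str(data["order_amount"]))
    except InvalidOperation as exc:
        raise ValueError(f"order_amount is not a number: {data['order_amount']!r}") from exc
    if not amount.is_finite():
        raise ValueError("order_amount must be finite")
    order_date = date.fromisoformat(str(data["order_date"]))  # ValueError if malformed
    status = data["status"]
    if not isinstance(status, str) or status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    return {
        "order_id": str(data["order_id"]),
        "customer_id": str(data["customer_id"]),
        "order_amount": amount,
        "order_date": order_date,
        "region_code": str(data["region_code"]),
        "status": status,
    }


def split_batch(messages):
    """Separate a batch into clean rows and dead letters with their errors."""
    rows, dead_letters = [], []
    for raw in messages:
        try:
            rows.append(parse_order(raw))
        except ValueError as exc:
            dead_letters.append({"raw": raw, "error": str(exc)})
    return rows, dead_letters


batch = [
    b'{"order_id": "ORD-1", "customer_id": "C-1", "order_amount": "149.99",'
    b' "order_date": "2021-09-30", "region_code": "WEST", "status": "PENDING"}',
    b'{"order_id": "ORD-2", "order_amount": "abc"}',
    b'not json at all',
]
rows, dead_letters = split_batch(batch)
```

Only `rows` goes on to the Delta write. Keeping the raw bytes alongside the error message in each dead letter makes it possible to replay the message once the upstream issue is fixed.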