Kafka as a Messenger Service: Using Topics for In-Flight Data Coordination
The data ingest pipeline was reliable until something upstream changed. A source system would update a row, the pipeline would pick it up on the next run, and whatever was downstream would see the change — but only after the pipeline completed and the Delta table got refreshed. That was fine for daily batch workloads. It was not fine for the risk analysis pipeline that needed to know about a status change within minutes, not hours.
The fix wasn't to make the batch pipeline faster. It was to use Kafka as a coordination layer — a messenger service for data that was still in flight.
Kafka as a Coordination Bus, Not Just a Source
Most of the Kafka content I've seen treats it as an event source — something that generates events that your pipeline consumes. That's one use. The pattern that proved more useful for this engagement was using Kafka as a signaling mechanism between pipeline stages: one stage publishes a message when it finishes a unit of work, and the next stage listens for that message before proceeding.
This is different from a polling model (the next stage queries a database or file system to see if the previous stage finished) and different from a direct API call (which creates coupling between stages). The Kafka message is fire-and-forget from the producer's perspective; the consumer decides when to act on it.
Publishing Pipeline Status Messages
from confluent_kafka import Producer
import json
from datetime import datetime, timezone

producer_config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'ingest-pipeline'
}

def publish_stage_complete(
    producer: Producer,
    topic: str,
    stage_name: str,
    source_name: str,
    records_processed: int
) -> None:
    message = {
        'event': 'STAGE_COMPLETE',
        'stage': stage_name,
        'source': source_name,
        'records': records_processed,
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated as of Python 3.12.
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'status': 'SUCCESS'
    }
    # Messages with the same key go to the same partition, so events for one source stay ordered.
    producer.produce(
        topic,
        key=source_name.encode('utf-8'),
        value=json.dumps(message).encode('utf-8')
    )
    # produce() only queues the message; flush() blocks until it is delivered or has failed.
    producer.flush()

producer = Producer(producer_config)
publish_stage_complete(
    producer,
    topic='pipeline.stage.events',
    stage_name='bronze_ingest',
    source_name='OrdersDB',
    records_processed=14823
)

Consuming Stage Events in a Downstream Process
from confluent_kafka import Consumer
import json

consumer_config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'risk-analysis-trigger',
    # With 'latest', a group that has no committed offsets starts at the end of the log,
    # so an event published before this consumer first subscribes is not seen.
    'auto.offset.reset': 'latest'
}

def wait_for_stage(consumer: Consumer, topic: str, expected_source: str) -> dict:
    consumer.subscribe([topic])
    try:
        while True:
            # poll() returns None if no message arrives within the timeout.
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue
            if msg.error():
                raise RuntimeError(f"Consumer error: {msg.error()}")
            event = json.loads(msg.value().decode('utf-8'))
            # Ignore events for other sources, other event types, and failed stages.
            if (event.get('source') == expected_source
                    and event.get('event') == 'STAGE_COMPLETE'
                    and event.get('status') == 'SUCCESS'):
                return event
    finally:
        # Always leave the group cleanly, whether we return or raise.
        consumer.close()

consumer = Consumer(consumer_config)
event = wait_for_stage(consumer, 'pipeline.stage.events', 'OrdersDB')
print(f"Bronze ingest done. {event['records']} records. Starting risk analysis.")
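As written, wait_for_stage blocks until a matching event arrives and stops on the first payload that isn't valid JSON. Below is a minimal sketch of a bounded variant, assuming the caller would rather fail after a deadline than wait forever. The names matches_stage_event and wait_for_stage_with_deadline and the max_wait_seconds parameter are hypothetical, not part of the original pipeline. The function relies only on the consumer exposing poll(timeout) and on messages exposing error() and value(), as confluent_kafka's Consumer and Message do, and it expects the consumer to be subscribed already.

```python
import json
import time
from typing import Any, Optional


def matches_stage_event(raw: Optional[bytes], expected_source: str) -> Optional[dict]:
    """Return the decoded event if it is a successful STAGE_COMPLETE for the source, else None."""
    if raw is None:
        return None  # a message with no payload has nothing to match
    try:
        event = json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # skip malformed payloads instead of crashing the waiter
    if not isinstance(event, dict):
        return None
    if (event.get('source') == expected_source
            and event.get('event') == 'STAGE_COMPLETE'
            and event.get('status') == 'SUCCESS'):
        return event
    return None


def wait_for_stage_with_deadline(consumer: Any, expected_source: str,
                                 max_wait_seconds: float,
                                 poll_timeout: float = 5.0) -> dict:
    """Poll an already-subscribed consumer until a match arrives or the deadline passes."""
    deadline = time.monotonic() + max_wait_seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"No STAGE_COMPLETE for {expected_source} within {max_wait_seconds}s")
        # Never block past the deadline on a single poll.
        msg = consumer.poll(timeout=min(poll_timeout, remaining))
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(f"Consumer error: {msg.error()}")
        event = matches_stage_event(msg.value(), expected_source)
        if event is not None:
            return event
```

Keeping the match logic in a pure function means it can be unit-tested without a broker, and the deadline gives an orchestrator a clear failure to alert on when the upstream stage never reports.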
Why Not Just Use a Database Table as a Coordination Flag?
You can. I've seen it done with a pipeline_status table that downstream processes poll every N seconds. It works. The failure mode is that polling at high frequency puts load on the database, and polling at low frequency adds latency. Kafka decouples the timing: the consumer's poll() call returns as soon as a message is available, so the downstream process reacts shortly after the producer sends the message rather than on a fixed polling schedule.
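The polling trade-off can be put in rough numbers. The sketch below assumes a completion event lands at a time unrelated to the polling schedule, so the expected wait is half the interval and the worst case is one full interval. The function name and the example intervals are illustrative, not measurements from this engagement.

```python
def polling_tradeoff(interval_seconds: float) -> dict:
    """Database load versus detection latency for one process polling a status table."""
    return {
        'queries_per_hour': 3600 / interval_seconds,
        'avg_latency_seconds': interval_seconds / 2,
        'worst_latency_seconds': interval_seconds,
    }


# Compare an aggressive, a moderate, and a relaxed polling interval.
for interval in (1, 30, 300):
    print(interval, polling_tradeoff(interval))
```

The query count is per polling process, so it multiplies with every downstream job that watches the same table.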
The other advantage: Kafka's consumer group model means you can have multiple downstream processes all listening for the same event without any coordination between them, as long as each process uses its own group.id. The risk analysis job and the audit log job both listen for the bronze completion event, and neither needs to know the other exists.
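That fan-out depends on each downstream job using a distinct group.id: Kafka delivers every message to each consumer group, while consumers that share a group split the topic's partitions between them. Here is a minimal sketch of the two configurations, reusing the broker address from the earlier examples. The helper name trigger_config and the audit-log-trigger group name are assumptions for illustration.

```python
def trigger_config(group_id: str) -> dict:
    """Build a consumer config for one independent downstream listener."""
    return {
        'bootstrap.servers': 'kafka-broker:9092',
        'group.id': group_id,  # a distinct group per job, so each job sees every event
        'auto.offset.reset': 'latest',
    }


risk_config = trigger_config('risk-analysis-trigger')
audit_config = trigger_config('audit-log-trigger')  # hypothetical group name
# Each dict would be passed to confluent_kafka.Consumer(...) in its own process.
```

If both jobs shared one group.id, each event would go to only one of them, which is the right behavior for scaling out a single job but not for independent listeners.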