Kafka as a Messenger Service: Using Topics for In-Flight Data Coordination

The data ingest pipeline was reliable until something upstream changed. A source system would update a row, the pipeline would pick it up on the next run, and whatever was downstream would see the change — but only after the pipeline completed and the Delta table got refreshed. That was fine for daily batch workloads. It was not fine for the risk analysis pipeline that needed to know about a status change within minutes, not hours.

The fix wasn't to make the batch pipeline faster. It was to use Kafka as a coordination layer — a messenger service for data that was still in flight.

Kafka as a Coordination Bus, Not Just a Source

Most of the Kafka content I've seen treats it as an event source — something that generates events that your pipeline consumes. That's one use. The pattern that proved more useful for this engagement was using Kafka as a signaling mechanism between pipeline stages: one stage publishes a message when it finishes a unit of work, and the next stage listens for that message before proceeding.

This is different from a polling model (the next stage queries a database or file system to see if the previous stage finished) and different from a direct API call (which creates coupling between stages). The Kafka message is fire-and-forget from the producer's perspective; the consumer decides when to act on it.

Publishing Pipeline Status Messages

from confluent_kafka import Producer
import json
from datetime import datetime

producer_config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'ingest-pipeline'
}

def publish_stage_complete(
    producer: Producer,
    topic: str,
    stage_name: str,
    source_name: str,
    records_processed: int
) -> None:
    message = {
        'event': 'STAGE_COMPLETE',
        'stage': stage_name,
        'source': source_name,
        'records': records_processed,
        'timestamp': datetime.utcnow().isoformat(),
        'status': 'SUCCESS'
    }
    producer.produce(
        topic,
        key=source_name.encode('utf-8'),
        value=json.dumps(message).encode('utf-8')
    )
    producer.flush()

producer = Producer(producer_config)
publish_stage_complete(
    producer,
    topic='pipeline.stage.events',
    stage_name='bronze_ingest',
    source_name='OrdersDB',
    records_processed=14823
)

Consuming Stage Events in a Downstream Process

from confluent_kafka import Consumer
import json

consumer_config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'risk-analysis-trigger',
    'auto.offset.reset': 'latest'
}

def wait_for_stage(consumer: Consumer, topic: str, expected_source: str) -> dict:
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue
            if msg.error():
                raise RuntimeError(f"Consumer error: {msg.error()}")
            event = json.loads(msg.value().decode('utf-8'))
            if (event.get('source') == expected_source
                    and event.get('event') == 'STAGE_COMPLETE'
                    and event.get('status') == 'SUCCESS'):
                return event
    finally:
        consumer.close()

consumer = Consumer(consumer_config)
event = wait_for_stage(consumer, 'pipeline.stage.events', 'OrdersDB')
print(f"Bronze ingest done. {event['records']} records. Starting risk analysis.")

Why Not Just Use a Database Table as a Coordination Flag?

You can. I've seen it done with a pipeline_status table that downstream processes poll every N seconds. It works. The failure mode is that polling at high frequency puts load on the database, and polling at low frequency adds latency. Kafka decouples the timing — the consumer wakes up exactly when the producer sends the message, not on a polling schedule.

The other advantage: Kafka's consumer group model means you can have multiple downstream processes all listening for the same event without any coordination between them. The risk analysis job and the audit log job both listen for the bronze completion event — neither needs to know the other exists. As always, I'm here to help.

Read more