Dead Letter Queues in Kafka: Where Rejected Records Go and How You Get Them Back

Every pipeline that reads from Kafka eventually encounters a message it can't process. Wrong schema, missing required field, value that fails a business rule, upstream system sent malformed JSON at 2am on a Tuesday. What you do with that message determines whether your pipeline is robust or just fast.

The naive approach is to let the consumer crash on bad data, fix the code, and replay from the last committed offset. That works once. By the third time you've done it in production, you want a better model.

What a Dead Letter Queue Is

A dead letter queue (DLQ) is a separate Kafka topic where messages go when the consumer can't process them. Instead of crashing or silently dropping the message, the consumer catches the error, writes the original message (plus error metadata) to the DLQ topic, and continues consuming from the main topic.

This gives you three things: pipeline continuity (one bad message doesn't stop the flow), an audit trail (every rejected message is preserved with the reason it was rejected), and a recovery path (you can fix the bad messages and replay them from the DLQ once the issue is resolved).

Implementing a DLQ in Python

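import json
from datetime import datetime, timezone
from typing import Optional

from confluent_kafka import Consumer, Producer

def process_message(payload: dict) -> None:
    # Business logic here: raises if something is wrong
    if 'order_id' not in payload:
        raise ValueError("Missing required field: order_id")
    if payload.get('amount', 0) < 0:
        raise ValueError(f"Negative amount: {payload['amount']}")
    # ... actual processing

def send_to_dlq(
    producer: Producer,
    dlq_topic: str,
    original_message: Optional[bytes],
    original_key: Optional[bytes],
    error: Exception,
    source_topic: str
) -> None:
    # Wrap the rejected record with enough context to diagnose and replay it.
    # errors='replace' keeps the DLQ write from failing on invalid UTF-8, at the
    # cost of not preserving such bytes exactly. A null value (tombstone) is
    # stored as an empty string.
    dlq_payload = {
        'original_message': (original_message or b'').decode('utf-8', errors='replace'),
        'original_key': original_key.decode('utf-8', errors='replace') if original_key else None,
        'error_type': type(error).__name__,
        'error_message': str(error),
        'source_topic': source_topic,
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        'failed_at': datetime.now(timezone.utc).isoformat()
    }
    producer.produce(
        dlq_topic,
        key=original_key,
        value=json.dumps(dlq_payload).encode('utf-8')
    )
    # Block until the DLQ write has been delivered or has failed, so a rejected
    # record is not left in a local buffer if the process exits. This costs
    # throughput when many records fail.
    producer.flush()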

consumer = Consumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'orders-processor',
    'auto.offset.reset': 'earliest'
})
producer = Producer({'bootstrap.servers': 'kafka-broker:9092'})

consumer.subscribe(['orders.raw'])

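try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # Broker- or client-level error, not a bad record: log it and keep polling
            print(f"Consumer error: {msg.error()}")
            continue
        try:
            payload = json.loads(msg.value().decode('utf-8'))
            process_message(payload)
        except Exception as e:
            # Catch broadly on purpose: any failure on this record routes it to the DLQ
            send_to_dlq(
                producer,
                dlq_topic='orders.raw.dlq',
                original_message=msg.value() or b'',
                original_key=msg.key() or b'',
                error=e,
                source_topic='orders.raw'
            )
except KeyboardInterrupt:
    pass
finally:
    # Leave the consumer group cleanly so partitions are reassigned promptly
    consumer.close()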
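
One caveat with the envelope above: decoding with errors='replace' is lossy for payloads that are not valid UTF-8, so a replayed record may not match the original bytes. Below is a minimal sketch of one alternative, assuming a base64-encoded envelope is acceptable for your DLQ consumers. The helper names encode_envelope and decode_envelope and the *_b64 field names are hypothetical and not part of the code above.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Optional, Tuple


def encode_envelope(value: Optional[bytes], key: Optional[bytes],
                    error: Exception, source_topic: str) -> bytes:
    """Build a DLQ record that preserves the original key and value bytes exactly."""
    def b64(raw: Optional[bytes]) -> Optional[str]:
        return None if raw is None else base64.b64encode(raw).decode('ascii')

    return json.dumps({
        'original_message_b64': b64(value),   # hypothetical field name
        'original_key_b64': b64(key),         # hypothetical field name
        'error_type': type(error).__name__,
        'error_message': str(error),
        'source_topic': source_topic,
        'failed_at': datetime.now(timezone.utc).isoformat(),
    }).encode('utf-8')


def decode_envelope(record: bytes) -> Tuple[Optional[bytes], Optional[bytes]]:
    """Return the original (key, value) bytes stored in a DLQ record."""
    envelope = json.loads(record.decode('utf-8'))

    def unb64(text: Optional[str]) -> Optional[bytes]:
        return None if text is None else base64.b64decode(text)

    return unb64(envelope['original_key_b64']), unb64(envelope['original_message_b64'])
```

The trade-off is readability: a base64 field cannot be inspected by eye in a topic browser, so some teams may prefer to store both a lossy text preview and the exact bytes.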

Replaying from the DLQ

Once you've fixed the root cause — corrected the schema, patched the business logic, cleaned the upstream data — you replay the DLQ messages back to the original topic. The simplest replay is just a consumer-to-producer bridge that reads each DLQ record, extracts the original_message field, and publishes it back to orders.raw:

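def replay_dlq(dlq_topic: str, target_topic: str, max_idle_polls: int = 3) -> int:
    replayed = 0
    idle_polls = 0
    # Read the DLQ from the beginning on the first run; later runs with the same
    # group.id resume from the committed offset
    replay_consumer = Consumer({
        'bootstrap.servers': 'kafka-broker:9092',
        'group.id': f'dlq-replay-{target_topic}',
        'auto.offset.reset': 'earliest'
    })
    # Use a dedicated producer so the function does not depend on a global
    replay_producer = Producer({'bootstrap.servers': 'kafka-broker:9092'})
    replay_consumer.subscribe([dlq_topic])
    try:
        # Stop only after several empty polls in a row: the first poll can come
        # back empty while partitions are still being assigned
        while idle_polls < max_idle_polls:
            msg = replay_consumer.poll(timeout=5.0)
            if msg is None:
                idle_polls += 1
                continue
            if msg.error():
                print(f"Replay consumer error: {msg.error()}")
                idle_polls += 1
                continue
            idle_polls = 0
            dlq_record = json.loads(msg.value().decode('utf-8'))
            # send_to_dlq stores a missing or empty key as None
            original_key = dlq_record.get('original_key')
            replay_producer.produce(
                target_topic,
                key=original_key.encode('utf-8') if original_key is not None else None,
                value=dlq_record['original_message'].encode('utf-8')
            )
            # Serve delivery reports so the local queue does not fill up on large DLQs
            replay_producer.poll(0)
            replayed += 1
    finally:
        replay_producer.flush()
        replay_consumer.close()
    return replayed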
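
Replaying everything is not always what you want: if only one root cause has been fixed, records rejected for other reasons will simply fail again and land back in the DLQ. Below is a rough sketch of a filter that could sit in front of the produce call, assuming the envelope fields written by send_to_dlq. The function name should_replay and its parameters are hypothetical.

```python
import json
from typing import Collection, Optional


def should_replay(dlq_value: bytes,
                  error_types: Collection[str],
                  source_topic: Optional[str] = None) -> bool:
    """Decide whether one DLQ record belongs to the failure class that was fixed."""
    try:
        record = json.loads(dlq_value.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not a readable envelope: leave it in the DLQ for manual inspection
        return False
    if not isinstance(record, dict):
        return False
    if record.get('error_type') not in error_types:
        return False
    if source_topic is not None and record.get('source_topic') != source_topic:
        return False
    return True
```

Inside replay_dlq this would be a guard along the lines of "if not should_replay(msg.value(), {'ValueError'}): continue" placed before the produce call. Skipped records are still consumed by the replay group, so a later replay targeting a different error type would likely need its own group.id to see them again.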

The Naming Convention

Use a consistent naming pattern for DLQ topics: {original_topic}.dlq. When you have twenty Kafka topics, you want it to be obvious which DLQ belongs to which source. Aligning the DLQ name to the source topic name makes filtering and monitoring straightforward: one Kafka consumer group watching *.dlq can alert on any pipeline that's producing dead letters.
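
The convention is easy to enforce with two small helpers, sketched here under the assumption that the suffix is always ".dlq". The names DLQ_SUFFIX, ALL_DLQS_PATTERN, dlq_topic_for and source_topic_for are hypothetical. Note that "*.dlq" is a glob; Kafka clients expect a regular expression, and confluent_kafka treats a subscribed topic name that starts with "^" as one.

```python
import re

DLQ_SUFFIX = '.dlq'

# A topic name starting with '^' is treated as a regular expression by
# confluent_kafka's Consumer.subscribe(); this one matches every DLQ topic.
ALL_DLQS_PATTERN = r'^.*\.dlq$'


def dlq_topic_for(source_topic: str) -> str:
    """Derive the DLQ topic name from its source topic."""
    return source_topic + DLQ_SUFFIX


def source_topic_for(dlq_topic: str) -> str:
    """Recover the source topic name from a DLQ topic name."""
    if not dlq_topic.endswith(DLQ_SUFFIX):
        raise ValueError(f"Not a DLQ topic: {dlq_topic}")
    return dlq_topic[:-len(DLQ_SUFFIX)]
```

A monitoring consumer could then call subscribe([ALL_DLQS_PATTERN]) and use source_topic_for(msg.topic()) to label each alert with the pipeline that produced the dead letter.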
