Design a distributed message queue

A distributed message queue sits between every service that produces data and every service that consumes it. Without one, a spike in traffic at the producer cascades straight into the consumer and everything downstream falls over. The queue absorbs that shock, lets consumers process at their own pace, and gives you a durable log you can replay when things go wrong.

If you are new to the topic, read up on message queues and database replication first. Both concepts show up repeatedly in this design.

1. Requirements

Functional

  • Producers publish messages to named topics.
  • Each topic is split into ordered partitions.
  • Consumers read messages by offset within a partition.
  • Consumer groups allow parallel consumption: each partition is assigned to exactly one consumer in a group.
  • Messages are retained for a configurable period (default 7 days).
  • Support at-least-once delivery out of the box and exactly-once semantics as an opt-in mode.

Non-functional

  • Throughput: 2 million messages per second across all topics at peak.
  • Latency: p99 publish latency under 10 ms.
  • Durability: no acknowledged message is ever lost, even if a broker dies.
  • Availability: the system stays writable during single-node failures.
  • Scale: 50,000 producer applications and 200,000 consumer instances.
  • Message size: average 1 KB, max 1 MB.

2. Capacity estimation

| Metric | Calculation | Result |
| --- | --- | --- |
| Peak QPS (writes) | 2M msg/s | 2,000,000 |
| Daily message volume | 2M * 86,400 * 0.4 avg utilization | ~69 billion |
| Daily storage (raw) | 69B * 1 KB | ~69 TB |
| 7-day retention | 69 TB * 7 | ~483 TB |
| Replication factor 3 | 483 TB * 3 | ~1.45 PB |
| Network ingress (peak) | 2M * 1 KB | ~2 GB/s |
| Network egress (peak, 3 consumer groups avg) | 2 GB/s * 3 | ~6 GB/s |

Storage dominates cost. Tiered storage, where old segments move to object storage like S3, cuts this by 60-80% in practice.

With a replication factor of 3, the cluster needs at least 30 brokers to keep per-broker storage under 50 TB (a reasonable limit for a broker's NVMe capacity): 1.45 PB / 50 TB is about 29, rounded up to 30. Each broker leads about 67K writes/s at peak (2M / 30), and roughly 200K appends/s once follower copies are counted. Network cards should be at least 25 Gbps to handle the combined ingress and egress with headroom for replication traffic between brokers.
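
The arithmetic behind these estimates can be reproduced with a short script. This is a minimal sketch that uses only the inputs stated above; the function and constant names are illustrative. The unrounded results (69.12 TB/day, 483.84 TB retained) differ slightly from the rounded table values.

```python
import math

# Inputs copied from the requirements and the capacity table.
PEAK_MSGS_PER_SEC = 2_000_000
AVG_UTILIZATION = 0.4        # average load as a fraction of peak
AVG_MSG_BYTES = 1_000        # 1 KB in decimal units, matching the table
RETENTION_DAYS = 7
REPLICATION_FACTOR = 3
MAX_TB_PER_BROKER = 50
SECONDS_PER_DAY = 86_400


def estimate_capacity():
    """Return the headline capacity numbers as a dict."""
    daily_msgs = PEAK_MSGS_PER_SEC * SECONDS_PER_DAY * AVG_UTILIZATION
    daily_tb = daily_msgs * AVG_MSG_BYTES / 1e12
    retained_tb = daily_tb * RETENTION_DAYS
    replicated_tb = retained_tb * REPLICATION_FACTOR
    min_brokers = math.ceil(replicated_tb / MAX_TB_PER_BROKER)
    return {
        "daily_msgs_billion": daily_msgs / 1e9,
        "daily_tb": daily_tb,
        "retained_tb": retained_tb,
        "replicated_pb": replicated_tb / 1_000,
        "min_brokers": min_brokers,
        # Leader writes only; follower copies multiply this by the replication factor.
        "leader_writes_per_broker": PEAK_MSGS_PER_SEC / min_brokers,
    }


if __name__ == "__main__":
    for name, value in estimate_capacity().items():
        print(f"{name}: {value:,.2f}")
```

Changing `AVG_UTILIZATION` or `RETENTION_DAYS` shows quickly how sensitive the broker count is to those two assumptions.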

3. High-level architecture

graph TD
  P1[Producer A] -->|publish| LB[Load Balancer]
  P2[Producer B] -->|publish| LB
  P3[Producer C] -->|publish| LB
  LB --> B1[Broker 1<br/>Partitions 0,3]
  LB --> B2[Broker 2<br/>Partitions 1,4]
  LB --> B3[Broker 3<br/>Partitions 2,5]
  B1 --> B2
  B2 --> B3
  B3 --> B1
  B1 --> C1[Consumer Group A]
  B2 --> C2[Consumer Group B]
  B3 --> C1
  ZK[Coordination Service<br/>Metadata + Leader Election] --> B1
  ZK --> B2
  ZK --> B3

High-level architecture: producers publish through a load balancer to brokers that own topic partitions. A coordination service manages metadata and leader election. Consumer groups pull messages from assigned partitions.

The core components:

  • Producers serialize messages, pick a partition (round-robin, key-hash, or custom), and send a batch to the partition leader.
  • Brokers store messages in an append-only log on disk. With a replication factor of N, each partition has one leader and N-1 follower replicas.
  • Coordination service tracks broker liveness, partition assignments, and consumer group membership. Think ZooKeeper or a Raft-based controller.
  • Consumers pull batches from brokers, process them, and commit offsets.

Data flows through the system in a single direction: producer to broker to consumer. Apart from acknowledgments, nothing flows back from broker to producer. This simplicity is deliberate. It keeps the broker stateless with respect to consumers and lets you add consumer groups without touching the write path.

The broker stores messages in segment files on disk. The active segment is append-only. When it reaches its size limit (1 GB by default), the broker closes it, after which it is immutable, and rolls to a new segment. An index file maps offsets to byte positions within the segment, enabling fast lookups by offset: O(1) if every offset is indexed, or a binary search plus a short scan if the index is sparse, as in Kafka.
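
One way to picture the offset lookup is a two-step search: find the segment whose base offset covers the target, then find the nearest index entry at or below it. The sketch below assumes a sparse index (an entry every few messages), which the text does not specify; `SegmentIndex`, `find_segment`, and the sample byte positions are all hypothetical.

```python
import bisect
from dataclasses import dataclass, field


@dataclass
class SegmentIndex:
    """Sparse index for one segment: sorted offsets and their byte positions."""
    base_offset: int
    offsets: list = field(default_factory=list)
    positions: list = field(default_factory=list)

    def add(self, offset, position):
        # Entries must be added in increasing offset order.
        self.offsets.append(offset)
        self.positions.append(position)

    def floor_position(self, target_offset):
        """Return (indexed_offset, byte_position) for the closest entry <= target."""
        i = bisect.bisect_right(self.offsets, target_offset) - 1
        if i < 0:
            raise KeyError(f"offset {target_offset} precedes this segment's index")
        return self.offsets[i], self.positions[i]


def find_segment(base_offsets, target_offset):
    """Pick the segment (by base offset) that contains target_offset."""
    i = bisect.bisect_right(base_offsets, target_offset) - 1
    if i < 0:
        raise KeyError(f"offset {target_offset} is older than the oldest segment")
    return base_offsets[i]


if __name__ == "__main__":
    index = SegmentIndex(base_offset=1000)
    index.add(1000, 0)
    index.add(1004, 4096)
    index.add(1008, 8192)
    # The reader seeks to byte 4096, then scans forward to offset 1006.
    print(index.floor_position(1006))
    print(find_segment([0, 1000, 2000, 3000], 2500))
```

Both steps are binary searches, so the cost grows with the logarithm of the number of segments and index entries rather than with the size of the log.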

4. Deep dives

4.1 Partitioning and consumer groups

Partitioning is how you scale horizontally. A topic with 64 partitions can spread across 64 brokers and be consumed by up to 64 consumers in a single group. The partition count is your parallelism ceiling.

Choosing the right partition count matters more than most teams realize. Too few partitions and you cannot saturate your consumer fleet. Too many and you pay in broker memory overhead (each partition needs buffer space), longer rebalance times, and more open file descriptors. A good starting point: set partitions to 2x the expected number of consumers, with a minimum of 6.

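Partition assignment uses consistent hashing on the message key. Messages with the same key always land in the same partition, which guarantees ordering for that key as long as the partition count does not change. Adding partitions later remaps some keys, so ordering across the change is not preserved for them. Messages without a key get round-robin assignment.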
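
A minimal partitioner sketch follows. It is a simplification: it uses a plain hash-modulo scheme rather than a consistent-hash ring, and CRC32 stands in for whatever hash a real client would use. The `Partitioner` class is hypothetical. Python's built-in `hash()` is avoided because it is randomized per process for bytes and strings.

```python
import itertools
import zlib


class Partitioner:
    """Key-hash partitioning with round-robin fallback for keyless messages."""

    def __init__(self, num_partitions):
        if num_partitions <= 0:
            raise ValueError("num_partitions must be positive")
        self.num_partitions = num_partitions
        self._round_robin = itertools.cycle(range(num_partitions))

    def partition_for(self, key=None):
        if key is None:
            # No key: spread messages evenly, no ordering guarantee.
            return next(self._round_robin)
        # Same key bytes always give the same partition for a fixed partition count.
        return zlib.crc32(key) % self.num_partitions


if __name__ == "__main__":
    partitioner = Partitioner(num_partitions=4)
    first = partitioner.partition_for(b"order-42")
    second = partitioner.partition_for(b"order-42")
    print(first == second)                                  # same key, same partition
    print([partitioner.partition_for() for _ in range(5)])  # round-robin
```

Because the result depends on `num_partitions`, growing a topic changes where existing keys land, which is why partition counts are usually chosen with headroom up front.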

graph LR
  subgraph Topic: order-events
      P0[Partition 0]
      P1[Partition 1]
      P2[Partition 2]
      P3[Partition 3]
  end
  subgraph Consumer Group X
      CX1[Consumer 1]
      CX2[Consumer 2]
  end
  subgraph Consumer Group Y
      CY1[Consumer 1]
      CY2[Consumer 2]
      CY3[Consumer 3]
      CY4[Consumer 4]
  end
  P0 --> CX1
  P1 --> CX1
  P2 --> CX2
  P3 --> CX2
  P0 --> CY1
  P1 --> CY2
  P2 --> CY3
  P3 --> CY4

Partition-to-consumer mapping: Group X has 2 consumers, each handling 2 partitions. Group Y has 4 consumers with 1:1 assignment. Adding a 5th consumer to Group Y would leave it idle.

Key rules for consumer groups:

  1. Each partition is assigned to exactly one consumer within a group.
  2. A consumer can own multiple partitions.
  3. If consumers outnumber partitions, the extras sit idle.
  4. When a consumer joins or leaves, a rebalance redistributes partitions.
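
The rules above can be sketched as a small assignment function. This is one possible strategy (contiguous ranges, which reproduces the mapping in the diagram), not the only one; `assign_partitions` and the consumer names are illustrative.

```python
def assign_partitions(partitions, consumers):
    """Give each consumer a contiguous range of partitions.

    Every partition goes to exactly one consumer. If there are more
    consumers than partitions, the extras receive an empty list.
    """
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    if not members:
        return assignment
    ordered = sorted(partitions)
    base, extra = divmod(len(ordered), len(members))
    start = 0
    for i, member in enumerate(members):
        # The first `extra` consumers take one additional partition.
        count = base + (1 if i < extra else 0)
        assignment[member] = ordered[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    print(assign_partitions([0, 1, 2, 3], ["x1", "x2"]))
    print(assign_partitions([0, 1, 2, 3], ["y1", "y2", "y3", "y4", "y5"]))
```

In the second call the fifth consumer ends up with no partitions, which is the idle-consumer case from rule 3.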

Rebalancing is the pain point. A naive “stop the world” rebalance pauses all consumers in the group. Cooperative rebalancing revokes only the partitions that need to move, keeping most consumers active during the transition.

The rebalance protocol works in three phases:

  1. Group coordinator detects change. A consumer misses its heartbeat deadline, or a new consumer sends a JoinGroup request.
  2. Partition assignment. The group leader (one of the consumers) runs the assignment strategy and sends the mapping back to the coordinator.
  3. Distribution. The coordinator pushes assignments to all consumers. Each consumer starts fetching from its newly assigned partitions.

With cooperative rebalancing, steps 2 and 3 happen incrementally. Only affected partitions pause. The rest continue processing without interruption. For large groups, this can reduce the processing pause from seconds to milliseconds.
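
The difference between the two approaches comes down to which partitions get revoked. The sketch below compares an old and a new assignment and returns only the partitions that actually change owner; the function name and the sample assignments are hypothetical.

```python
def partitions_to_revoke(old_assignment, new_assignment):
    """Return {consumer: [partitions it must give up]} for a cooperative rebalance.

    A stop-the-world rebalance would revoke every partition in old_assignment.
    Here a consumer only releases partitions it no longer owns in the new plan.
    """
    revoked = {}
    for consumer, owned in old_assignment.items():
        moving = set(owned) - set(new_assignment.get(consumer, []))
        if moving:
            revoked[consumer] = sorted(moving)
    return revoked


if __name__ == "__main__":
    old = {"c1": [0, 1], "c2": [2, 3]}
    new = {"c1": [0, 1], "c2": [2], "c3": [3]}   # c3 joined the group
    print(partitions_to_revoke(old, new))        # only partition 3 moves
```

When `c3` joins, only partition 3 pauses while it changes hands; partitions 0, 1, and 2 keep flowing.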

4.2 Producer-consumer flow and offset management

Every message gets an offset: a monotonically increasing 64-bit integer within its partition. Consumers track their position by committing offsets. If a consumer crashes and restarts, it resumes from the last committed offset.

There are two commit strategies. Auto-commit saves the offset on a timer (every 5 seconds by default). This is simple but risks reprocessing: if the consumer crashes between processing a message and the next auto-commit, it will read that message again on restart. Manual commit lets the consumer commit after processing, giving precise control. Most production systems use manual commit with at-least-once processing and make the consumer logic idempotent.
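
To make the manual-commit pattern concrete, here is an in-memory simulation. It assumes each message carries a unique ID that the consumer can use for deduplication, and it stores the committed offset as "next offset to read", a convention chosen for this sketch. `InMemoryPartition`, `consume`, and `crash_after` are hypothetical names.

```python
class InMemoryPartition:
    """Toy partition: list index is the offset; committed is the next offset to read."""

    def __init__(self, messages):
        self.messages = list(messages)   # each entry is (message_id, payload)
        self.committed = 0


def consume(partition, handler, processed_ids, crash_after=None):
    """Process from the committed offset, committing after each message.

    processed_ids makes the handler idempotent: a redelivered message is
    skipped instead of being applied twice. crash_after simulates a crash
    after handling that many messages but before the matching commit.
    """
    handled = 0
    offset = partition.committed
    while offset < len(partition.messages):
        message_id, payload = partition.messages[offset]
        if message_id not in processed_ids:
            handler(payload)
            processed_ids.add(message_id)
        handled += 1
        if crash_after is not None and handled == crash_after:
            return  # crash: the offset for this message is never committed
        partition.committed = offset + 1
        offset += 1


if __name__ == "__main__":
    partition = InMemoryPartition([("m0", "a"), ("m1", "b"), ("m2", "c")])
    seen, processed = [], set()
    consume(partition, seen.append, processed, crash_after=2)
    print(partition.committed)   # m1 was handled but not committed
    consume(partition, seen.append, processed)
    print(seen)                  # m1 is redelivered but applied only once
```

In a real deployment the set of processed IDs would need to live somewhere durable, typically in the same store the handler writes to.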

sequenceDiagram
  participant P as Producer
  participant L as Partition Leader
  participant R as Replica
  participant C as Consumer
  participant CS as Coord Service

  P->>L: Produce batch (messages 100-109)
  L->>L: Append to local log
  L->>R: Replicate batch
  R-->>L: ACK replication
  L-->>P: ACK (offsets 100-109)

  C->>CS: Fetch partition assignment
  CS-->>C: Assigned partition 0

  C->>L: Fetch from offset 100
  L-->>C: Return messages 100-109
  C->>C: Process messages
  C->>CS: Commit offset 109
  CS-->>C: Offset committed

Producer-consumer lifecycle: the producer sends a batch, the leader replicates it, then acknowledges. The consumer fetches, processes, and commits its offset back to the coordination service.

Three acknowledgment modes control the durability-latency tradeoff:

| Mode | Behavior | Latency | Durability |
| --- | --- | --- | --- |
| acks=0 | Fire and forget | Lowest | Messages can be lost |
| acks=1 | Leader writes to local log | Low | Lost if leader dies before replication |
| acks=all | All in-sync replicas write | Higher | No data loss |
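
The three modes differ only in the condition under which a publish counts as complete. A minimal sketch of that decision, with hypothetical parameter names:

```python
def can_acknowledge(acks, leader_appended, follower_acks, in_sync_followers):
    """Decide whether a produce request can be treated as complete.

    acks:              "0", "1", or "all"
    leader_appended:   True once the leader has written to its local log
    follower_acks:     number of in-sync followers that confirmed the write
    in_sync_followers: number of followers currently in the in-sync set
    """
    if acks == "0":
        # Fire and forget: the producer never waits for the broker.
        return True
    if acks == "1":
        return leader_appended
    if acks == "all":
        return leader_appended and follower_acks >= in_sync_followers
    raise ValueError(f"unknown acks mode: {acks!r}")


if __name__ == "__main__":
    print(can_acknowledge("1", True, 0, 2))     # leader alone is enough
    print(can_acknowledge("all", True, 1, 2))   # still waiting on one follower
    print(can_acknowledge("all", True, 2, 2))   # every in-sync replica has it
```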

For exactly-once semantics, the producer uses an idempotency key (producer ID + sequence number). The broker deduplicates based on this pair. Combined with transactional writes that atomically publish to multiple partitions and commit consumer offsets, you get end-to-end exactly-once processing. The cost is roughly 10-20% extra latency.
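
Broker-side deduplication on the (producer ID, sequence number) pair can be sketched as follows. This is a simplified single-partition model that tracks only the last accepted sequence per producer; the class name and return strings are hypothetical.

```python
class PartitionLog:
    """Toy partition log that rejects duplicate or out-of-order producer writes."""

    def __init__(self):
        self.records = []
        self.last_sequence = {}   # producer_id -> last accepted sequence number

    def append(self, producer_id, sequence, payload):
        last = self.last_sequence.get(producer_id, -1)
        if sequence <= last:
            # A retry of something already written: acknowledge, do not re-append.
            return "duplicate"
        if sequence != last + 1:
            # A gap means an earlier batch is missing; the producer must resend it.
            return "out_of_order"
        self.records.append(payload)
        self.last_sequence[producer_id] = sequence
        return "appended"


if __name__ == "__main__":
    log = PartitionLog()
    print(log.append(7, 0, "a"))
    print(log.append(7, 0, "a"))   # retried after a lost ACK
    print(log.append(7, 1, "b"))
    print(log.records)
```

The retry of sequence 0 is recognized and dropped, so the log holds each message once even though the producer sent it twice.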

4.3 Retention and log compaction

Messages live on disk in segment files, typically 1 GB each. Two retention strategies exist:

graph LR
  subgraph Partition Log
      S1[Segment 0<br/>Offsets 0-999]
      S2[Segment 1<br/>Offsets 1000-1999]
      S3[Segment 2<br/>Offsets 2000-2999]
      S4[Active Segment<br/>Offsets 3000+]
  end
  S1 -->|time-based delete| D1[Deleted]
  S2 -->|compaction| CS[Compacted Segment]
  S3 --> S4
  S4 -->|append| W[New Writes]

Segment lifecycle: old segments are either deleted (time-based) or compacted (key-based). The active segment receives all new writes. Only closed segments are eligible for cleanup.

Time-based retention deletes segments older than the configured period. Simple and predictable. At 69 TB per day with 7-day retention, you need roughly 483 TB of raw storage before replication, or about 1.45 PB across the cluster with a replication factor of 3.

Log compaction keeps only the latest message per key. This turns the log into a materialized snapshot of current state. Useful for changelogs: a consumer that starts fresh can rebuild the entire current state without reading the full history.

Log compaction runs as a background process. It reads old segments, drops superseded records, and writes new compacted segments. During compaction the broker uses extra disk space temporarily (about 2x the segment being compacted), so you need headroom.
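
The core of compaction is "keep the last record per key, preserve offset order". A minimal in-memory sketch, assuming records are (offset, key, value) tuples from closed segments; the function name is illustrative and a real compactor works segment by segment on disk.

```python
def compact(records):
    """Keep only the latest record for each key, ordered by offset.

    records: iterable of (offset, key, value) tuples in offset order.
    Surviving records keep their original offsets, so gaps appear in the log.
    """
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, key, value)   # later offsets overwrite earlier ones
    return sorted(latest.values(), key=lambda record: record[0])


if __name__ == "__main__":
    log = [
        (0, "user-1", "alice@old.example"),
        (1, "user-2", "bob@example"),
        (2, "user-1", "alice@new.example"),
    ]
    for record in compact(log):
        print(record)
```

A consumer replaying the compacted output sees one record per key and ends up with the same final state as one that read the full history.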

Tiered storage is the modern answer to retention cost. Hot data (last few hours) stays on local NVMe SSDs for fast reads. Warm data moves to cheaper network-attached storage. Cold data moves to object storage like S3 at roughly $0.02/GB/month. A tiered setup can reduce total storage cost by 70% while keeping the full retention window available for replay.

The key insight: most consumers read near the tail of the log. Only a small fraction of reads touch data older than a few hours. Tiered storage optimizes for this access pattern by keeping the hot path fast and the cold path cheap.
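
A tiering policy can be as simple as a pair of age thresholds. In the sketch below the 6-hour and 72-hour cutoffs are placeholder assumptions, not values from this design; only the $0.02/GB/month object-storage price comes from the text above.

```python
def storage_tier(segment_age_hours, hot_hours=6.0, warm_hours=72.0):
    """Classify a closed segment by age. Thresholds are illustrative defaults."""
    if segment_age_hours < 0:
        raise ValueError("segment age cannot be negative")
    if segment_age_hours <= hot_hours:
        return "hot"     # local NVMe
    if segment_age_hours <= warm_hours:
        return "warm"    # network-attached storage
    return "cold"        # object storage


def cold_monthly_cost_usd(size_gb, price_per_gb_month=0.02):
    """Monthly object-storage cost for data in the cold tier."""
    return size_gb * price_per_gb_month


if __name__ == "__main__":
    for age in (1, 24, 120):
        print(age, storage_tier(age))
    print(cold_monthly_cost_usd(1_000))   # cost of 1 TB in the cold tier
```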

5. Trade-offs and alternative approaches

Push vs pull consumers. Our design uses pull (consumers fetch messages). Push gives lower latency but risks overwhelming slow consumers. Pull lets each consumer control its own throughput. Most production systems choose pull.

Partition count. More partitions mean higher parallelism but also more file handles on brokers, longer leader election times, and higher end-to-end latency (each partition adds replication overhead). A sweet spot for most workloads is 6-12 partitions per topic, scaling up only when consumer lag grows.

In-memory vs disk. Writing to disk sounds slow, but sequential disk writes at 2 GB/s rival network throughput. The OS page cache keeps hot data in memory. Explicitly managing an in-memory buffer adds complexity for marginal gain.

Coordination service. ZooKeeper works but adds operational burden. Newer designs embed a Raft-based controller directly into the broker cluster, eliminating the external dependency. This simplifies deployment and reduces metadata propagation latency.

Ordering guarantees. Global ordering across partitions is expensive (single partition = single writer = throughput bottleneck). Partition-level ordering with key-based routing handles 99% of real use cases.

Exactly-once cost. Enabling idempotent producers adds a sequence number check on every write. Transactions add a two-phase commit across partitions. Together they increase broker CPU usage by roughly 15% and add 5-15 ms to publish latency. For most analytics workloads, at-least-once with idempotent consumers is simpler and cheaper.

Batch size tuning. Larger batches amortize network overhead and improve compression ratios. A 64 KB batch of 1 KB messages compresses to about 16 KB with LZ4. But larger batches increase the time a message sits in the producer buffer before being sent, raising end-to-end latency. Start with linger.ms=5 and batch.size=64KB, then adjust based on your latency budget.
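
The interaction between the two settings is easier to see in code. This sketch models a producer-side buffer that flushes when either the size limit or the linger time is reached. It is not the Kafka client's implementation; the `Batcher` class is hypothetical, and timestamps are passed in explicitly so the behavior is deterministic.

```python
class Batcher:
    """Buffer messages until the batch is full or the linger time has elapsed."""

    def __init__(self, batch_size_bytes=64 * 1024, linger_ms=5):
        self.batch_size_bytes = batch_size_bytes
        self.linger_ms = linger_ms
        self._buffer = []
        self._bytes = 0
        self._first_ts = None   # arrival time of the oldest buffered message

    def add(self, message, now_ms):
        """Buffer a message. Returns a full batch if the size limit was hit, else None."""
        if self._first_ts is None:
            self._first_ts = now_ms
        self._buffer.append(message)
        self._bytes += len(message)
        if self._bytes >= self.batch_size_bytes:
            return self._flush()
        return None

    def poll(self, now_ms):
        """Returns the pending batch if it has waited at least linger_ms, else None."""
        if self._buffer and now_ms - self._first_ts >= self.linger_ms:
            return self._flush()
        return None

    def _flush(self):
        batch, self._buffer = self._buffer, []
        self._bytes = 0
        self._first_ts = None
        return batch


if __name__ == "__main__":
    batcher = Batcher(batch_size_bytes=3_000, linger_ms=5)
    message = b"x" * 1_000
    print(batcher.add(message, now_ms=0))          # buffered
    print(batcher.add(message, now_ms=1))          # buffered
    print(len(batcher.add(message, now_ms=2)))     # size limit reached, batch sent
    batcher.add(message, now_ms=10)
    print(len(batcher.poll(now_ms=15)))            # linger expired, partial batch sent
```

Under heavy load the size limit fires first and latency stays low; under light load the linger timer bounds how long a lone message waits.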

6. What real systems actually do

Apache Kafka pioneered the partitioned, replicated log model. KRaft (Kafka Raft) became production-ready in Kafka 3.3 as a replacement for ZooKeeper in metadata management, and Kafka 4.0 removed ZooKeeper entirely. Kafka supports exactly-once semantics via idempotent producers and transactions. Tiered storage (KIP-405) moves cold segments to remote storage such as S3. A typical production cluster runs 20-50 brokers with 10,000+ partitions.

Amazon SQS takes a different approach: no partitions, no ordering guarantees (standard queues), and at-least-once delivery. FIFO queues add ordering within message groups at lower throughput (3,000 msg/s per queue with batching). SQS is simpler to operate but less flexible. It shines when you need a queue between two services and do not want to manage infrastructure.

Apache Pulsar separates compute (brokers) from storage (Apache BookKeeper). This lets you scale serving and storage independently. Pulsar also supports both queueing and streaming semantics in one system. Its geo-replication is built in, unlike Kafka where you bolt on MirrorMaker.

Redpanda reimplements the Kafka protocol in C++ with a thread-per-core architecture. It avoids the JVM garbage collection pauses that can contribute to Kafka’s tail latency spikes and ships with an embedded Raft controller. Vendor-published benchmarks claim up to 10x lower p99 latency compared to Kafka on the same hardware.

Each system makes a different trade between operational simplicity, throughput, and feature richness. Kafka remains the default choice for high-throughput event streaming. SQS wins for teams that want zero operational overhead. Pulsar fits multi-tenant environments where storage elasticity matters.

7. What comes next

This design handles the common case well but leaves several problems open:

  • Schema evolution. Producers and consumers need to agree on message format. A schema registry (like Confluent Schema Registry) enforces compatibility rules so producers cannot break consumers.
  • Multi-region replication. Replicating across data centers adds latency and conflict resolution challenges. MirrorMaker 2 or Cluster Linking handle this for Kafka.
  • Exactly-once across services. Our design covers exactly-once within the queue boundary. End-to-end exactly-once across multiple services requires the outbox pattern or saga orchestration.
  • Backpressure and dead letter queues. When consumers cannot keep up, messages pile up. Dead letter queues catch poison pill messages that fail processing repeatedly, preventing a single bad message from blocking the entire partition.
  • Observability. Consumer lag (difference between latest offset and committed offset) is the single most important metric. Alert when lag grows faster than the consumer can drain it.
  • Auto-scaling consumers. Tying consumer group size to partition lag lets you scale horizontally under load. When lag exceeds a threshold, spin up a new consumer instance. When lag drops, scale down. Kubernetes KEDA supports this pattern natively with its Kafka scaler.
  • Priority queues. A flat partitioned log has no built-in priority mechanism. The common workaround is separate topics per priority level (high, medium, low) with consumers polling the high-priority topic first. This is simple but requires careful tuning to avoid starvation of lower-priority messages.
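
The observability and auto-scaling items above both start from the same number: per-partition lag. A minimal sketch of computing it and turning it into a target consumer count follows. The `lag_per_consumer` threshold is a hypothetical tuning knob, and the cap at the partition count reflects the rule that extra consumers sit idle.

```python
import math


def consumer_lag(log_end_offsets, committed_offsets):
    """Lag per partition: latest offset minus committed offset."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in log_end_offsets.items()
    }


def desired_consumers(total_lag, lag_per_consumer, num_partitions, min_consumers=1):
    """Target group size: one consumer per lag_per_consumer messages of backlog.

    Never exceeds the partition count, because extra consumers would be idle.
    """
    wanted = max(min_consumers, math.ceil(total_lag / lag_per_consumer))
    return min(wanted, num_partitions)


if __name__ == "__main__":
    lag = consumer_lag({0: 1_000, 1: 500}, {0: 900, 1: 500})
    print(lag)
    print(desired_consumers(sum(lag.values()), lag_per_consumer=50, num_partitions=2))
```

A production scaler would also add a cooldown between changes, since every scale event triggers a rebalance.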

Start with a single topic, a handful of partitions, and acks=all. Measure consumer lag, tune batch sizes, and add partitions only when you have data showing you need them. Premature optimization in distributed messaging leads to operational complexity that is harder to unwind than a slow consumer.
