Home / System Design / Kafka Study Guide

Kafka Study Guide

Covers: Topics & Partitions, Producer Partitioning, Consumer Groups & Assignment, Replication & Durability, Offset Management, Compacted Topics & CDC, Exactly-Once Semantics. Language: Java.


1. Topics vs Partitions

Aspect Topic Partition
Nature Logical abstraction (the "what") Physical log segment on disk (the "how")
Ordering No global order Strict ordering guaranteed within partition
Replication Configured at topic level Each partition has N replicas
Parallelism Defined by partition count One consumer per partition per group
Scalability unit N/A Each partition has one broker leader

Key insight: Partition count only moves in one direction — you can add partitions to a topic but never remove them, and adding them changes the key-to-partition mapping for existing keys. Over-partitioning causes:

  • Leader election overhead
  • 5MB memory buffer per partition per broker
  • Slower rebalancing

Rule of thumb: partitions = max expected consumers × 2–3, backed by throughput math.

Hard Limits to Know

Limit Value Notes
Throughput Millions of msgs/sec per broker LinkedIn processes 7 trillion messages/day across their cluster
Default message size 1MB Configurable up to 10MB+, but large messages hurt throughput significantly
Partitions per broker ~4K practical limit More partitions = more memory + slower failover
Consumer lag tolerance Hours or days behind Catches up without loss within retention window
Retention Configurable by time or size e.g. 7 days or 100GB per partition — kept regardless of consumption
End-to-end latency 5–15ms at acks=1 10–30ms at acks=all due to replication wait
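
The consumer-lag row can be sanity-checked with quick arithmetic: a lagging consumer is safe as long as it catches up before retention deletes the data it still needs. A hedged sketch — all the figures in `main` are made-up inputs, not Kafka defaults:

```java
public class LagMath {
    // Safe iff time-to-catch-up < retention window.
    // net = messages/sec gained on the backlog while new traffic keeps arriving.
    static boolean safeToCatchUp(long lagMsgs, long consumeRate, long produceRate,
                                 long retentionSeconds) {
        long net = consumeRate - produceRate;
        if (net <= 0) return false;        // consumer never catches up
        return lagMsgs / net < retentionSeconds;
    }

    public static void main(String[] args) {
        // 36M messages behind, reading 15k/s while 5k/s arrive, 7-day retention:
        System.out.println(safeToCatchUp(36_000_000, 15_000, 5_000, 7 * 86_400)); // true (~1h to catch up)
    }
}
```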

2. Who Partitions Messages? The Partitioner

The Producer decides which partition to send to — not Kafka, not the broker. The broker just accepts and stores.

Producer → [Partitioner] → decides partition number → sends to correct broker

Scenario 1: Key Provided

ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",       // topic
    "user-123",     // key
    "{amount:500}"  // value
);
producer.send(record);
// Kafka computes: partition = toPositive(murmur2(serializedKeyBytes)) % numPartitions
// "user-123" ALWAYS hashes to the same partition → per-key ordering guaranteed

Guarantee: Same key → always same partition → ordered processing for that key.
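
A minimal sketch of the keyed path. Real Kafka hashes the serialized key bytes with murmur2; `String.hashCode` here is only a stand-in, and the bitmask matters — it keeps the result non-negative so the partition index is valid:

```java
public class KeyPartitioner {
    // Simplified stand-in for Kafka's murmur2-based default partitioner.
    // Masking with 0x7fffffff avoids a negative partition when hashCode() < 0
    // (Math.abs is wrong here: it fails for Integer.MIN_VALUE).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-123", 6);
        int p2 = partitionFor("user-123", 6);
        System.out.println(p1 == p2); // same key → same partition, every time
    }
}
```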

Scenario 2: No Key (null)

ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    null,            // no key
    "{amount:500}"
);
// Kafka 2.4+: Sticky Partitioner (default; since 3.3 the sticky behavior is built into the default partitioner itself — KIP-794)
// Fills one partition until linger.ms expires or batch.size reached → then picks new partition
// Before 2.4: pure round-robin (P0 → P1 → P2 → ...)

Why Sticky beats Round Robin: Round-robin creates tiny batches (bad throughput). Sticky fills full batches first → better compression and throughput.
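
A toy model of that difference (window size, partition count, and batch size are illustrative assumptions, not Kafka defaults): with W null-key messages arriving inside one linger window, round-robin touches up to P partitions → up to P small requests, while sticky fills batches toward one partition.

```java
public class BatchingModel {
    // Toy model: W messages arrive within one linger.ms window, P partitions.
    // Round-robin spreads them → one (tiny) batch per touched partition.
    static int roundRobinBatches(int msgsPerWindow, int partitions) {
        return Math.min(msgsPerWindow, partitions);
    }

    // Sticky sends everything to ONE partition → ceil(W / batchSize) full batches.
    static int stickyBatches(int msgsPerWindow, int batchSize) {
        return (msgsPerWindow + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        // 30 msgs/window, 6 partitions, batch holds 100 msgs:
        System.out.println(roundRobinBatches(30, 6)); // 6 requests of ~5 msgs each
        System.out.println(stickyBatches(30, 100));   // 1 request of 30 msgs
    }
}
```

Fewer, fuller batches also compress better, which is where most of the throughput win comes from.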

Scenario 3: Custom Partitioner

public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        String orderType = extractOrderType(value.toString());

        // VIP orders → dedicated partition 0
        if ("VIP".equals(orderType)) {
            return 0;
        }
        // Everyone else → spread across remaining partitions
        // (mask keeps the hash non-negative so the partition index is valid)
        int hash = Utils.murmur2(keyBytes) & 0x7fffffff;
        return (hash % (numPartitions - 1)) + 1;
    }

    @Override
    public void configure(Map<String, ?> configs) {}  // Partitioner is Configurable

    @Override
    public void close() {}
}

// Register it:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());

Partitioner Decision Tree

Message arrives at producer
        │
        ▼
   Has a key?
   ├── YES → hash(key) % numPartitions → same partition every time
   │
   └── NO  → Sticky Partitioner
              ├── linger.ms not expired AND batch not full? → stay on same partition
              └── linger.ms expired OR batch full? → pick new partition

Key Design Cheat Sheet

Goal Key Strategy
Strict ordering per entity Use entity ID (userId, orderId)
Max throughput, no ordering needed null key (round-robin/sticky)
Co-locate related events Same key for related event types
Avoid hot partitions Composite key with time bucket or shard suffix
Transactional grouping Same key = same partition = same consumer

The Hot Partition Trap

// BAD — "US" = 80% of traffic → one partition overwhelmed
String key = order.getCountry();

// GOOD — composite key spreads load
String key = order.getCountry() + "-" + (order.getUserId() % 10);
// "US-0", "US-1" ... "US-9" → spread across 10 effective partitions
// Tradeoff: lose strict global ordering for US (keep ordering within a shard)
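
The shard-suffix trick can be checked in isolation — a small sketch counting how 1,000 hypothetical US orders spread over the 10 composite keys:

```java
import java.util.HashMap;
import java.util.Map;

public class ShardedKey {
    // Composite key from the snippet above: country + userId % 10.
    static String shardKey(String country, long userId) {
        return country + "-" + (userId % 10);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        for (long userId = 0; userId < 1000; userId++) {
            counts.merge(shardKey("US", userId), 1, Integer::sum);
        }
        // 1000 "US" messages now spread over 10 keys, ~100 each
        System.out.println(counts.size());       // 10
        System.out.println(counts.get("US-0"));  // 100
    }
}
```

Ordering still holds per shard key ("US-3" stays strictly ordered), which is often all the business logic needs.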

3. linger.ms and Batch Sending

Think of it like a bus departure schedule — how long the bus waits before leaving, hoping more passengers arrive.

props.put(ProducerConfig.LINGER_MS_CONFIG, 0);   // leave immediately (default) — low latency
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);   // wait 5ms — better throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);  // wait 20ms — even bigger batches, higher latency

Two Triggers (whichever comes first)

props.put(ProducerConfig.LINGER_MS_CONFIG, 5);        // Trigger 1: timer expires
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);   // Trigger 2: 16KB batch full

Timeline with Sticky Partitioner + linger.ms=5

t=0ms:  msg1 arrives → goes to P2 → start 5ms timer
t=1ms:  msg2 arrives → also P2 (sticky — same partition, within 5ms window)
t=3ms:  msg3 arrives → also P2 (sticky)
t=5ms:  TIMER EXPIRES → send batch [msg1, msg2, msg3] to P2 together
t=5ms:  pick new partition P4 → start fresh

Real World Tuning

System linger.ms Reason
Stripe payments 0 Every ms counts
Walmart analytics 20 Throughput > latency
IoT sensors 50 Millions of tiny msgs, batch aggressively
Audit logs 10 Balanced

4. VIP Partitions — When and Why

How VIP Partitioning Works

// VIP consumer: manually assigned to partition 0 — dedicated, no rebalancing
KafkaConsumer<String, String> vipConsumer = new KafkaConsumer<>(props);
vipConsumer.assign(List.of(new TopicPartition("orders", 0)));
// Poll in a tight loop; process at highest priority

// Regular consumers: get P1-P5 via normal group assignment
KafkaConsumer<String, String> regularConsumer = new KafkaConsumer<>(props);
regularConsumer.subscribe(List.of("orders")); // coordinator assigns P1-P5

When VIP Partition Makes Sense

  • Topic has genuinely mixed priority traffic
  • SLA difference is significant (VIP < 100ms, regular < 5s)
  • You can reliably classify messages as VIP at produce time

When It's Overengineering

  • All messages have the same SLA
  • VIP traffic is < 1% of volume (dedicated consumer sits idle 99% of the time)
  • Can't reliably classify VIP at produce time

The Better Pattern: Separate Topics

// Separate topics beat VIP partitions for true priority differences
producer.send(new ProducerRecord<>("orders-vip",      key, value));
producer.send(new ProducerRecord<>("orders-standard", key, value));
producer.send(new ProducerRecord<>("orders-bulk",     key, value));

Why separate topics win:

  • Independent scaling — scale VIP consumers without touching standard
  • Independent retention — VIP 30 days, bulk 3 days
  • Independent replication — VIP RF=3, bulk RF=2 (cost saving)
  • Cleaner monitoring — lag alerts per topic, not per partition
  • Simpler consumer code — no "is this VIP?" check

Decision Framework

Same message type, mixed priority?
  └── Small SLA difference  → same topic, custom partitioner
  └── Large SLA difference  → separate topics

Multiple message types?
  └── Always separate topics

Same SLA for everything?
  └── No VIP needed — key-based partitioning for ordering only

5. Consumer Groups & Partition Assignment

The Core Rule

One partition can only be consumed by ONE consumer within a group — but one consumer can hold multiple partitions.

// Consumer subscribes to a TOPIC, not a partition — Kafka assigns partitions
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");  // group is key
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));  // Kafka decides which partitions you get

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Partition: %d | Offset: %d | Key: %s%n",
            record.partition(), record.offset(), record.key());
    }
}

How Assignment Works Internally

Step 1: Consumer joins group → sends JoinGroup to Group Coordinator broker
Step 2: One consumer elected "Group Leader" (first to join)
Step 3: Group Leader runs PartitionAssignor algorithm
Step 4: Leader sends assignments back to Coordinator
Step 5: Coordinator distributes via SyncGroup response to each consumer

Three Assignment Strategies

RangeAssignor (default — can be uneven):

Topic A: P0,P1,P2  |  Topic B: P0,P1,P2  |  2 Consumers
Consumer 1 → A-P0, A-P1, B-P0, B-P1  (4 partitions)
Consumer 2 → A-P2, B-P2              (2 partitions) ← UNEVEN

RoundRobinAssignor:

Consumer 1 → A-P0, A-P2, B-P1  (3 partitions)
Consumer 2 → A-P1, B-P0, B-P2  (3 partitions) ← balanced

CooperativeStickyAssignor (use in production):

Consumer3 dies.
Eager (old):    STOP ALL → reassign everything → resume  ← stop-the-world!
Cooperative:    ONLY P4,P5 revoked → redistributed → rest keep running ✓
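
The RangeAssignor skew above follows from its per-topic range math, sketched here in simplified form (real Kafka assigns actual TopicPartition objects; this only counts them):

```java
public class RangeAssignment {
    // Simplified RangeAssignor: per topic, consumer i gets a contiguous range.
    // With remainder r, the FIRST r consumers each get one extra partition —
    // and the same consumers get the extra for every topic → skew accumulates.
    static int[] partitionsPerConsumer(int topics, int partitionsPerTopic, int consumers) {
        int[] counts = new int[consumers];
        int base = partitionsPerTopic / consumers;
        int extra = partitionsPerTopic % consumers;
        for (int t = 0; t < topics; t++) {
            for (int c = 0; c < consumers; c++) {
                counts[c] += base + (c < extra ? 1 : 0);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = partitionsPerConsumer(2, 3, 2); // 2 topics × 3 partitions, 2 consumers
        System.out.println(counts[0]); // 4 ← consumer 1 overloaded
        System.out.println(counts[1]); // 2
    }
}
```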

Rebalance Listener — Walmart Inventory Example

consumer.subscribe(List.of("inventory-updates"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();  // commit before giving up partitions
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(tp ->
            System.out.println("Now responsible for region: " +
                PARTITION_TO_REGION.get(tp.partition())));
    }
});

What Triggers a Rebalance

  • Consumer joins (scale up)
  • Consumer leaves gracefully (scale down)
  • Consumer crashes (missed heartbeats)
  • Topic partition count changes

Key Configs to Tune

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");   // ping every 3s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");     // dead after 45s
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");  // 5 min to process batch
// WARNING: if processing takes > max.poll.interval.ms → consumer kicked → rebalance triggered

Independent Consumer Groups Per Service

Each service gets its own group.id — meaning every service receives all events independently. They don't compete. Each maintains its own offset position in the log.

Topic: payment-events
        │
        ├──► group.id="reconciliation-service"   → owns its own offsets
        ├──► group.id="webhook-delivery-service" → owns its own offsets
        └──► group.id="analytics-service"        → owns its own offsets

Reconciliation being slow does NOT affect webhook delivery.
A slow analytics consumer does NOT cause any other service to lag.

// Reconciliation service
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reconciliation-service");

// Webhook delivery service — completely independent
props.put(ConsumerConfig.GROUP_ID_CONFIG, "webhook-delivery-service");

// Analytics — can lag hours behind, nobody else is affected
props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-service");

Real examples:

  • Stripe uses consumer groups per service — webhook delivery, reconciliation, and analytics each consume the same payment events independently at their own pace.
  • Uber publishes every GPS location update to Kafka. Multiple consumer groups (routing, surge pricing, analytics) read the same events independently without interfering with each other.

Consumer Scaling — Staged Model

Start with minimum consumers, scale up as lag grows, cap at partition count for maximum parallelism, optionally exceed it for hot standby.

Topic: payment-events — 5 partitions

Stage 1: Normal traffic — 1 consumer handles all 5 partitions
P0 ──┐
P1 ──┤
P2 ──┼──► Consumer A
P3 ──┤
P4 ──┘

Stage 2: Lag detected → scale up to 3 consumers
P0 ──► Consumer A
P1 ──► Consumer A
P2 ──► Consumer B
P3 ──► Consumer B
P4 ──► Consumer C

Stage 3: Peak traffic → 5 consumers → maximum parallelism
P0 ──► Consumer A
P1 ──► Consumer B
P2 ──► Consumer C
P3 ──► Consumer D
P4 ──► Consumer E

Stage 4: Hot standby → 7 consumers (2 extra as backup)
P0 ──► Consumer A
P1 ──► Consumer B
P2 ──► Consumer C
P3 ──► Consumer D
P4 ──► Consumer E
       Consumer F  ← idle, ready
       Consumer G  ← idle, ready

Consumer A crashes → Kafka immediately assigns P0 to Consumer F
No waiting for a new pod to spin up ✓

Who triggers each stage in production:

Stage 1→2→3  KEDA watches consumer lag metric → spins up pods automatically
Stage 4       Pre-provisioned hot standby, always running
Stage 3→2→1  Scale down after lag clears (cost saving)

The key rules:

  • Partitions = the ceiling on parallelism (you set this, never changes automatically)
  • Consumers = the workers (you scale these, Kafka assigns partitions to them)
  • More consumers than partitions = idle consumers, but useful as hot standby
  • Kafka only auto-handles who gets which partition as consumers come and go — everything else is your infrastructure
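
Those rules reduce to two small formulas — trivial, but worth having explicit when sizing consumer deployments:

```java
public class ParallelismCeiling {
    // Effective parallelism is min(consumers, partitions); the rest are standby.
    static int activeConsumers(int consumers, int partitions) {
        return Math.min(consumers, partitions);
    }

    // Consumers beyond the partition count sit idle (hot standby).
    static int idleConsumers(int consumers, int partitions) {
        return Math.max(0, consumers - partitions);
    }

    public static void main(String[] args) {
        // Stage 4 from the diagram: 7 consumers, 5 partitions
        System.out.println(activeConsumers(7, 5)); // 5 working
        System.out.println(idleConsumers(7, 5));   // 2 hot standby
    }
}
```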

6. Replication & Durability (acks, ISR)

Replication Architecture

Topic: payments  |  Partition 0  |  Replication Factor: 3

Broker 1 (Leader)  ← Producers write HERE only
    ├──► Broker 2 (Follower / ISR)  fetches from leader
    └──► Broker 3 (Follower / ISR)  fetches from leader

Consumers read from Leader (or followers with rack-aware config)

ISR — In-Sync Replicas

ISR = replicas that are caught up with leader within replica.lag.time.max.ms (default 30s).

ISR = {B1, B2, B3}  ← healthy
Broker3 gets slow (GC pause, network):
ISR = {B1, B2}      ← B3 kicked from ISR
Broker3 catches up:
ISR = {B1, B2, B3}  ← rejoins

The acks Setting

// acks=0: Fire and forget
props.put(ProducerConfig.ACKS_CONFIG, "0");
// No wait for acknowledgment | Use: metrics, logs | Throughput: MAX | Durability: NONE | Latency: ~1ms

// acks=1: Leader only
props.put(ProducerConfig.ACKS_CONFIG, "1");
// Wait for leader to write | Risk: leader acks → dies → MESSAGE LOST
// Use: moderate importance | Throughput: HIGH | Durability: PARTIAL | Latency: 5–15ms

// acks=all: All ISR must acknowledge
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Wait for ALL ISR replicas | Use: payments, orders, financial
// Throughput: LOWER | Durability: STRONG | Latency: 10–30ms (replication overhead)

The Critical Combo: acks=all + min.insync.replicas

// Producer:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Topic/Broker config: min.insync.replicas=2
// Replication Factor=3, min.insync.replicas=2

// ISR={B1,B2,B3} → writes succeed ✓
// ISR={B1,B2}    → writes succeed ✓ (meets minimum of 2)
// ISR={B1}       → writes FAIL with NotEnoughReplicasException ✗
//                   Better to reject than silently lose data!
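
The broker-side gate can be modeled in one line — a sketch of the accept/reject rule, not the actual broker code:

```java
public class IsrWriteGate {
    // With acks=all, the leader rejects the write when the current ISR size
    // is below min.insync.replicas — failing fast instead of losing data.
    static boolean writeAccepted(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        int minIsr = 2;                               // RF=3, min.insync.replicas=2
        System.out.println(writeAccepted(3, minIsr)); // true
        System.out.println(writeAccepted(2, minIsr)); // true
        System.out.println(writeAccepted(1, minIsr)); // false → NotEnoughReplicasException
    }
}
```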

Stripe Payment Producer — Full Example

public class StripePaymentProducer {
    private final KafkaProducer<String, String> producer;

    public StripePaymentProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // safe with idempotence
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        this.producer = new KafkaProducer<>(props);
    }

    public CompletableFuture<RecordMetadata> sendPayment(String paymentId, String json) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "payments", paymentId, json  // paymentId as key → ordering per payment
        );
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        producer.send(record, (metadata, ex) -> {
            if (ex != null) future.completeExceptionally(ex);
            else future.complete(metadata);
        });
        return future;
    }
}

Leader Failure and Election

Broker1 (Leader) crashes
Controller detects the lost broker via missed heartbeats (KRaft) or session expiry (ZooKeeper) — replica.lag.time.max.ms governs ISR membership, not failure detection
Only ISR members eligible → Broker2 elected as new leader
Producer gets LeaderNotAvailableException → retries → connects to Broker2

unclean.leader.election.enable=true  ← DANGEROUS, never in finance
  → Out-of-sync replica can become leader → data loss possible
  → Only acceptable for log aggregation where some loss is OK

7. Offset Management

What Is an Offset?

Partition 0:
  Offset 0: {"orderId":"A1","amount":100}
  Offset 1: {"orderId":"A2","amount":200}
  Offset 2: {"orderId":"A3","amount":300}  ← consumer processed this
  Offset 3: {"orderId":"A4","amount":400}  ← next to read

Committed offset = 2 → "I have successfully processed up to offset 2"
Stored in internal topic: __consumer_offsets (50 partitions by default)
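
Which of those 50 partitions a group lands on is derived from its group.id — a sketch mirroring Kafka's hash-mod logic (the bitmask keeps the result non-negative; the real broker code uses its own abs helper):

```java
public class OffsetsTopicPartition {
    // All offsets for one group land in ONE partition of __consumer_offsets,
    // chosen by hashing the group.id. That partition's leader broker is the
    // group's coordinator.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int p = partitionFor("order-processors", 50); // 50 = default partition count
        System.out.println(p >= 0 && p < 50);         // true — stable for this group.id
    }
}
```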

Auto Commit — The Danger

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

// PROBLEM TIMELINE:
// t=0s:  Poll returns offsets 0–4 → position advances to 5
// t=3s:  Still processing offset 2
// t=5s:  AUTO COMMIT fires → commits position 5 ← HAVEN'T PROCESSED 3,4 YET!
//        (the committed offset is always the NEXT offset to read, not the last processed)
// t=6s:  Consumer crashes
// t=7s:  Restarts → resumes from committed offset 5 → offsets 3,4 NEVER PROCESSED

Manual Commit — Production Standard

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);

        // Commit per record (safe, slower):
        Map<TopicPartition, OffsetAndMetadata> offset = Map.of(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)  // +1 = next offset to read
        );
        consumer.commitSync(offset);
    }
    // OR commit entire batch (faster):
    consumer.commitSync();
}

commitSync vs commitAsync

// commitSync: blocks, retries on failure — use for critical checkpoints
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // partition was revoked (rebalance) — handle gracefully
}

// commitAsync: non-blocking, does NOT retry (to avoid out-of-order commits)
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) log.error("Async commit failed: {}", offsets, exception);
    // Don't retry — a later commitAsync may have already succeeded
});

// BEST PRACTICE — hybrid (Stripe/Walmart scale):
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        processRecords(records);
        consumer.commitAsync();   // fast path: non-blocking during normal operation
    }
} finally {
    consumer.commitSync();        // on shutdown: blocking, ensures last offsets saved
    consumer.close();
}

Offset Reset and Seeking

// First start or lost offsets:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // replay from beginning
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");   // only new messages
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");     // throw exception

// Seek to specific offset (after bad deploy):
consumer.seek(new TopicPartition("payments", 2), 10500L);

// Seek by timestamp — replay last 1 hour (Walmart incident recovery):
Map<TopicPartition, Long> timestampMap = consumer.assignment().stream()
    .collect(Collectors.toMap(tp -> tp,
        tp -> System.currentTimeMillis() - Duration.ofHours(1).toMillis()));

consumer.offsetsForTimes(timestampMap).forEach((tp, offsetAndTs) -> {
    if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

8. Compacted Topics & CDC

Log Compaction vs Regular Retention

Regular topic (time-based retention):
  [key=A val=1] [key=B val=2] [key=A val=3] [key=C val=4] [key=A val=5]
  After 7 days → everything deleted

Compacted topic:
  [key=A val=1] [key=B val=2] [key=A val=3] [key=C val=4] [key=A val=5]
  After compaction:
  [key=B val=2] [key=C val=4] [key=A val=5]  ← only LATEST value per key
  → Like a changelog: current state of every entity, forever
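
Compaction's end state is easy to model — a toy version using a map to keep only the latest value per key (real compaction works segment by segment on disk and preserves log order; this only shows the resulting state):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionModel {
    // Toy model of log compaction: latest value per key survives.
    // A null value is a tombstone → the key is removed entirely.
    static Map<String, String> compact(String[][] log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] entry : log) {
            if (entry[1] == null) latest.remove(entry[0]); // tombstone
            else latest.put(entry[0], entry[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        String[][] log = {{"A","1"},{"B","2"},{"A","3"},{"C","4"},{"A","5"}};
        Map<String, String> state = compact(log);
        System.out.println(state.size());     // 3 keys survive
        System.out.println(state.get("A"));   // "5" — only the latest value for A
    }
}
```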

Creating a Compacted Topic

NewTopic inventoryTopic = new NewTopic("inventory-state", 12, (short) 3);
inventoryTopic.configs(Map.of(
    "cleanup.policy",            "compact",
    "min.cleanable.dirty.ratio", "0.1",       // compact when 10% of log is dirty
    "segment.ms",                "3600000",    // roll new segment every hour
    "delete.retention.ms",       "86400000"    // tombstones kept 24h before deletion
));
try (AdminClient admin = AdminClient.create(props)) {  // close the client when done
    admin.createTopics(List.of(inventoryTopic));
}

Tombstone Records — Deleting from Compacted Topic

ProducerRecord<String, String> tombstone = new ProducerRecord<>(
    "inventory-state",
    "SKU-12345",   // key
    null           // null value = tombstone = delete this key after delete.retention.ms
);
producer.send(tombstone);

CDC with Debezium — Walmart Inventory

MySQL inventory DB → Debezium (reads binlog) → Kafka compacted topic
                                                      │
                                    ┌─────────────────┼─────────────────┐
                                    ▼                 ▼                 ▼
                              Elasticsearch         Redis          Warehouse
                             (search index)    (cache inval)    (restock logic)
// Debezium CDC event:
{
  "before": {"sku": "W123", "quantity": 50},
  "after":  {"sku": "W123", "quantity": 45},
  "op": "u",   // u=update, c=create, d=delete, r=read(snapshot)
  "ts_ms": 1714000000000
}

// Consumer:
public void processCDCEvent(ConsumerRecord<String, String> record) throws JsonProcessingException {
    if (record.value() == null) return;  // tombstone from compaction — nothing to parse
    JsonNode event = objectMapper.readTree(record.value());
    switch (event.get("op").asText()) {
        case "u":
            int delta = event.get("after").get("quantity").asInt()
                      - event.get("before").get("quantity").asInt();
            if (delta < 0 && event.get("after").get("quantity").asInt() < LOW_STOCK) {
                triggerRestockAlert(event.get("after").get("sku").asText());
            }
            break;
        case "d":
            removeFromSearchIndex(record.key());
            invalidateCache(record.key());
            break;
        case "c":
            indexNewProduct(event.get("after"));
            break;
    }
}

Bootstrap State from Compacted Topic

// On startup: replay entire compacted topic → rebuild in-memory state (no DB call needed)
// Use assign() rather than subscribe(): with subscribe(), assignment() stays empty
// until the first poll(), so endOffsets(consumer.assignment()) would see no partitions
Map<String, Integer> inventoryCache = new HashMap<>();
List<TopicPartition> partitions = consumer.partitionsFor("inventory-state").stream()
    .map(info -> new TopicPartition(info.topic(), info.partition()))
    .collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

// isCaughtUp: true when consumer.position(tp) has reached endOffsets for every partition
while (!isCaughtUp(consumer, endOffsets)) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        if (record.value() == null) {
            inventoryCache.remove(record.key());           // tombstone = delete
        } else {
            inventoryCache.put(record.key(), Integer.parseInt(record.value()));
        }
    }
}
// inventoryCache = full current inventory state, loaded from Kafka alone

9. Exactly-Once Semantics (EOS)

The Three Delivery Guarantees

At-most-once:   may LOSE messages, never duplicates
                → commit offset BEFORE processing

At-least-once:  never lost, may DUPLICATE  ← most common
                → commit offset AFTER processing

Exactly-once:   never lost, never duplicated  ← hardest, most important

Layer 1: Idempotent Producer

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Kafka assigns producer a PID + sequence number per partition
// Broker deduplicates: same PID + sequence arrives twice → second ignored
// Scope: single producer session, single partition
// Protects against: network retries creating duplicates

Layer 2: Transactions — Atomic Multi-Partition Writes

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1"); // stable across restarts

producer.initTransactions();

try {
    producer.beginTransaction();

    // ALL or NOTHING — atomic across multiple partitions/topics
    producer.send(new ProducerRecord<>("account-debits",  accountId, debitJson));
    producer.send(new ProducerRecord<>("account-credits", accountId, creditJson));
    producer.send(new ProducerRecord<>("payment-events",  paymentId, eventJson));

    producer.commitTransaction();  // ALL visible atomically

} catch (ProducerFencedException e) {
    producer.close();              // zombie — another instance with same ID took over
} catch (KafkaException e) {
    producer.abortTransaction();   // NONE visible → safe to retry
}

Layer 3: Transactional Read-Process-Write (Full EOS)

// consume → process → produce → commit offsets — ALL atomic
// Consumer must use isolation.level=read_committed to only see committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            String processed = enrichPayment(record.value());
            producer.send(new ProducerRecord<>("enriched-payments", record.key(), processed));
        }

        // Commit consumer offsets INSIDE the transaction — this is what makes it atomic
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            long lastOffset = records.records(tp).get(records.records(tp).size() - 1).offset();
            offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();

    } catch (Exception e) {
        producer.abortTransaction();
        // Records not committed → will be re-polled → safe retry
    }
}

When to Use What

System Guarantee Reason
Stripe payments Exactly-once Double charge = disaster
Walmart inventory At-least-once Duplicate stock update is idempotent (SET not ADD)
Click analytics At-most-once Losing 0.01% of clicks is fine, throughput > accuracy
Audit logs At-least-once Duplicates filtered by log ID downstream
Order fulfillment Exactly-once Ship twice = costly
Metrics/monitoring At-most-once Approximate is fine, latency matters more
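
The "duplicates filtered downstream" rows rely on idempotent consumption: at-least-once delivery plus deduplication by a stable ID is effectively exactly-once for side effects. A minimal sketch — in production a bounded store (e.g. Redis with a TTL) would replace the in-memory set:

```java
import java.util.HashSet;
import java.util.Set;

public class DedupConsumer {
    // Track processed event IDs; skip side effects on redelivery.
    private final Set<String> processed = new HashSet<>();

    // Returns true the first time an ID is seen, false for duplicates.
    boolean processOnce(String eventId) {
        return processed.add(eventId);
    }

    public static void main(String[] args) {
        DedupConsumer c = new DedupConsumer();
        System.out.println(c.processOnce("evt-1")); // true  → process it
        System.out.println(c.processOnce("evt-1")); // false → redelivery, ignore
    }
}
```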

10. Brokers, Partitions & Memory

What is a Broker?

A broker is simply a Kafka server — a machine (or container) running the Kafka process that stores messages on disk and serves producers and consumers.

Your Kafka Cluster

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   Broker 1  │  │   Broker 2  │  │   Broker 3  │
│  (server)   │  │  (server)   │  │  (server)   │
└─────────────┘  └─────────────┘  └─────────────┘

Producers write to brokers.
Brokers store messages on disk.
Consumers read from brokers.

A cluster typically has 3, 6, or more brokers. Stripe might have dozens. LinkedIn has thousands.

What Does Each Broker Hold?

Each broker holds partitions — some as leader, some as follower replicas.

Topic: payment-events  6 partitions  Replication Factor: 3

Broker 1: Leader of P0, P2    Follower of P1, P3, P4, P5
Broker 2: Leader of P1, P3    Follower of P0, P2, P4, P5
Broker 3: Leader of P4, P5    Follower of P0, P1, P2, P3

Every partition — whether leader or follower — consumes resources on that broker.

The 5MB Memory Buffer — What It Actually Is

For every partition it holds, a broker allocates memory buffers to handle reads and writes efficiently.

Per partition, per broker:

  Producer buffer    ~1MB   holds incoming messages before writing to disk
  Consumer buffer    ~1MB   holds messages being sent out to consumers
  Replication buffer ~1MB   holds messages being replicated to followers
  Index + metadata   ~2MB   tracks offsets, file positions, partition state

  Total:             ~5MB per partition per broker

The 4K Partition Limit — Not the Same as 5MB

These are two separate facts that are related but not the same:

5MB  = the cost of ONE partition on ONE broker (memory buffers)
4K   = the maximum number of partitions recommended PER broker

5MB is one ingredient.
4K  is the total limit when ALL costs combined become dangerous.

Each partition costs more than just memory:

Each partition on a broker costs:

  1. ~5MB memory buffers       ← the 5MB rule
  2. File handles on disk      ← OS has limits on open files
  3. Network connections       ← followers replicate from leader
  4. Metadata tracked by       ← Kafka controller tracks every
     the Kafka controller         partition state in the cluster

All four add up → at ~4K partitions per broker the combined
cost becomes dangerous, especially during failure and recovery.

Leader Election Overhead — Why Too Many Partitions Hurts

Every partition has one leader broker. When a broker crashes, all its leader partitions need a new leader elected.

Broker 1 crashes — it was leader of 500 partitions

Kafka Controller must now run election for each partition:
  election for P0  → pick new leader from ISR  (~10ms)
  election for P1  → pick new leader from ISR  (~10ms)
  ... 500 times

500 elections × 10ms = 5 seconds of disruption

Now imagine over-partitioned:
4000 elections × 10ms = 40 seconds of disruption
                        ← producers getting errors
                        ← consumers stopped receiving
                        ← your system looks down

RAM Calculation — How Much Do You Actually Need?

Partition buffer math:
4,000 partitions × 5MB = 20GB  ← partition buffers alone

But a broker needs more than just partition buffers:

  Partition buffers       20GB   from the calculation above
  JVM heap (Kafka)         6GB   recommended minimum for broker process
  OS page cache           10GB+  Kafka relies on this for fast reads (see below)
  OS itself + other        2GB

  Realistic minimum:     ~38GB
  Production standard:   64GB–128GB per broker

Page Cache — The Part Most People Miss

Kafka does not read from disk directly when serving consumers. It reads from the OS page cache — messages the OS has cached in RAM from recent disk writes.

Producer writes message → saved to disk → OS also caches it in RAM (page cache)
Consumer reads message  → Kafka serves it from page cache, NOT from disk
                          ← this is why Kafka is so fast

More page cache RAM = more messages served from RAM = lower latency
Less page cache RAM = more disk reads = slower consumers

Full Picture Together

More partitions =

  1. More memory consumed        (5MB × partitions × brokers)
  2. Slower failover             (election per partition on broker crash)
  3. More file handles on disk   (each partition = multiple segment files)
  4. Slower consumer rebalancing (more partitions to reassign)

~4K per broker is the practical ceiling where these costs become dangerous.

RAM sizing summary:
  20GB  = floor (partition buffers only)
  38GB  = realistic minimum
  64GB–128GB = what you actually provision in production

11. Partition Count Math

Target throughput:    500 MB/s ingress
Single partition:     ~10 MB/s (typical broker write speed)
Partitions needed:    500 / 10 = 50 partitions minimum

Consumer parallelism: 20 consumer instances max planned → need at least 20 partitions
                      50 satisfies both constraints

Final decision:       60 partitions (next multiple of common consumer counts: 2,3,4,5,6)
                      → can run 60,30,20,15,12,10,6,5,4,3,2,1 consumers evenly

Replication factor:   3 — data survives 2 broker failures; with min.insync.replicas=2,
                      writes keep flowing through 1 failure and fail fast on the 2nd
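The sizing arithmetic above can be wrapped in a small helper — a sketch with illustrative names (the 10 MB/s per-partition figure is the assumption from the worked example, not a Kafka constant):

```java
// Sketch: derive a partition count from throughput and consumer-parallelism
// constraints, following the worked example above. Names are illustrative.
public class PartitionSizing {

    // Minimum partitions needed to absorb the target ingress rate.
    static int forThroughput(double targetMBps, double perPartitionMBps) {
        return (int) Math.ceil(targetMBps / perPartitionMBps);
    }

    // Take the max of both constraints, then round up to a multiple of 60
    // (60 divides evenly by 1-6, 10, 12, 15, 20, and 30 consumer counts).
    static int recommend(double targetMBps, double perPartitionMBps, int maxConsumers) {
        int min = Math.max(forThroughput(targetMBps, perPartitionMBps), maxConsumers);
        return ((min + 59) / 60) * 60;
    }

    public static void main(String[] args) {
        // 500 MB/s at ~10 MB/s per partition, 20 planned consumers
        System.out.println(recommend(500, 10, 20)); // 50 minimum -> rounds to 60
    }
}
```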

12. Key Answers Cheat Sheet

"Who decides which partition a message goes to?"

The Producer, via a Partitioner class. Default: murmur2 hash of key mod partition count for keyed messages; sticky partitioning for null-key messages. Fully overridable with a custom Partitioner.

"How does a consumer know which partition to consume?"

It doesn't choose — it subscribes to a topic and the Group Coordinator assigns partitions via a PartitionAssignor. Use CooperativeStickyAssignor in production to avoid stop-the-world rebalances.
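A minimal sketch of the corresponding consumer config, using literal config-key strings so it compiles without the kafka-clients jar (with the jar on the classpath you'd normally use the ConsumerConfig constants instead; the broker address is illustrative):

```java
import java.util.Properties;

// Sketch: consumer config selecting the cooperative-sticky assignor.
public class CooperativeConsumerConfig {
    static Properties build(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("group.id", groupId);
        // Incremental (cooperative) rebalancing: only the partitions that
        // actually move are revoked, not every partition in the group.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("orders-workers"));
    }
}
```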

"What is ISR and why does it matter?"

ISR is the set of replicas caught up with the leader within replica.lag.time.max.ms. Combined with acks=all and min.insync.replicas=2, it guarantees writes are only acknowledged when durably replicated. If ISR falls below min.insync.replicas, writes fail fast rather than silently losing data.

"Explain exactly-once semantics."

Three layers: idempotent producer (deduplicates retries per partition), transactions (atomic multi-partition writes), and transactional read-process-write with sendOffsetsToTransaction (atomic consume + produce + offset commit). Consumer needs isolation.level=read_committed to only see committed data.
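The three layers map onto a handful of configs. A sketch using literal config-key strings (so it compiles without the kafka-clients dependency; the transactional.id value is illustrative):

```java
import java.util.Properties;

// Sketch: the config knobs behind each exactly-once layer.
public class ExactlyOnceConfig {
    static Properties producer(String txId) {
        Properties p = new Properties();
        p.put("enable.idempotence", "true"); // layer 1: dedupe retries per partition
        p.put("transactional.id", txId);     // layer 2: enables transactions
        p.put("acks", "all");                // required by idempotence
        return p;
    }

    static Properties consumer() {
        Properties c = new Properties();
        // layer 3 (read side): never observe aborted/in-flight transactions
        c.put("isolation.level", "read_committed");
        return c;
    }

    public static void main(String[] args) {
        System.out.println(producer("order-processor-1"));
        System.out.println(consumer());
    }
}
```

In the read-process-write loop itself, the producer wraps each batch in beginTransaction / sendOffsetsToTransaction / commitTransaction so the consume, produce, and offset commit land atomically.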

"How do you handle a hot partition?"

Diagnose with consumer lag metrics per partition. Fix options: composite key with shard suffix to spread load, custom partitioner for business-driven routing, or separate topics for fundamentally different traffic classes. Always backed by throughput math — partitions = target MB/s / 10 MB/s per partition.

"The tradeoff triangle"

Every Kafka config decision trades off throughput, durability, and latency. Stripe sacrifices throughput for durability and low latency (acks=all, linger.ms=0). Walmart analytics sacrifices durability for throughput (acks=1, linger.ms=20 to batch). Justify your choice for the system being designed.


13. System Design — Kafka Usage, Partition Keys & Hot Key Solutions


Stripe — Payment Processing

Where Kafka is used:

Payment submitted → Kafka → reconciliation, webhook delivery, fraud detection, analytics

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| payment-events | paymentId | All events for one payment stay ordered |
| account-events | accountId | Debit + credit for same account stay ordered |
| fraud-signals | userId | All fraud signals per user in sequence |
| webhook-delivery | merchantId | All webhooks per merchant in order |

Hot Key Problem:

Key = countryCode → "US" = 80% of all payments → P2 overwhelmed

Fix: composite key with shard
key = countryCode + "-" + Math.floorMod(paymentId.hashCode(), 10)
→ "US-0", "US-1" ... "US-9"
→ US traffic spread across 10 effective slots

Tradeoff: lose strict global ordering across all US payments
          keep ordering within each shard (acceptable for Stripe)

acks setting: acks=all — double charge = disaster, durability over throughput.
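The composite-key fix can be sketched as a small helper (names are illustrative; note Math.floorMod rather than %, since hashCode() can be negative and would otherwise yield shards like "US--3"):

```java
public class ShardedKey {
    // Spread one hot natural key (e.g. "US") across numShards slots while
    // keeping every event for a given entity (e.g. a paymentId) on a stable shard.
    static String shardKey(String naturalKey, String entityId, int numShards) {
        // floorMod keeps the shard index in [0, numShards) even for negative hashes
        return naturalKey + "-" + Math.floorMod(entityId.hashCode(), numShards);
    }

    public static void main(String[] args) {
        System.out.println(shardKey("US", "pay_8842", 10)); // e.g. "US-7"
    }
}
```

Ordering holds within a shard: two events with the same paymentId always hash to the same shard, which is exactly the per-payment guarantee Stripe needs.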


Uber — Real-Time Location Tracking

Where Kafka is used:

Driver GPS update (every 4 seconds) → Kafka → routing, surge pricing, ETA, analytics

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| driver-location | driverId | All location updates for one driver in order |
| ride-events | rideId | All events for one ride stay ordered |
| surge-signals | cityZoneId | Group signals by geographic zone |
| driver-status | driverId | Status changes per driver in sequence |

Hot Key Problem:

Key = cityId → "NYC" has 50,000 active drivers
→ All NYC updates hammer one partition

Fix 1: composite key with geohash
key = geohash(lat, lng, precision=6)
→ splits city into ~1km grid cells
→ "dr5ru", "dr5rv", "dr5rw" ... each gets own partition

Fix 2: separate topic per city tier
  topic: driver-location-tier1  (NYC, LA, Chicago — high volume)
  topic: driver-location-tier2  (mid-size cities)
  topic: driver-location-tier3  (small cities)
→ independent scaling per tier

acks setting: acks=1 — losing one GPS ping is acceptable, latency matters more.


Walmart — Inventory Management

Where Kafka is used:

POS sale → Kafka → inventory update, restock alerts, analytics, CDC to search index

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| inventory-updates | skuId | All updates for one SKU ordered — no race conditions |
| inventory-state | skuId | Compacted topic — current state per SKU |
| restock-alerts | warehouseId | Group alerts by warehouse for local processing |
| sales-events | storeId | All sales per store in order |
| price-changes | skuId | Price updates per SKU stay ordered |

Hot Key Problem:

Key = categoryId → "Electronics" category has 10,000 SKUs
→ All electronics inventory updates go to one partition

Fix: use skuId as key, not categoryId
→ each SKU gets naturally distributed across partitions
→ ordering guaranteed per SKU (what actually matters)
→ no artificial grouping that creates hot spots

acks setting: acks=1 — a lost or duplicated update is tolerable because updates are idempotent (SET quantity, not ADD). Special: inventory-state is a compacted topic — keeps only the latest quantity per SKU.
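A compacted topic like inventory-state is declared with topic-level configs along these lines (a sketch — cleanup.policy=compact is the essential setting; the tuning values are illustrative):

```properties
# inventory-state topic configs (illustrative tuning values)
cleanup.policy=compact          # keep only the latest record per key (per SKU)
min.cleanable.dirty.ratio=0.5   # fraction of uncompacted log that triggers compaction
segment.ms=604800000            # roll segments weekly so old ones become compactable
```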


LinkedIn — Activity Feed & Notifications

Where Kafka is used:

User action (post, like, connect) → Kafka → feed ranking, notifications, search index, analytics

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| user-activity | userId | All actions per user in order for feed ranking |
| notifications | recipientId | All notifs for one user delivered in order |
| connection-events | userId | Connection graph updates per user |
| feed-updates | userId | Feed items per user in sequence |

Hot Key Problem:

Key = userId → celebrity/influencer with 5M followers
→ Every action by that user fans out to millions of notifications
→ Single userId key = one partition overwhelmed

Fix: fan-out at consumer side, not producer side
  Producer: key = sourceUserId (stays on one partition — correct)
  Consumer: reads event → fans out to follower notification queues
  
  topic: notification-fanout  key = recipientId + "-" + bucket(0-9)
  → spread notification delivery across shards per recipient

acks setting: acks=1 — losing a feed update is acceptable, throughput is critical at LinkedIn scale (7 trillion messages/day).


Netflix — Video Streaming Events

Where Kafka is used:

Play/pause/seek/error events → Kafka → recommendations, billing, A/B testing, error tracking

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| playback-events | sessionId | All events for one viewing session in order |
| user-events | userId | User behaviour in sequence for recommendations |
| error-events | deviceType | Group errors by device for monitoring |
| billing-events | accountId | Billing events per account ordered |

Hot Key Problem:

Key = contentId → "Stranger Things S4 premiere" 
→ 10M concurrent viewers all generate events with same contentId
→ one partition overwhelmed

Fix: key = sessionId (not contentId)
→ each viewing session is independent
→ naturally distributed across all partitions
→ you still know which content via the message value, not the key

Rule: The key controls distribution and ordering. If you don't need ordering across sessions for the same content, don't use contentId as the key.


Twitter/X — Tweet & Timeline Events

Where Kafka is used:

Tweet posted → Kafka → timeline fanout, search index, trends, moderation, analytics

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| tweet-events | tweetId | All events for one tweet (edit, delete) ordered |
| timeline-fanout | followerId | Fanout delivery per follower |
| trend-signals | hashtag | Group signals per hashtag for trend calculation |
| moderation-queue | (null) | No ordering needed, max throughput |

Hot Key Problem:

Key = hashtag → #WorldCup trending
→ millions of tweets per minute with same hashtag
→ one partition overwhelmed

Fix 1: composite key with time bucket
key = hashtag + "-" + (epochSecond / 10)
→ "#WorldCup-171400000", "#WorldCup-171400001" ...
→ each 10-second window gets its own partition slot
→ trend aggregation merges buckets downstream

Fix 2: null key for ingestion, aggregate downstream
→ tweets distributed evenly across partitions
→ Kafka Streams or Flink aggregates by hashtag with windowed counts
→ trend topic emits results per hashtag

Ride-Share / Food Delivery (Generic — DoorDash, Lyft)

Where Kafka is used:

Order placed → Kafka → driver matching, restaurant notification, ETA, payment, analytics

Topics & Keys:

| Topic | Partition Key | Why |
|---|---|---|
| order-events | orderId | All order lifecycle events in sequence |
| driver-assignments | driverId | All assignments per driver ordered |
| restaurant-events | restaurantId | All events per restaurant in order |
| delivery-tracking | orderId | All tracking updates per order in sequence |

Hot Key Problem:

Key = restaurantId → popular restaurant gets 500 orders/minute during lunch
→ one partition overwhelmed

Fix: composite key
key = restaurantId + "-" + Math.floorMod(orderId.hashCode(), 5)
→ "rest-123-0", "rest-123-1" ... "rest-123-4"
→ spreads load while keeping most restaurant orders grouped
→ 5 shards = 5x throughput for hot restaurants

Hot Key — Universal Fix Patterns

No matter the system, hot keys are solved with the same small set of patterns:

Pattern 1: Composite Key with Shard Suffix

// When: key is naturally skewed (country, category, popular entity)
String key = naturalKey + "-" + Math.floorMod(entityId.hashCode(), NUM_SHARDS); // floorMod: hashCode() may be negative
// Tradeoff: lose strict global ordering for that key — acceptable if per-entity ordering is enough

Pattern 2: Geohash Key (location data)

// When: key is a location (city, region)
String key = GeoHash.encode(lat, lng, precision); // precision=6 → ~1km cells
// Tradeoff: nearby cells may have different load — tune precision to balance

Pattern 3: Time-Bucketed Key (trending/aggregation)

// When: key is a trending entity (hashtag, event)
String key = naturalKey + "-" + (Instant.now().getEpochSecond() / BUCKET_SIZE_SECONDS);
// Tradeoff: downstream must merge buckets to get full aggregation

Pattern 4: Null Key + Downstream Aggregation

// When: ordering at ingest doesn't matter, aggregate later
producer.send(new ProducerRecord<>("events", null, value)); // max throughput
// Kafka Streams or Flink aggregates by real key downstream
// Tradeoff: no ordering guarantee at ingest — only use when truly not needed

Pattern 5: Separate Topics by Tier

// When: a small number of entities generate disproportionate volume
producer.send(new ProducerRecord<>("events-tier1", key, value)); // top 100 entities
producer.send(new ProducerRecord<>("events-tier2", key, value)); // everyone else
// Tradeoff: need to classify at produce time, manage two consumer pipelines
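Pattern 3 has a subtle pitfall worth showing in runnable form: bucket by the event's own timestamp, not the wall clock, or replayed messages land in different buckets than they did originally. A minimal sketch (class and constant names are illustrative):

```java
import java.time.Instant;

public class TimeBucketKey {
    // Append a coarse time bucket so a trending key rotates partition slots
    // every BUCKET_SIZE_SECONDS instead of pinning one partition forever.
    static final long BUCKET_SIZE_SECONDS = 10;

    static String key(String naturalKey, Instant eventTime) {
        // Event time, not Instant.now(): deterministic on replay.
        long bucket = eventTime.getEpochSecond() / BUCKET_SIZE_SECONDS;
        return naturalKey + "-" + bucket;
    }

    public static void main(String[] args) {
        Instant t = Instant.ofEpochSecond(1_714_000_005L);
        System.out.println(key("#WorldCup", t)); // "#WorldCup-171400000"
    }
}
```

Downstream aggregation (Kafka Streams, Flink) then merges consecutive buckets back into a single per-hashtag count.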

Quick Reference — All Systems

| System | Key Topic | Partition Key | Hot Key Risk | Fix |
|---|---|---|---|---|
| Stripe | payment-events | paymentId | countryCode | Composite with shard |
| Uber | driver-location | driverId | cityId | Geohash key |
| Walmart | inventory-updates | skuId | categoryId | Use skuId, not categoryId |
| LinkedIn | user-activity | userId | Celebrity userId | Fan-out at consumer |
| Netflix | playback-events | sessionId | contentId | Use sessionId, not contentId |
| Twitter | tweet-events | tweetId | Trending hashtag | Time-bucketed key |
| DoorDash | order-events | orderId | Popular restaurant | Composite with shard |