Kafka Partitions

What are Kafka Partitions?

Kafka Partitions are the unit of parallelism in Kafka. Each topic is divided into one or more partitions, and each partition is an ordered, immutable sequence of records that is continually appended to. Partitions are the fundamental building blocks that enable Kafka's scalability and performance.

Key Characteristics of Kafka Partitions

1. Ordered Sequence

  • Records within a partition are ordered by their offset
  • Order is guaranteed only within a partition, not across partitions
  • Each record has a unique offset within its partition
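
A minimal producer sketch makes these guarantees concrete: records that share a key are routed to the same partition, and the returned metadata shows their offsets increasing in send order. This is a sketch, assuming a broker at localhost:9092; the topic, key, and event names are illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderingExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String event : new String[] {"login", "click", "logout"}) {
                // Same key -> same partition, so offsets (and consumption order) follow send order
                RecordMetadata meta =
                        producer.send(new ProducerRecord<>("my-topic", "user-123", event)).get();
                System.out.printf("event=%s partition=%d offset=%d%n",
                        event, meta.partition(), meta.offset());
            }
        }
    }
}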

2. Immutable Log

  • Once written, records cannot be modified; they are removed only when the retention policy (time- or size-based) expires them
  • New records are always appended to the end of the partition
  • On disk, each partition is stored as a sequence of log segment files
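
Deletion is driven purely by retention rather than by clients. As one illustration, a topic's retention.ms can be adjusted through the AdminClient; a minimal sketch, assuming kafka-clients 2.3+ (for incrementalAlterConfigs) and a broker at localhost:9092:

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Keep records for 7 days; older segments are deleted by the broker, not by clients
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, Set.of(setRetention))).all().get();
        }
    }
}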

3. Parallelism Unit

  • Each partition can be processed independently
  • Multiple consumers can read from different partitions simultaneously
  • Partitions enable horizontal scaling

Partition Structure and Storage

1. Physical Storage

Topic: my-topic (each partition is a directory named <topic>-<partition>)
├── my-topic-0/
│   ├── 00000000000000000000.log        (segment file, named after the offset of its first record)
│   ├── 00000000000000000000.index      (offset index for the segment)
│   ├── 00000000000000000000.timeindex  (timestamp index for the segment)
│   ├── 00000000000000120000.log        (next segment; example base offset)
│   └── ...
├── my-topic-1/
│   └── ...
└── my-topic-2/
    └── ...

2. Log File Structure

// Simplified view of the legacy message format (magic v0/v1).
// Each entry in a log segment:
// [Offset][Message Size][Message]
// Message = [CRC][Magic Byte][Attributes][Timestamp][Key Length][Key][Value Length][Value]

// Example log entry:
//   8 bytes: Offset
//   4 bytes: Message size (e.g., 100)
// 100 bytes: Message
//   - 4 bytes: CRC
//   - 1 byte:  Magic byte (format version)
//   - 1 byte:  Attributes (compression codec, timestamp type)
//   - 8 bytes: Timestamp (magic v1 and later)
//   - 4 bytes: Key length (-1 for a null key)
//   - N bytes: Key data
//   - 4 bytes: Value length (-1 for a null value)
//   - M bytes: Value data

// Since Kafka 0.11 (message format v2), records are grouped into record batches
// with a single batch-level CRC and varint-encoded per-record fields.
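
The client-visible counterpart of these fields can be inspected from a consumer: each ConsumerRecord exposes its offset, timestamp, and serialized key/value sizes. A minimal sketch, assuming a broker at localhost:9092, string-serialized data on my-topic, and a hypothetical group id:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RecordFieldInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "record-inspector");          // hypothetical group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // Per-record metadata exposed by the client maps onto the on-disk fields above
                System.out.printf("partition=%d offset=%d timestamp=%d keyBytes=%d valueBytes=%d%n",
                        record.partition(), record.offset(), record.timestamp(),
                        record.serializedKeySize(), record.serializedValueSize());
            }
        }
    }
}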

Partition Management

1. Creating Partitions

# Create topic with specific number of partitions
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6 \
--replication-factor 3

# Increase partitions (can only increase, never decrease)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 12
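
The same operations are available programmatically through the AdminClient; a minimal sketch, assuming a recent kafka-clients dependency and a broker at localhost:9092:

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;

public class PartitionAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            // Create a topic with 6 partitions and replication factor 3
            admin.createTopics(Set.of(new NewTopic("my-topic", 6, (short) 3))).all().get();

            // Later, increase the partition count to 12 (decreasing is not supported)
            admin.createPartitions(Map.of("my-topic", NewPartitions.increaseTo(12))).all().get();
        }
    }
}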

2. Partition Assignment

// Producer can specify the partition explicitly
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", 0, "key", "value"); // Explicit partition 0

// Or let Kafka choose the partition based on the key
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value"); // Hash-based partitioning

3. Partition Information

# Get partition information
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic

# Output example:
# Topic: my-topic PartitionCount: 6 ReplicationFactor: 3
# Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
# Topic: my-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
# Topic: my-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
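
The same leader/replica/ISR details can be fetched programmatically; a minimal AdminClient sketch, assuming kafka-clients 3.1+ (for DescribeTopicsResult.allTopicNames()) and a broker at localhost:9092:

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class PartitionDescriber {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            TopicDescription description =
                    admin.describeTopics(Set.of("my-topic")).allTopicNames().get().get("my-topic");

            for (TopicPartitionInfo p : description.partitions()) {
                // Same information as the --describe output: leader, replicas, ISR
                System.out.printf("Partition %d leader=%s replicas=%s isr=%s%n",
                        p.partition(), p.leader(), p.replicas(), p.isr());
            }
        }
    }
}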

Partitioning Strategies

1. Hash-Based Partitioning (Default)

// Simplified hash-based partitioner, close to the pre-2.4 default behaviour.
// (Since Kafka 2.4 the built-in default uses "sticky" partitioning for records with null keys.)
// configure() and close() from the Partitioner interface are omitted for brevity.
public class DefaultPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // Round-robin for null keys
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        }

        // Hash-based partitioning for non-null keys
        // (Utils.toPositive avoids the Math.abs(Integer.MIN_VALUE) edge case)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

2. Custom Partitioning

// Business logic-based partitioning (configure() and close() omitted for brevity)
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size(); // assumes at least 2 partitions

        if (key == null) {
            return 0; // Send null keys to partition 0
        }

        if (key instanceof String) {
            String keyStr = (String) key;

            // User events go to the first half of the partitions
            if (keyStr.startsWith("user-")) {
                return Utils.toPositive(keyStr.hashCode()) % (numPartitions / 2);
            }

            // System events go to the second half of the partitions
            if (keyStr.startsWith("system-")) {
                return (numPartitions / 2) + Utils.toPositive(keyStr.hashCode()) % (numPartitions / 2);
            }
        }

        // Default: hash-based partitioning on the serialized key
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
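
A custom partitioner takes effect only when it is registered on the producer via partitioner.class; a minimal sketch, assuming the CustomPartitioner above is on the producer's classpath and a broker at localhost:9092:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomPartitionerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Route records through CustomPartitioner instead of the default partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "user-123", "login"));
        }
    }
}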

3. Round-Robin Partitioning

// For even distribution when per-key ordering is not required, send records without a key
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", null, "message"); // null key
// Note: since Kafka 2.4 the default partitioner handles null keys with "sticky" partitioning
// (it fills a batch for one partition, then moves to another), which spreads load across
// partitions but is not strict per-record round-robin.
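
If strict per-record round-robin is required even for keyed records, the client also ships with org.apache.kafka.clients.producer.RoundRobinPartitioner, which is plugged into the producer properties in the same way as the custom partitioner shown earlier; a minimal sketch:

// Distribute records across partitions one by one, regardless of key
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
          "org.apache.kafka.clients.producer.RoundRobinPartitioner");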

Partition Replication

1. Leader and Followers

// Each partition has one leader and multiple followers.
// Simplified view of org.apache.kafka.common.PartitionInfo:
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;            // Broker that handles all produce (and, by default, fetch) requests
    private final Node[] replicas;        // All replicas (including the leader)
    private final Node[] inSyncReplicas;  // Replicas currently in sync with the leader
}

2. Replication Factor

# Create topic with replication factor 3
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic replicated-topic \
--partitions 6 \
--replication-factor 3

# Check replication status
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic replicated-topic

3. In-Sync Replicas (ISR)

// Check the ISR for a partition (assumes an already-initialized KafkaConsumer field named 'consumer')
public class PartitionHealthCheck {
    public boolean isPartitionHealthy(String topic, int partition) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);

        for (PartitionInfo partitionInfo : partitions) {
            if (partitionInfo.partition() == partition) {
                // Healthy here means at least 2 in-sync replicas; tune this to your replication factor
                return partitionInfo.inSyncReplicas().length >= 2;
            }
        }
        return false;
    }
}
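
The ISR matters most for durability: with acks=all the leader only acknowledges a write once every in-sync replica has it, and the topic/broker setting min.insync.replicas defines how many ISR members must be available for such writes to succeed. A minimal producer sketch, assuming a broker at localhost:9092 and min.insync.replicas=2 set on the topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for the full ISR to acknowledge

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Fails with NotEnoughReplicasException (after retries) if the ISR
            // shrinks below the topic's min.insync.replicas
            producer.send(new ProducerRecord<>("replicated-topic", "key", "value")).get();
        }
    }
}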

Partition Consumption

1. Consumer Assignment

// Consumer group assignment
public class PartitionAssignment {
    public void assignPartitions() {
        // Option 1: subscribe to the topic and let the group coordinator assign partitions
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Option 2: manually assign specific partitions
        // (subscribe() and assign() are mutually exclusive on the same consumer instance)
        List<TopicPartition> partitions = Arrays.asList(
                new TopicPartition("my-topic", 0),
                new TopicPartition("my-topic", 1)
        );
        consumer.assign(partitions);
    }
}

2. Partition Rebalancing

public class RebalancingConsumer {
    public void handleRebalancing() {
        consumer.subscribe(Collections.singletonList("my-topic"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Commit offsets before partitions are revoked
                        consumer.commitSync();
                        System.out.println("Partitions revoked: " + partitions);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println("Partitions assigned: " + partitions);

                        // Optionally seek to specific offsets
                        for (TopicPartition partition : partitions) {
                            consumer.seek(partition, 0); // Start from the beginning
                        }
                    }
                });
    }
}

Partition Offset Management

1. Offset Tracking

public class OffsetTracker {
    public void trackOffsets() {
        Set<TopicPartition> partitions = consumer.assignment();

        // Beginning and end offsets can be fetched for all assigned partitions in one call
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

        for (TopicPartition partition : partitions) {
            // position() takes a single TopicPartition and returns the next offset to be fetched
            long currentOffset = consumer.position(partition);
            long beginningOffset = beginningOffsets.get(partition);
            long endOffset = endOffsets.get(partition);

            System.out.printf("Partition %d: current=%d, beginning=%d, end=%d%n",
                    partition.partition(), currentOffset, beginningOffset, endOffset);
        }
    }
}

2. Offset Seeking

public class OffsetSeeker {
    public void seekToOffset() {
        TopicPartition partition = new TopicPartition("my-topic", 0);

        // Seek to the beginning
        consumer.seekToBeginning(Collections.singletonList(partition));

        // Seek to the end
        consumer.seekToEnd(Collections.singletonList(partition));

        // Seek to a specific offset
        consumer.seek(partition, 1000);

        // Seek to a timestamp (here: one hour ago)
        Map<TopicPartition, Long> timestamps = Collections.singletonMap(
                partition, System.currentTimeMillis() - 3600000);
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);

        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            // The value is null if the partition has no record at or after the timestamp
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue().offset());
            }
        }
    }
}

Partition Performance Optimization

1. Partition Count Calculation

public class PartitionCalculator {
    // Round up so the partition count can actually sustain the target throughput
    public int calculateOptimalPartitions(int targetThroughput, int maxThroughputPerPartition) {
        return Math.max(1, (targetThroughput + maxThroughputPerPartition - 1) / maxThroughputPerPartition);
    }

    // A consumer group cannot use more consumers than there are partitions,
    // so provision at least one partition per consumer (or producer) you plan to run
    public int calculatePartitionsForConsumers(int numConsumers, int numProducers) {
        return Math.max(numConsumers, numProducers);
    }

    // Example usage
    public void example() {
        int targetThroughput = 10000;          // messages per second
        int maxThroughputPerPartition = 1000;  // messages per second per partition
        int optimalPartitions = calculateOptimalPartitions(targetThroughput, maxThroughputPerPartition);
        // Result: 10 partitions
    }
}

2. Partition Distribution

public class PartitionBalancer {
    public void checkPartitionBalance() {
        List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");

        // Count how many partition leaders each broker hosts
        Map<Integer, Integer> brokerPartitionCount = new HashMap<>();
        for (PartitionInfo partition : partitions) {
            if (partition.leader() != null) { // skip partitions that currently have no leader
                brokerPartitionCount.merge(partition.leader().id(), 1, Integer::sum);
            }
        }

        // Check whether leadership is evenly distributed across brokers
        int minPartitions = brokerPartitionCount.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        int maxPartitions = brokerPartitionCount.values().stream().mapToInt(Integer::intValue).max().orElse(0);

        double balanceRatio = maxPartitions == 0 ? 0.0 : (double) minPartitions / maxPartitions;
        System.out.println("Partition balance ratio: " + balanceRatio); // 1.0 = perfectly balanced
    }
}

Partition Monitoring

1. Partition Metrics

public class PartitionMonitor {
    public Map<String, Object> getPartitionMetrics(String topic, int partition) {
        Map<String, Object> metrics = new HashMap<>();

        TopicPartition topicPartition = new TopicPartition(topic, partition);

        // End offset of the partition (the offset the next record will get)
        long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition))
                .get(topicPartition);
        metrics.put("endOffset", endOffset);

        // Consumer lag (position() takes a single TopicPartition and requires it to be assigned)
        long currentOffset = consumer.position(topicPartition);
        metrics.put("lag", endOffset - currentOffset);

        // Leader and replication details
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        for (PartitionInfo partitionInfo : partitions) {
            if (partitionInfo.partition() == partition) {
                metrics.put("leader", partitionInfo.leader().id());
                metrics.put("replicas", partitionInfo.replicas().length);
                metrics.put("inSyncReplicas", partitionInfo.inSyncReplicas().length);
                break;
            }
        }

        return metrics;
    }
}

2. Partition Health Checks

public class PartitionHealthCheck {
    public boolean isPartitionHealthy(String topic, int partition) {
        try {
            TopicPartition topicPartition = new TopicPartition(topic, partition);

            // Check that the partition's end offset can be fetched
            long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition))
                    .get(topicPartition);

            // Check that the partition can actually be read
            // (assign() replaces any previous subscription/assignment on this consumer)
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seek(topicPartition, Math.max(0, endOffset - 1));

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            // An empty partition (endOffset == 0) is still considered healthy
            return !records.isEmpty() || endOffset == 0;

        } catch (Exception e) {
            return false;
        }
    }
}

Common Partition Patterns

1. Time-Based Partitioning

// Routes all records produced in the same hour to the same partition
// (note: this concentrates current traffic on one partition at a time; configure()/close() omitted)
public class TimeBasedPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // Use the current hour of day for partitioning
        long timestamp = System.currentTimeMillis();
        int hourOfDay = (int) ((timestamp / (1000 * 60 * 60)) % 24);

        return hourOfDay % numPartitions;
    }
}

2. Geographic Partitioning

// Routes records to region-specific partitions based on the message contents
// (configure() and close() omitted for brevity)
public class GeographicPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (value instanceof String) {
            String valueStr = (String) value;

            // Extract the region from the value and partition accordingly
            if (valueStr.contains("US")) return 0;
            if (valueStr.contains("EU")) return 1;
            if (valueStr.contains("ASIA")) return 2;
        }

        // Fallback: hash-based partitioning (keyless records go to partition 0)
        if (keyBytes == null) {
            return 0;
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

Best Practices for Partitions

1. Partition Count Guidelines

# Size the partition count from throughput requirements:
#   partitions >= max(target_throughput / producer_throughput_per_partition,
#                     target_throughput / consumer_throughput_per_partition)
# and provision at least one partition per consumer in the largest consumer group.
# Example: 12,000 msg/sec target at ~1,000 msg/sec per partition -> 12 partitions,
# which also gives 6 consumers two partitions each.
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic high-throughput-topic \
--partitions 12 \
--replication-factor 3

2. Key Design for Partitioning

// Good: consistent keys for related messages (all events for user-123 stay in one partition, in order)
ProducerRecord<String, String> record1 =
    new ProducerRecord<>("user-events", "user-123", "login");
ProducerRecord<String, String> record2 =
    new ProducerRecord<>("user-events", "user-123", "logout");

// Bad: random keys for related messages (the ordering between login and logout is lost)
ProducerRecord<String, String> record1 =
    new ProducerRecord<>("user-events", UUID.randomUUID().toString(), "login");
ProducerRecord<String, String> record2 =
    new ProducerRecord<>("user-events", UUID.randomUUID().toString(), "logout");

3. Partition Monitoring

// Monitor partition health
public class PartitionMonitor {
    public void monitorPartitions() {
        List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");

        for (PartitionInfo partition : partitions) {
            // Check leader availability
            if (partition.leader() == null) {
                System.err.println("Partition " + partition.partition() + " has no leader");
            }

            // Check replication
            if (partition.inSyncReplicas().length < 2) {
                System.err.println("Partition " + partition.partition() + " has insufficient in-sync replicas");
            }
        }
    }
}

Best Practices Summary

  1. Choose appropriate partition count based on throughput and parallelism needs
  2. Use consistent keys for related messages to maintain ordering
  3. Monitor partition health and replication status
  4. Plan for partition growth - can only increase, never decrease
  5. Balance partitions across brokers for even load distribution
  6. Use custom partitioning when business logic requires it
  7. Monitor consumer lag per partition for performance insights (see the sketch after this list)
  8. Implement proper error handling for partition failures
  9. Test partition behavior under various failure scenarios
  10. Document partitioning strategy for team knowledge
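
For item 7, per-partition consumer lag can be computed by comparing a group's committed offsets with the latest offsets; a minimal AdminClient sketch, assuming kafka-clients 2.5+ (for listOffsets), a broker at localhost:9092, and a hypothetical group id my-consumer-group:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagChecker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            // Committed offsets for the (hypothetical) consumer group
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("my-consumer-group")
                         .partitionsToOffsetAndMetadata().get();

            // Latest offsets for the same partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                    admin.listOffsets(request).all().get();

            // Lag per partition = latest offset - committed offset
            committed.forEach((tp, offset) -> System.out.printf("%s lag=%d%n",
                    tp, latest.get(tp).offset() - offset.offset()));
        }
    }
}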

Kafka Partitions are the fundamental unit of parallelism and scalability in Kafka. Understanding partition behavior, management, and best practices is crucial for building high-performance data streaming applications.