Learn Kafka by rebuilding it from scratch
Kafka is a distributed event streaming platform that powers real-time data pipelines and applications. While many use Kafka for its scalability and fault tolerance, understanding its internals can be challenging. The best way to truly grasp Kafka’s architecture is to build a simplified version of it.
In this blog, I will do my best to break down Kafka’s internals by constructing a simple implementation of it. This will give you hands-on experience with fundamental concepts like brokers, topics, partitions, producers, consumers, and consumer groups. I will use Java for coding, and I will also provide a Node.js version of the implementation.
Before diving into building the internals of Kafka, let's first understand how Kafka works.
Understanding Kafka’s Core Components
Kafka consists of three main components:
- Brokers – Servers that store and manage messages.
- Producers – Send messages to Kafka topics.
- Consumers – Read messages from Kafka topics.
Kafka Brokers
Kafka brokers are servers that manage message storage and retrieval. They play a crucial role in ensuring data availability and reliability.
Here's how data flows in a broker:
- Message Reception – Brokers receive messages from producers.
- Partition Management – Messages are distributed across partitions within topics.
- Data Storage – Messages are persisted to disk for a configured retention period.
- Consumer Coordination – Brokers track which messages have been read by consumers.
- Load Balancing – Workload is distributed across multiple brokers for scalability.
A Kafka cluster typically consists of multiple brokers working together to handle high volumes of data efficiently.
Topics and Partitions
What is a Topic?
A topic is a logical grouping of messages that belong to a particular category. Producers publish messages to a topic, and consumers subscribe to a topic to retrieve relevant data.
Unlike traditional messaging systems that use a point-to-point model, where each message is consumed by only one receiver, Kafka follows a publish-subscribe (pub-sub) model, where multiple consumers can read from the same topic independently.
What are Partitions?
Each topic is divided into partitions, allowing Kafka to:
- Scale horizontally – Messages are distributed across multiple partitions, enabling parallel processing.
- Ensure fault tolerance – If the broker hosting a partition fails, a replica on another broker can take over.
- Maintain order – Messages within a partition maintain strict ordering.
Each partition acts as a separate, ordered log file stored on a broker. Every message within a partition is assigned a unique, incremental offset.
When producers send messages, Kafka automatically assigns them to partitions based on the partitioning strategy.
Producers and Consumers
Producers (Message Senders)
Producers are responsible for creating and sending messages to Kafka topics. Kafka uses different partitioning strategies to decide how messages are distributed across partitions:
- Key-based Partitioning (if a key is provided)
  - The producer assigns a key to each message.
  - Kafka uses a hash function (hash(key) % num_partitions) to determine which partition the message should go to.
  - Ensures that messages with the same key always go to the same partition, maintaining order for that key (see the sketch after this list).
  - Example: In an e-commerce system, messages for a specific user ID are always routed to the same partition.
- Round-Robin Partitioning (default if no key is provided)
  - Messages are distributed evenly across all partitions.
  - Ensures load balancing but does not guarantee order across partitions.
  - Example: If logs are being sent without a key, they are spread across all partitions for better throughput.
- Custom Partitioning (user-defined logic)
  - Developers can implement a custom partitioning strategy using a Partitioner class.
  - Useful for specific use cases, such as geographical routing or priority-based messaging.
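To make the key-based strategy concrete, here is a minimal sketch of how a key can be mapped to a partition index. The helper class below is hypothetical (it is not part of Kafka's API); it simply mirrors the hash(key) % num_partitions rule described above, and it matches the approach our own TopicManager will use later in this post.

public class PartitionSelector {
    // Maps a message key to a partition index.
    // Math.abs guards against negative hashCode() values.
    static int selectPartition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        // The same key always yields the same partition index,
        // which is what preserves per-key ordering.
        System.out.println(selectPartition("user-42", partitions));
        System.out.println(selectPartition("user-42", partitions)); // same as above
        System.out.println(selectPartition("user-7", partitions));  // may differ
    }
}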
To optimize performance, messages are batched before sending.
Consumers (Message Receivers)
Consumers subscribe to topics and retrieve messages from partitions. Each consumer tracks its position using an offset, which is a unique identifier for each message.
Kafka supports consumer groups, where multiple consumers work together to share the workload of processing messages from a topic. This allows for scalability, fault tolerance, and parallel processing.
How Consumer Groups Work
- Each consumer group is identified by a group ID.
- Within a group, each consumer is assigned a subset of partitions from the topic.
- No two consumers in the same group read the same partition—each partition is assigned exclusively to one consumer.
- If the number of consumers exceeds the number of partitions, some consumers will remain idle (since each partition can only be assigned to one consumer at a time).
- If a consumer fails or disconnects, Kafka automatically reassigns its partitions to the remaining consumers. This is called rebalancing.
Consider a system with three consumers in a group:
| Consumer | Assigned Partition |
|---|---|
| Consumer 1 | Reads from Partition 0 |
| Consumer 2 | Reads from Partition 1 |
| Consumer 3 | Reads from Partition 2 |
If Consumer 2 crashes, Kafka redistributes Partition 1 among the remaining consumers:
| Consumer | Assigned Partition After Failure |
|---|---|
| Consumer 1 | Reads from Partitions 0 and 1 |
| Consumer 3 | Reads from Partition 2 |
When Consumer 2 comes back online, Kafka rebalances again and may assign Partition 1 back to it.
Kafka consumers always belong to a consumer group, which determines how messages are read from partitions.
Consider a case where we have three consumers in the same consumer group.
Scenario: We have one topic with three partitions and three consumers in the same consumer group (group-A).
| Consumer Group | Consumer | Assigned Partition |
|---|---|---|
| group-A | Consumer 1 | Reads from Partition 0 |
| group-A | Consumer 2 | Reads from Partition 1 |
| group-A | Consumer 3 | Reads from Partition 2 |
In this case:
- Each partition is assigned to one consumer in the group.
- Consumers in the same group divide the work; each message is processed by only one consumer in the group.
- If a consumer fails, Kafka reassigns its partition to another active consumer in the same group.
- Messages are not duplicated—each message is consumed only once per group.
In this case, data is evenly distributed among multiple consumers to scale processing.
Now consider a different case, where we have three consumers in different consumer groups.
Scenario: We have one topic with three partitions and three consumers, but each belongs to a different consumer group (group-A, group-B, group-C).
| Consumer Group | Consumer | Assigned Partition |
|---|---|---|
| group-A | Consumer 1 | Reads all partitions (0, 1, 2) |
| group-B | Consumer 2 | Reads all partitions (0, 1, 2) |
| group-C | Consumer 3 | Reads all partitions (0, 1, 2) |
In this case:
- Since each consumer belongs to a different group, each group gets a full copy of all messages.
- Consumers in different groups do not compete for messages; they each process all messages independently.
- This allows multiple applications to consume the same Kafka topic without interfering with each other.
Key Takeaways About Consumer Groups
| Scenario | Partition Sharing | Message Duplication |
|---|---|---|
| Same Consumer Group | Consumers share partitions (each message is processed by only one consumer) | ❌ No duplication |
| Different Consumer Groups | Each group reads all partitions (each message is processed by every group) | ✅ Messages are duplicated across groups |
How Kafka Works
Kafka follows a Producer → Broker → Consumer pipeline.
- Producer Sends Messages
  - The producer connects to a Kafka broker.
  - Messages are published to a topic.
  - Kafka assigns messages to partitions.
  - Messages are sent in batches to improve efficiency.
- Broker Stores Messages
  - Kafka brokers store messages persistently.
  - Each message gets an incremental offset.
  - Data is replicated across multiple brokers for fault tolerance.
  - Brokers retain messages for a configurable period (default: 7 days).
- Consumer Reads Messages
  - Consumers subscribe to topics and request data from brokers.
  - Kafka keeps track of the consumer's offset (position in the log).
  - Consumers can auto-commit or manually commit offsets to track progress.
  - If a consumer crashes, Kafka rebalances partitions to active consumers.
How Kafka Handles Message Ordering
Kafka guarantees message order within a partition, but not across partitions.
Partition-Level Ordering
- Each partition maintains a strict order.
- Messages in a partition are appended sequentially.
- Consumers read messages in the same order they were produced.
Global Ordering Across Partitions
- If a topic has multiple partitions, Kafka does not guarantee global ordering.
- Messages from different partitions may be processed in parallel, leading to out-of-order delivery.
Key-Based Partitioning
- Messages with the same key are always sent to the same partition.
- Ensures messages related to a specific key are processed in order.
Consumer Group Behavior
- A consumer group distributes partitions among multiple consumers.
- Each partition is assigned to only one consumer at a time.
- Messages within a partition are read sequentially, but messages across partitions may be out of order.
Rebalancing Impact
- When a new consumer joins or leaves a group, Kafka reassigns partitions.
- During rebalancing, there may be a short delay in message processing.
- However, partition-level order is preserved.
Replication and Ordering
- Kafka follows a leader-follower replication model.
- Producers write messages to the leader partition.
- Followers replicate the data for fault tolerance.
- If a leader broker fails, a follower is promoted as the new leader.
- A short delay may occur during leader election, but message order remains intact.
Now, let's summarize everything we have covered so far.
| Component | Description |
|---|---|
| Broker | Stores and manages messages in partitions. |
| Producer | Sends messages to Kafka topics. |
| Consumer | Reads messages from topics. |
| Partition | A segment of a topic that enables parallel processing. |
| Offset | A unique identifier for each message within a partition. |
| Consumer Group | A set of consumers that share the workload of processing messages from a topic. |
Building TopicManager and Partition Classes
Now that we have a solid understanding of how Kafka works and the flow of data from a producer to a consumer through topics and partitions, it's time to translate this knowledge into code.
In this section, we will design and implement two fundamental classes:
- TopicManager – This class will be responsible for managing topics, partitions, producers, and consumers.
- Partition – This class will represent an individual partition, where messages are stored and retrieved.
Why Do We Need a TopicManager?
In Kafka, a topic is a logical grouping of messages, while partitions allow for parallel processing and scalability. Our TopicManager class will act as a central hub, handling:
- Creating and managing topics
- Distributing messages across partitions
- Registering producers and consumers
- Coordinating message retrieval for consumers
By building this class, we are essentially simulating Kafka's topic and partition management system, giving us deeper insight into how Kafka efficiently handles data flow.
The Role of the Partition Class
Each topic in Kafka is divided into multiple partitions to achieve fault tolerance and parallel processing. The Partition class will:
- Store messages in an ordered sequence
- Maintain an offset to track message positions
- Support reading and writing operations
In the next steps, we will implement these classes step by step, ensuring that our design mimics how Kafka works under the hood.
Defining Variables for TopicManager
Before implementing the logic, let's first define the key data structures that will help us manage topics and partitions efficiently:
private final Map<String, List<Partition>> topicPartitions;
final Map<String, Map<String, List<Integer>>> consumerGroupAssignments;
private final Map<String, Set<String>> topicSubscriptions;
private final Map<String, Map<String, Map<Integer, ReentrantLock>>> consumerPartitionLocks;
private final int defaultPartitionCount;
Understanding Each Variable
- topicPartitions (Map<String, List<Partition>>)
  - Stores all topics and their corresponding partitions.
  - Helps us quickly retrieve partitions when producers send messages or consumers read them.
- consumerGroupAssignments (Map<String, Map<String, List<Integer>>>)
  - Key: Consumer Group ID → Value: Map of Consumer ID → Assigned Partition IDs.
  - Ensures that no two consumers within the same group read the same partition.
- topicSubscriptions (Map<String, Set<String>>)
  - Tracks which consumer groups are subscribed to which topics.
  - Helps during consumer rebalancing when new consumers join.
- consumerPartitionLocks (Map<String, Map<String, Map<Integer, ReentrantLock>>>)
  - Prevents multiple consumers from reading the same partition at the same time.
  - Ensures message order and prevents race conditions.
- defaultPartitionCount (int)
  - Defines how many partitions a topic will have by default.
  - Helps distribute messages evenly across partitions.
Implementing the Constructor
Now that we've defined our variables, let's initialize them inside our constructor. We'll use ConcurrentHashMap to ensure thread safety in a multi-threaded environment (just like Kafka does).
class TopicManager {
private final Map<String, List<Partition>> topicPartitions;
final Map<String, Map<String, List<Integer>>> consumerGroupAssignments;
private final Map<String, Set<String>> topicSubscriptions;
private final Map<String, Map<String, Map<Integer, ReentrantLock>>> consumerPartitionLocks;
private final int defaultPartitionCount;
public TopicManager(int partitionCount) {
this.topicPartitions = new ConcurrentHashMap<>();
this.consumerGroupAssignments = new ConcurrentHashMap<>();
this.topicSubscriptions = new ConcurrentHashMap<>();
this.consumerPartitionLocks = new ConcurrentHashMap<>();
this.defaultPartitionCount = partitionCount;
}
}
We initialize each data structure with ConcurrentHashMap for thread safety. The defaultPartitionCount is set via the constructor, allowing us to configure the number of partitions per topic. Now that we've built the TopicManager, let's create an instance of it and set up our default partitions:
TopicManager topicManager = new TopicManager(4); // Every new topic gets 4 partitions
Every topic we create will have 4 partitions unless specified otherwise. A higher partition count allows for better parallelism when multiple consumers process data. However, having too many partitions increases overhead, so we need to find a balance.
Defining Variables for the Partition Class
In Kafka, a partition is a fundamental unit of storage within a topic. It ensures that messages are stored in an ordered sequence and allows parallel processing by multiple consumers.
Now, let's build the Partition class, which will be responsible for:
- Storing messages in order
- Tracking consumer offsets for efficient retrieval
- Ensuring thread safety when multiple producers and consumers interact
Before writing the constructor, let's first define the core variables needed to manage a partition effectively:
private final String topic;
private final int partitionId;
private final List<String> messages;
private final Map<String, Integer> groupOffsets;
private final ReentrantLock lock;
Understanding Each Variable of the Partition Class
- topic (String)
  - Stores the name of the topic to which this partition belongs.
  - Helps in identifying which topic this partition is associated with.
- partitionId (int)
  - Represents the partition ID within a topic.
  - Each topic consists of multiple partitions, and each partition has a unique ID for easy identification.
- messages (List<String>)
  - Stores all messages added to this partition.
  - Messages are added in sequential order, ensuring they can be retrieved in the same order.
- groupOffsets (Map<String, Integer>)
  - Key: Consumer Group ID → Value: Last Read Offset.
  - Tracks where each consumer group left off, ensuring that they start reading from the correct position.
- lock (ReentrantLock)
  - Ensures thread safety when multiple producers or consumers interact with this partition.
  - Prevents race conditions when adding or consuming messages.
Implementing the Constructor
Now that we've defined the variables, let's initialize them in our constructor:
class Partition {
private final String topic;
private final int partitionId;
private final List<String> messages;
private final Map<String, Integer> groupOffsets;
private final ReentrantLock lock;
public Partition(String topic, int partitionId) {
this.topic = topic;
this.partitionId = partitionId;
this.messages = new ArrayList<>();
this.groupOffsets = new HashMap<>();
this.lock = new ReentrantLock();
}
}
- this.topic = topic; → Associates the partition with a specific topic.
- this.partitionId = partitionId; → Assigns a unique partition ID for easy management.
- this.messages = new ArrayList<>(); → Initializes an empty list to store messages in order.
- this.groupOffsets = new HashMap<>(); → Creates a map to track each consumer group's last read position.
- this.lock = new ReentrantLock(); → Ensures safe access to partition data in a multi-threaded environment.
Now that we've built the Partition class, let's create an instance of it:
Partition partition = new Partition("topic12", 1);
A new partition is created for topic12 with partition ID 1. It is empty at first, but producers can add messages, and consumers can retrieve them.
Adding Methods to the Partition Class
Now that we have defined our Partition class, it's time to implement the core functionality:
- Adding messages into the partition
- Consuming messages sequentially
- Tracking consumer progress using offsets
- Ensuring thread safety to handle multiple producers and consumers
Step 1: Implementing addMessage - Storing Messages in the Partition
When a producer sends a message, it needs to be added to the correct partition. Since multiple producers might write to the same partition simultaneously, we use a lock to ensure safe modification.
public void addMessage(String message) {
lock.lock(); // Acquire lock to ensure thread safety
try {
messages.add(message); // Add the message to the partition's message list
} finally {
lock.unlock(); // Release the lock to allow other operations
}
}
- The lock.lock() ensures that only one producer at a time can modify the partition.
- The message is added to the end of the list, maintaining order.
- The lock.unlock() releases the lock after adding the message, allowing other operations to proceed.
Step 2: Implementing consumeMessage - Retrieving Messages for Consumers
Now, let's implement message consumption. A consumer reads messages from the partition and updates its offset, so it doesn’t read the same message again.
public String consumeMessage(String groupId) {
lock.lock(); // Ensure only one consumer modifies the offset at a time
try {
int offset = groupOffsets.getOrDefault(groupId, 0); // Get the last read position
if (offset >= messages.size()) { // If there are no new messages, return null
return null;
}
String message = messages.get(offset); // Retrieve the next unread message
groupOffsets.put(groupId, offset + 1); // Update the offset to mark it as read
return message;
} finally {
lock.unlock(); // Release lock after consumption
}
}
- Retrieve the consumer group’s offset to find the last read position.
- If the offset exceeds the total messages, return null (no new messages).
- Otherwise, return the next unread message and increment the offset so the consumer doesn’t read the same message again.
Step 3: Implementing hasMessages - Checking for Unread Messages
Before a consumer tries to consume a message, it should check whether there are unread messages. This method prevents unnecessary polling and optimizes consumer behavior.
public boolean hasMessages(String groupId) {
lock.lock(); // Ensure safe access to offsets
try {
int offset = groupOffsets.getOrDefault(groupId, 0); // Get the last read position
return offset < messages.size(); // Return true if there are unread messages
} finally {
lock.unlock(); // Release lock
}
}
- Retrieve the current offset of the consumer group.
- Compare it with messages.size() → If the offset is less, there are unread messages.
- This helps in optimizing consumer polling, preventing unnecessary retries.
Final Partition Class
Now, let's put everything together into the final version of our Partition class:
class Partition {
private final String topic;
private final int partitionId;
private final List<String> messages;
private final Map<String, Integer> groupOffsets;
private final ReentrantLock lock;
public Partition(String topic, int partitionId) {
this.topic = topic;
this.partitionId = partitionId;
this.messages = new ArrayList<>();
this.groupOffsets = new HashMap<>();
this.lock = new ReentrantLock();
}
public void addMessage(String message) {
lock.lock();
try {
messages.add(message);
} finally {
lock.unlock();
}
}
public String consumeMessage(String groupId) {
lock.lock();
try {
int offset = groupOffsets.getOrDefault(groupId, 0);
if (offset >= messages.size()) {
return null;
}
String message = messages.get(offset);
groupOffsets.put(groupId, offset + 1);
return message;
} finally {
lock.unlock();
}
}
public boolean hasMessages(String groupId) {
lock.lock();
try {
int offset = groupOffsets.getOrDefault(groupId, 0);
return offset < messages.size();
} finally {
lock.unlock();
}
}
public int getPartitionId() {
return partitionId;
}
}
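Before wiring this class into the TopicManager, here is a quick usage sketch (the topic name and group IDs are made up) showing how per-group offsets behave: two consumer groups read the same partition independently.

public class PartitionDemo {
    public static void main(String[] args) {
        Partition partition = new Partition("orders", 0);
        partition.addMessage("order-1");
        partition.addMessage("order-2");
        // group-A reads both messages in order...
        System.out.println(partition.consumeMessage("group-A")); // order-1
        System.out.println(partition.consumeMessage("group-A")); // order-2
        // ...while group-B starts from offset 0, unaffected by group-A.
        System.out.println(partition.consumeMessage("group-B")); // order-1
        System.out.println(partition.hasMessages("group-B"));    // true (order-2 is unread)
    }
}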
Adding Methods to the TopicManager Class
The TopicManager class is the core of our messaging system. It handles topics, partitions, and message distribution while ensuring that consumers receive messages efficiently and fairly.
We'll go through each method one by one, understanding what they do and why they are important.
Creating a Topic (createTopic method)
The first method is createTopic. It creates a new topic if it doesn't exist and initializes the default number of partitions.
public void createTopic(String topic) {
topicPartitions.putIfAbsent(topic, new ArrayList<>());
if (topicPartitions.get(topic).isEmpty()) {
List<Partition> partitions = new ArrayList<>();
for (int i = 0; i < defaultPartitionCount; i++) {
partitions.add(new Partition(topic, i));
}
topicPartitions.put(topic, partitions);
}
}
Adding Messages to a Topic (addMessage method)
The addMessage method adds a message to a topic. It hashes the key (key.hashCode() % defaultPartitionCount) so that the same key always maps to the same partition.
public void addMessage(String topic, String key, String message) {
createTopic(topic); // First, we ensure the topic exists by calling createTopic(topic).
List<Partition> partitions = topicPartitions.get(topic);
// Use a consistent hashing mechanism to ensure same keys go to same partitions
int partition = Math.abs(key.hashCode() % defaultPartitionCount);
partitions.get(partition).addMessage(message);
}
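As a quick illustration (the topic and keys are made up), messages sharing a key always land in the same partition, so their relative order is preserved:

public class KeyRoutingDemo {
    public static void main(String[] args) {
        TopicManager topicManager = new TopicManager(4);
        // All three messages share the key "user-42", so
        // Math.abs("user-42".hashCode() % 4) routes them to one partition,
        // preserving their relative order.
        topicManager.addMessage("orders", "user-42", "cart_created");
        topicManager.addMessage("orders", "user-42", "item_added");
        topicManager.addMessage("orders", "user-42", "checkout");
    }
}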
Consuming Messages from a Topic (consumeMessage method)
consumeMessage finds the partitions assigned to the given consumer and reads the next available message from one of them.
First, we look up the consumer's assigned partitions via getAssignedPartitions(topic, groupId, consumerId). We then acquire locks on those partitions so that no two consumers read the same message at the same time, and finally read the next available message from the assigned partitions.
public String consumeMessage(String topic, String groupId, String consumerId) {
if (!topicPartitions.containsKey(topic)) {
return null;
}
List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);
Map<Integer, ReentrantLock> partitionLocks = getConsumerPartitionLocks(topic, groupId, consumerId);
// Iterate over assigned partitions in order and block until message is read
for (Integer partitionId : assignedPartitions) {
ReentrantLock partitionLock = partitionLocks.get(partitionId);
partitionLock.lock(); // Ensure message order by blocking
try {
Partition partition = topicPartitions.get(topic).get(partitionId);
if (partition.hasMessages(groupId)) {
return partition.consumeMessage(groupId);
}
} finally {
partitionLock.unlock();
}
}
return null;
}
Getting Partition Locks (getConsumerPartitionLocks method)
getConsumerPartitionLocks returns the locks for the partitions assigned to a specific consumer. It ensures that only one consumer reads a partition at a time.
private Map<Integer, ReentrantLock> getConsumerPartitionLocks(String topic, String groupId, String consumerId) {
consumerPartitionLocks.putIfAbsent(topic, new ConcurrentHashMap<>());
Map<String, Map<Integer, ReentrantLock>> topicLocks = consumerPartitionLocks.get(topic);
String consumerKey = groupId + "-" + consumerId;
topicLocks.putIfAbsent(consumerKey, new ConcurrentHashMap<>());
Map<Integer, ReentrantLock> partitionLocks = topicLocks.get(consumerKey);
List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);
// Ensure we have locks for all assigned partitions
for (Integer partitionId : assignedPartitions) {
partitionLocks.putIfAbsent(partitionId, new ReentrantLock());
}
return partitionLocks;
}
Getting Partition Assignments (getAssignedPartitions method)
getAssignedPartitions returns which partitions a consumer is assigned to. If the consumer has no assignments yet, it triggers a rebalance.
public List<Integer> getAssignedPartitions(String topic, String groupId, String consumerId) {
consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
groupAssignments.putIfAbsent(consumerId, new ArrayList<>());
// Keep track of topic subscriptions for the group
topicSubscriptions.putIfAbsent(topic, new HashSet<>());
topicSubscriptions.get(topic).add(groupId);
// If empty or we explicitly need a rebalance
if (groupAssignments.get(consumerId).isEmpty()) {
rebalanceGroup(topic, groupId);
}
return groupAssignments.get(consumerId);
}
Subscribing to a Topic (subscribeToTopic method)
subscribeToTopic allows a consumer to join a topic. It ensures that partitions are rebalanced among all consumers in the group.
public void subscribeToTopic(String topic, String groupId, String consumerId) {
// Create topic if it doesn't exist
createTopic(topic);
// Register this group for the topic
topicSubscriptions.putIfAbsent(topic, new HashSet<>());
topicSubscriptions.get(topic).add(groupId);
// Ensure consumer is in the group
consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
consumerGroupAssignments.get(groupId).putIfAbsent(consumerId, new ArrayList<>());
// Force rebalance for this topic and group
rebalanceGroup(topic, groupId);
}
Rebalancing a Group (rebalanceGroup method)
When new consumers join, we need to redistribute partitions evenly. rebalanceGroup distributes partitions evenly among the consumers in a group, ensuring fair assignment as membership changes.
public void rebalanceGroup(String topic, String groupId) {
Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
List<String> consumers = new ArrayList<>(groupAssignments.keySet());
Collections.sort(consumers);
// Get actual partitions for the topic (not the default!)
int numPartitions = topicPartitions.get(topic).size();
// Clear existing assignments for this topic only
for (String consumer : consumers) {
// Remove only assignments for this topic
groupAssignments.put(consumer, new ArrayList<>());
}
// Assign partitions to consumers in round-robin
int consumerIndex = 0;
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
if (consumers.isEmpty()) break;
String consumer = consumers.get(consumerIndex % consumers.size());
groupAssignments.get(consumer).add(partitionId);
consumerIndex++;
}
System.out.println("REBALANCE for topic " + topic + " and group " + groupId);
}
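To see the round-robin assignment in action, consider a hypothetical group with two consumers and a three-partition topic. Consumers are sorted by ID, so the loop assigns partition 0 to c1, partition 1 to c2, and partition 2 back to c1:

public class RebalanceDemo {
    public static void main(String[] args) {
        TopicManager manager = new TopicManager(3);
        manager.createTopic("orders");
        manager.subscribeToTopic("orders", "group-A", "c1"); // c1 is assigned [0, 1, 2]
        manager.subscribeToTopic("orders", "group-A", "c2"); // after rebalance: c1 -> [0, 2], c2 -> [1]
    }
}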
Final TopicManager Class
Now, let's put everything together into the final version of our TopicManager class:
class TopicManager {
private final Map<String, List<Partition>> topicPartitions;
final Map<String, Map<String, List<Integer>>> consumerGroupAssignments;
private final Map<String, Set<String>> topicSubscriptions; // Track topic -> groups that are subscribed
private final Map<String, Map<String, Map<Integer, ReentrantLock>>> consumerPartitionLocks;
private final int defaultPartitionCount;
public TopicManager(int partitionCount) {
this.topicPartitions = new ConcurrentHashMap<>();
this.consumerGroupAssignments = new ConcurrentHashMap<>();
this.topicSubscriptions = new ConcurrentHashMap<>();
this.consumerPartitionLocks = new ConcurrentHashMap<>();
this.defaultPartitionCount = partitionCount;
}
public void createTopic(String topic) {
topicPartitions.putIfAbsent(topic, new ArrayList<>());
if (topicPartitions.get(topic).isEmpty()) {
List<Partition> partitions = new ArrayList<>();
for (int i = 0; i < defaultPartitionCount; i++) {
partitions.add(new Partition(topic, i));
}
topicPartitions.put(topic, partitions);
}
}
public String listTopics() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, List<Partition>> entry : topicPartitions.entrySet()) {
String topic = entry.getKey();
List<Partition> partitions = entry.getValue();
result.append(topic).append(" (Partitions: ").append(partitions.size()).append(") [");
for (Partition partition : partitions) {
result.append("P").append(partition.getPartitionId()).append(", ");
}
if (!partitions.isEmpty()) {
result.setLength(result.length() - 2);
}
result.append("]\n");
}
return result.toString().trim();
}
public void addMessage(String topic, String key, String message) {
createTopic(topic);
List<Partition> partitions = topicPartitions.get(topic);
// Use a consistent hashing mechanism to ensure same keys go to same partitions
int partition = Math.abs(key.hashCode() % defaultPartitionCount);
partitions.get(partition).addMessage(message);
}
public String consumeMessage(String topic, String groupId, String consumerId) {
if (!topicPartitions.containsKey(topic)) {
return null;
}
List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);
Map<Integer, ReentrantLock> partitionLocks = getConsumerPartitionLocks(topic, groupId, consumerId);
// Iterate over assigned partitions in order and block until message is read
for (Integer partitionId : assignedPartitions) {
ReentrantLock partitionLock = partitionLocks.get(partitionId);
partitionLock.lock(); // Ensure message order by blocking
try {
Partition partition = topicPartitions.get(topic).get(partitionId);
if (partition.hasMessages(groupId)) {
return partition.consumeMessage(groupId);
}
} finally {
partitionLock.unlock();
}
}
return null;
}
private Map<Integer, ReentrantLock> getConsumerPartitionLocks(String topic, String groupId, String consumerId) {
consumerPartitionLocks.putIfAbsent(topic, new ConcurrentHashMap<>());
Map<String, Map<Integer, ReentrantLock>> topicLocks = consumerPartitionLocks.get(topic);
String consumerKey = groupId + "-" + consumerId;
topicLocks.putIfAbsent(consumerKey, new ConcurrentHashMap<>());
Map<Integer, ReentrantLock> partitionLocks = topicLocks.get(consumerKey);
List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);
// Ensure we have locks for all assigned partitions
for (Integer partitionId : assignedPartitions) {
partitionLocks.putIfAbsent(partitionId, new ReentrantLock());
}
return partitionLocks;
}
public List<Integer> getAssignedPartitions(String topic, String groupId, String consumerId) {
consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
groupAssignments.putIfAbsent(consumerId, new ArrayList<>());
// Keep track of topic subscriptions for the group
topicSubscriptions.putIfAbsent(topic, new HashSet<>());
topicSubscriptions.get(topic).add(groupId);
// If empty or we explicitly need a rebalance
if (groupAssignments.get(consumerId).isEmpty()) {
rebalanceGroup(topic, groupId);
}
return groupAssignments.get(consumerId);
}
public void subscribeToTopic(String topic, String groupId, String consumerId) {
// Create topic if it doesn't exist
createTopic(topic);
// Register this group for the topic
topicSubscriptions.putIfAbsent(topic, new HashSet<>());
topicSubscriptions.get(topic).add(groupId);
// Ensure consumer is in the group
consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
consumerGroupAssignments.get(groupId).putIfAbsent(consumerId, new ArrayList<>());
// Force rebalance for this topic and group
rebalanceGroup(topic, groupId);
}
public void rebalanceGroup(String topic, String groupId) {
Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
List<String> consumers = new ArrayList<>(groupAssignments.keySet());
Collections.sort(consumers);
// Get actual partitions for the topic (not the default!)
int numPartitions = topicPartitions.get(topic).size();
// Clear existing assignments for this topic only
for (String consumer : consumers) {
// Remove only assignments for this topic
groupAssignments.put(consumer, new ArrayList<>());
}
// Assign partitions to consumers in round-robin
int consumerIndex = 0;
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
if (consumers.isEmpty()) break;
String consumer = consumers.get(consumerIndex % consumers.size());
groupAssignments.get(consumer).add(partitionId);
consumerIndex++;
}
// Debug output
System.out.println("REBALANCE for topic " + topic + " and group " + groupId);
for (String consumer : consumers) {
System.out.println(" Consumer " + consumer + " assigned: " + groupAssignments.get(consumer));
}
}
}
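Here is a small, self-contained smoke test for the class (the topic, group, and consumer names are illustrative). Note that a consumer may see null if the key hash happened to route no messages to its assigned partitions:

public class TopicManagerDemo {
    public static void main(String[] args) {
        TopicManager manager = new TopicManager(3);
        manager.subscribeToTopic("orders", "group-A", "c1");
        manager.subscribeToTopic("orders", "group-A", "c2");

        manager.addMessage("orders", "user-1", "m1");
        manager.addMessage("orders", "user-2", "m2");
        manager.addMessage("orders", "user-3", "m3");

        // Each consumer drains only the partitions assigned to it;
        // a read may return null if its partitions received no messages.
        System.out.println("c1 read: " + manager.consumeMessage("orders", "group-A", "c1"));
        System.out.println("c2 read: " + manager.consumeMessage("orders", "group-A", "c2"));
    }
}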
Now that our TopicManager and Partition classes are ready, we can move on to building the KafkaBroker, KafkaConsumer, and KafkaProducer classes. Before diving into their implementation, we first need to understand how the producer, broker, and consumer talk to each other.
Deep Dive into Kafka Producer, Broker, and Consumer Communication
Kafka follows a distributed messaging system where producers, brokers, and consumers communicate efficiently using the Kafka protocol over optimized TCP connections. Let’s break it down in more detail.
Producer-to-Broker Communication (Push Model with Optimizations)
The producer is responsible for sending messages to the broker. However, unlike a pure push model, Kafka producers follow an optimized batching mechanism to enhance performance. Here’s how it works:
- Persistent TCP Connection
  - A Kafka producer establishes a long-lived TCP connection with the Kafka broker to reduce the overhead of repeated connections.
  - The connection is maintained across multiple messages to optimize resource usage.
- Message Batching and Linger Time
  - Kafka does not send every message immediately; instead, it batches messages to optimize network throughput.
  - The linger.ms configuration controls how long the producer should wait before sending a batch.
  - If a batch fills up before linger.ms expires, it is sent immediately.
- Acknowledgments (acks)
  - Kafka producers can send messages in three different acknowledgment modes:
  - acks=0: Fire-and-forget (no broker acknowledgment, fastest but unreliable).
  - acks=1: The leader broker acknowledges receipt (default, moderate reliability).
  - acks=all: The leader and all in-sync replicas acknowledge (highest reliability, but slower).
- Partitioning and Routing
  - Producers decide which partition a message should go to based on partition keys or Kafka's internal partitioning strategy.
  - Once the partition is determined, the producer sends the message to the broker responsible for that partition.
- Error Handling and Retries
  - If a message fails due to a network issue or a broker being unavailable, the producer can retry sending it based on the retries configuration.
  - The retry.backoff.ms setting controls the time between retries.
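For reference, here is roughly how these settings appear in the real Apache Kafka Java client (this is the official client, not the class we build in this post; the broker address, topic, and values are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RealProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");             // leader + in-sync replicas must acknowledge
        props.put("linger.ms", "5");          // wait up to 5 ms to fill a batch
        props.put("retries", "3");            // retry transient send failures
        props.put("retry.backoff.ms", "100"); // pause between retries

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "user-42", "order_created"));
        }
    }
}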
Consumer-to-Broker Communication (Polling Model with Long Polling)
Kafka consumers do not receive messages automatically. Instead, they poll the broker at regular intervals to fetch available messages.
- Long-Lived TCP Connection
  - Just like producers, consumers maintain a persistent TCP connection with brokers to reduce connection overhead.
- Polling Mechanism (poll())
  - The consumer must call poll() periodically to fetch new messages.
  - If no new messages are available, the broker waits before responding (this is called long polling), reducing the number of empty responses.
- Long Polling (fetch.min.bytes & fetch.max.wait.ms)
  - Consumers avoid unnecessary requests by waiting for at least one message before returning data.
  - The fetch.min.bytes setting ensures the broker accumulates a certain amount of data before responding.
  - The fetch.max.wait.ms setting defines the maximum wait time before the broker responds, even if the data size is small.
- Consumer Groups and Partition Assignment
  - Consumers belong to consumer groups to balance the workload across multiple instances.
  - Each partition is assigned to only one consumer in the group at a time to prevent duplicate processing.
- Heartbeat and Rebalancing
  - Consumers send heartbeats (heartbeat.interval.ms) to let the broker know they are alive.
  - If a consumer fails to send a heartbeat within session.timeout.ms, it is removed from the group, and Kafka triggers a rebalance to redistribute partitions among active consumers.
- Commit Offset Mechanism
  - Consumers keep track of the last processed message offset in Kafka.
  - Offsets can be committed automatically (enable.auto.commit=true) or manually to prevent message loss.
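Again for reference, a minimal poll loop with these settings in the real Apache Kafka client might look like this (a sketch; the broker address, group, and topic are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RealConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-A");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("fetch.min.bytes", "1024");       // broker waits for at least 1 KB of data...
        props.put("fetch.max.wait.ms", "500");      // ...or 500 ms, whichever comes first
        props.put("heartbeat.interval.ms", "3000"); // heartbeat every 3 s
        props.put("session.timeout.ms", "10000");   // evicted after 10 s without heartbeats
        props.put("enable.auto.commit", "true");    // commit offsets automatically

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}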
Summary of Communication Flow
- Producers push messages to brokers over a persistent TCP connection, using batching, acknowledgments, and retries.
- Brokers store messages in partitions and manage leader-follower replication for fault tolerance.
- Consumers pull messages using a polling model with long polling, ensuring efficient data retrieval.
- Heartbeat mechanisms and rebalancing handle consumer failures dynamically.
With this understanding, we can now proceed to implement the KafkaBroker, KafkaConsumer, and KafkaProducer classes.
KafkaBroker Class (Main Server)
This class initializes the Kafka-like broker and manages client connections for producers and consumers. Upon execution, it starts listening for incoming TCP connections on port 9092. Additionally, it creates a global TopicManager instance (as defined in the previous section) with three default partitions. You can customize this default partition count based on your requirements.
public class KafkaBroker {
private static final int PORT = 9092;
private static final TopicManager topicManager = new TopicManager(3);
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(PORT);
System.out.println("Broker is running on port " + PORT);
while (true) {
try {
Socket clientSocket = serverSocket.accept();
System.out.println("New client connected: " + clientSocket);
new ClientHandler(clientSocket, topicManager).start();
} catch (IOException e) {
System.err.println("Error accepting client connection: " + e.getMessage());
}
}
}
}
Starting the Server:
ServerSocket serverSocket = new ServerSocket(PORT);
System.out.println("Broker is running on port " + PORT);
Creates a server socket → Listens for incoming connections on PORT 9092.
Handling Clients (Producers & Consumers)
while (true) {
try {
Socket clientSocket = serverSocket.accept();
System.out.println("New client connected: " + clientSocket);
new ClientHandler(clientSocket, topicManager).start();
} catch (IOException e) {
System.err.println("Error accepting client connection: " + e.getMessage());
}
}
serverSocket.accept() → Waits for a client (producer/consumer) to connect. Creates a new thread (ClientHandler) to handle that client.
Why use threads?
Each client needs independent processing, so we run a new ClientHandler thread for each client.
ClientHandler Class (Handles Each Client)
Each connected client (producer/consumer) is handled by this class.
This ClientHandler class is a thread that handles communication between a client and the broker server.
public class ClientHandler extends Thread {
private final Socket socket;
private final TopicManager topicManager;
public ClientHandler(Socket socket, TopicManager topicManager) {
this.socket = socket;
this.topicManager = topicManager;
}
public void run() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
out.println("CONNECTED: Welcome to KafkaLikeBroker");
String input;
String currentGroupId = null;
String currentConsumerId = null;
while ((input = in.readLine()) != null) {
if (input.startsWith("REGISTER:")) {
String[] parts = input.split(":", 3);
currentGroupId = parts[1];
currentConsumerId = parts[2];
// Add consumer to group and trigger rebalance
topicManager.consumerGroupAssignments
.computeIfAbsent(currentGroupId, k -> new ConcurrentHashMap<>())
.putIfAbsent(currentConsumerId, new ArrayList<>());
out.println("REGISTERED: Consumer " + currentConsumerId + " in group " + currentGroupId);
} else if (input.startsWith("SUBSCRIBE:")) {
String[] parts = input.split(":", 2);
if (parts.length < 2) {
out.println("ERROR: Invalid subscribe format. Use SUBSCRIBE:topic");
continue;
}
if (currentGroupId == null || currentConsumerId == null) {
out.println("ERROR: Consumer must be registered before subscribing");
continue;
}
String topic = parts[1];
// Handle subscription and trigger rebalance
topicManager.subscribeToTopic(topic, currentGroupId, currentConsumerId);
out.println("SUBSCRIBED: to topic " + topic + " with group " + currentGroupId);
} else if (input.startsWith("PRODUCE:")) {
String[] parts = input.split(":", 4);
if (parts.length < 4) {
out.println("ERROR: Invalid produce format. Use PRODUCE:topic:key:message");
continue;
}
String topic = parts[1];
String key = parts[2];
String message = parts[3];
topicManager.addMessage(topic, key, message);
out.println("ACK: Message stored in topic " + topic);
} else if (input.startsWith("CONSUME:")) {
String[] parts = input.split(":", 4);
if (parts.length < 4) {
out.println("ERROR: Invalid consume format. Use CONSUME:topic:groupId:consumerId");
continue;
}
String topic = parts[1];
String groupId = parts[2];
String consumerId = parts[3];
String message = topicManager.consumeMessage(topic, groupId, consumerId);
out.println("MESSAGE: " + (message != null ? message : "NO_MESSAGES"));
} else if (input.startsWith("DISCONNECT:")) {
String[] parts = input.split(":", 3);
if (parts.length >= 3) {
currentGroupId = parts[1];
currentConsumerId = parts[2];
}
out.println("DISCONNECTED: Goodbye!");
socket.close();
break;
} else if(input.startsWith("CREATE-TOPIC:")) {
String[] parts = input.split(":", 2);
String topicName = parts[1];
System.out.println("TOPIC NAME from createTopic: "+topicName);
topicManager.createTopic(topicName);
out.println("ACK: TOPIC CREATED");
} else if(input.startsWith("LIST-TOPIC:")) {
String topicMetaData = topicManager.listTopics();
if (topicMetaData.isEmpty()) {
out.println("TOPIC-METADATA: No topics available");
} else {
out.println("TOPIC-METADATA: " + topicMetaData);
}
} else {
out.println("ERROR: Unknown command");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Let's discuss each part of this ClientHandler class in detail:
Constructor
public ClientHandler(Socket socket, TopicManager topicManager) {
this.socket = socket;
this.topicManager = topicManager;
}
Whenever we create an object of this class, it stores the socket (the client connection) and the topicManager (which manages topics and messages).
Handling Client Requests
try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
- in (BufferedReader) → Reads incoming data from the client via the socket's input stream. It wraps an InputStreamReader, which converts byte streams into character streams, making it easier to handle text-based communication.
- out (PrintWriter) → Sends data to the client via the socket's output stream. The true parameter enables automatic flushing, ensuring that messages are immediately sent without requiring an explicit flush() call. This makes communication more efficient, especially in interactive applications.
Understanding Commands
In our Kafka implementation, the producer and consumer will communicate with the broker using a command-based protocol. Each client (producer or consumer) sends commands in a structured format:
COMMAND:argument1:argument2:...
- COMMAND → Specifies the action to be performed, such as PRODUCE, SUBSCRIBE, or CONSUME.
- argument1, argument2, ... → Provide additional details required for the command, such as the topic name, message payload, or consumer group ID.
This format ensures a standardized and efficient way for clients to interact with the broker, making message transmission and processing straightforward.
For each command, the TopicManager handles the underlying operations behind the scenes. It processes the command, manages topics, and ensures proper message flow between producers, consumers, and the broker.
- When a producer sends a message, TopicManager routes it to the appropriate topic.
- When a consumer subscribes or fetches messages, TopicManager retrieves the relevant data efficiently.
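For example, a session against our broker might look like this (broker replies indented; the topic and IDs are made up, but the responses match the handler code below):

REGISTER:group-A:consumer-1
  REGISTERED: Consumer consumer-1 in group group-A
SUBSCRIBE:orders
  SUBSCRIBED: to topic orders with group group-A
PRODUCE:orders:user-42:order_created
  ACK: Message stored in topic orders
CONSUME:orders:group-A:consumer-1
  MESSAGE: order_created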
Register Consumer
if (input.startsWith("REGISTER:")) {
String[] parts = input.split(":", 3);
currentGroupId = parts[1];
currentConsumerId = parts[2];
topicManager.consumerGroupAssignments
.computeIfAbsent(currentGroupId, k -> new ConcurrentHashMap<>())
.putIfAbsent(currentConsumerId, new ArrayList<>());
out.println("REGISTERED: Consumer " + currentConsumerId + " in group " + currentGroupId);
}
A consumer registers itself with a consumer group using the REGISTER:<groupId>:<consumerId> command.
The broker stores this consumer inside consumerGroupAssignments.
Subscribe to a Topic
A consumer subscribes to a topic using the SUBSCRIBE:<topic> command.
The broker associates this consumer with the topic.
if (input.startsWith("SUBSCRIBE:")) {
String[] parts = input.split(":", 2);
if (currentGroupId == null || currentConsumerId == null) {
out.println("ERROR: Consumer must be registered before subscribing");
continue;
}
String topic = parts[1];
topicManager.subscribeToTopic(topic, currentGroupId, currentConsumerId);
out.println("SUBSCRIBED: to topic " + topic + " with group " + currentGroupId);
}
Produce a Message
A producer sends a message to a topic using the PRODUCE:<topic>:<key>:<message> command.
if (input.startsWith("PRODUCE:")) {
String[] parts = input.split(":", 4);
String topic = parts[1];
String key = parts[2];
String message = parts[3];
topicManager.addMessage(topic, key, message);
out.println("ACK: Message stored in topic " + topic);
}
Consume a Message
A consumer requests a message from a topic using the CONSUME:<topic>:<groupId>:<consumerId> command.
if (input.startsWith("CONSUME:")) {
String[] parts = input.split(":", 4);
String topic = parts[1];
String groupId = parts[2];
String consumerId = parts[3];
String message = topicManager.consumeMessage(topic, groupId, consumerId);
out.println("MESSAGE: " + (message != null ? message : "NO_MESSAGES"));
}
Create a Topic
if (input.startsWith("CREATE-TOPIC:")) {
String[] parts = input.split(":", 2);
String topicName = parts[1];
topicManager.createTopic(topicName);
out.println("ACK: TOPIC CREATED");
}
KafkaConsumer Class
The KafkaConsumer is responsible for:
- Connecting to the broker.
- Subscribing to a topic.
- Polling messages from the broker.
- Handling connection and disconnection.
Our consumer class mainly sends the REGISTER, SUBSCRIBE, CONSUME, and DISCONNECT commands to the broker.
Fields (Instance Variables)
private Socket socket;
private PrintWriter out;
private BufferedReader in;
private final String consumerId;
private final String groupId;
private String subscribedTopic;
private static final int DEFAULT_POLL_TIMEOUT = 1000;
Explanation of Each Variable:
- socket → Represents the TCP connection between the consumer and the broker, enabling communication over the network.
- out → A PrintWriter used for sending commands and messages from the consumer to the broker.
- in → A BufferedReader used to receive and process responses from the broker.
- consumerId → A unique identifier assigned to each consumer, typically generated using a UUID, ensuring distinct identification within the system.
- groupId → Specifies the consumer group to which this consumer belongs, allowing multiple consumers to coordinate message consumption.
- subscribedTopic → Stores the name of the topic that the consumer is subscribed to, determining which messages it will receive.
- DEFAULT_POLL_TIMEOUT → Defines the default time (in milliseconds) a consumer waits while polling for new messages from the broker before timing out.
Constructor
public KafkaConsumer(String groupId) {
this.consumerId = UUID.randomUUID().toString();
this.groupId = groupId;
}
When creating an instance of KafkaConsumer, we generate a unique consumer ID (a UUID) and pass a groupId, which is used to support consumer groups.
Connecting to broker
public void connect(String host, int port) {
try {
socket = new Socket(host, port);
out = new PrintWriter(socket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// Send consumer registration
out.println("REGISTER:" + groupId + ":" + consumerId);
String response = in.readLine();
System.out.println("Broker Response: " + response);
} catch (IOException e) {
e.printStackTrace();
}
}
The connect method establishes a TCP connection with the broker and sends a registration request using the REGISTER:<groupId>:<consumerId> command.
Subscribing to a Topic
public void subscribe(String topic) {
if (socket == null || out == null) {
System.out.println("ERROR: Consumer is not connected to broker!");
return;
}
out.println("SUBSCRIBE:" + topic);
try {
String response = in.readLine();
System.out.println("Subscription Response: " + response);
} catch (IOException e) {
System.out.println("ERROR: Failed to subscribe: " + e.getMessage());
return;
}
this.subscribedTopic = topic;
}
This method sends a SUBSCRIBE request (SUBSCRIBE:<topic>). The broker responds to confirm the subscription.
Polling a message
public List<String> poll(long timeoutMs) {
if (subscribedTopic == null) {
throw new IllegalStateException("No topic subscribed. Please subscribe to a topic first.");
}
if (socket == null || out == null) {
throw new IllegalStateException("Consumer is not connected to broker!");
}
List<String> records = new ArrayList<>();
long startTime = System.currentTimeMillis();
try {
while (System.currentTimeMillis() - startTime < timeoutMs) { // Keep polling the broker until timeoutMs elapses
out.println("CONSUME:" + subscribedTopic + ":" + groupId + ":" + consumerId);
String response = in.readLine();
if (response != null && !response.contains("NO_MESSAGES")) {
String message = response.replace("MESSAGE: ", "");
records.add(message);
} else {
Thread.sleep(100);
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return records;
}
The poll method is a message consumption mechanism that fetches messages from a broker. It continuously requests messages from a subscribed topic within a given timeout period. If no messages are available, it waits briefly (100ms) and retries until the timeout expires.
Disconnecting from the broker
The disconnect method sends a DISCONNECT message to the broker and closes the socket.
public void disconnect() {
try {
if (out != null) {
out.println("DISCONNECT:" + groupId + ":" + consumerId);
}
if (socket != null) socket.close();
System.out.println("Consumer disconnected from broker.");
} catch (IOException e) {
e.printStackTrace();
}
}
Here is the full implementation of the KafkaConsumer class:
public class KafkaConsumer {
private Socket socket;
private PrintWriter out;
private BufferedReader in;
private final String consumerId;
private final String groupId;
private String subscribedTopic;
private static final int DEFAULT_POLL_TIMEOUT = 1000; // milliseconds
public KafkaConsumer(String groupId) {
this.consumerId = UUID.randomUUID().toString();
this.groupId = groupId;
}
public void connect(String host, int port) {
try {
socket = new Socket(host, port);
out = new PrintWriter(socket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// Send consumer registration
out.println("REGISTER:" + groupId + ":" + consumerId);
String response = in.readLine();
System.out.println("Broker Response: " + response);
} catch (IOException e) {
e.printStackTrace();
}
}
public void subscribe(String topic) {
if (socket == null || out == null) {
System.out.println("ERROR: Consumer is not connected to broker!");
return;
}
// Send SUBSCRIBE command to broker - this is the key change
out.println("SUBSCRIBE:" + topic);
try {
String response = in.readLine();
System.out.println("Subscription Response: " + response);
} catch (IOException e) {
System.out.println("ERROR: Failed to subscribe: " + e.getMessage());
return;
}
this.subscribedTopic = topic;
}
public List<String> poll() {
return poll(DEFAULT_POLL_TIMEOUT);
}
public List<String> poll(long timeoutMs) {
if (subscribedTopic == null) {
throw new IllegalStateException("No topic subscribed. Please subscribe to a topic first.");
}
if (socket == null || out == null) {
throw new IllegalStateException("Consumer is not connected to broker!");
}
List<String> records = new ArrayList<>();
long startTime = System.currentTimeMillis();
try {
while (System.currentTimeMillis() - startTime < timeoutMs) {
out.println("CONSUME:" + subscribedTopic + ":" + groupId + ":" + consumerId);
String response = in.readLine();
if (response != null && !response.contains("NO_MESSAGES")) {
String message = response.replace("MESSAGE: ", "");
records.add(message);
} else {
// If no messages, wait a bit before trying again
Thread.sleep(100);
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
return records;
}
public void disconnect() {
try {
if (out != null) {
out.println("DISCONNECT:" + groupId + ":" + consumerId);
}
if (socket != null) socket.close();
System.out.println("Consumer disconnected from broker.");
} catch (IOException e) {
e.printStackTrace();
}
}
// Getter for consumerId
public String getConsumerId() {
return consumerId;
}
}
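A minimal usage sketch for this class (the host, port, and topic are placeholders; the broker from the previous section must already be running):

import java.util.List;

public class ConsumerDemo {
    public static void main(String[] args) {
        KafkaConsumer consumer = new KafkaConsumer("group-A");
        consumer.connect("localhost", 9092); // sends REGISTER:group-A:<uuid>
        consumer.subscribe("orders");        // sends SUBSCRIBE:orders, triggering a rebalance

        // Poll for up to 2 seconds, collecting whatever messages arrive.
        List<String> messages = consumer.poll(2000);
        messages.forEach(System.out::println);

        consumer.disconnect();
    }
}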
KafkaProducer Class
The KafkaProducer is responsible for:
- Connecting to the broker.
- Sending messages to a topic.
- Creating topics.
- Listing available topics.
- Handling connection retries.
Our producer class mainly sends the PRODUCE, CREATE-TOPIC, and DISCONNECT commands to the broker.
Fields (Instance Variables) of KafkaProducer
private static final String HOST = "localhost";
private static final int PORT = 9092;
private static final int DEFAULT_RETRIES = 3;
private static final long RETRY_BACKOFF_MS = 1000;
private final Properties properties;
private Socket socket;
private PrintWriter out;
private BufferedReader in;
private boolean connected = false;
Explanation of Each Variable:
- HOST & PORT: Defaults to localhost:9092.
- DEFAULT_RETRIES: Max retries for connection.
- RETRY_BACKOFF_MS: Delay before retrying.
- properties: Stores configuration settings.
- connected: Tracks connection status.
Connecting to the Broker
The connect method connects the producer to the broker. If the connection fails, it retries up to 3 times (DEFAULT_RETRIES), backing off between attempts before giving up.
public void connect() throws IOException {
    if (connected) return;
    int retries = 0;
    while (retries < DEFAULT_RETRIES) {
        try {
            socket = new Socket(HOST, PORT);
            out = new PrintWriter(socket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String welcome = in.readLine();
            if (welcome != null && welcome.startsWith("CONNECTED")) {
                connected = true;
                System.out.println("Connected to KafkaLikeBroker");
                return;
            }
        } catch (IOException e) {
            retries++;
            if (retries >= DEFAULT_RETRIES) {
                throw new IOException("Failed to connect after " + DEFAULT_RETRIES + " attempts", e);
            }
            try {
                Thread.sleep(RETRY_BACKOFF_MS); // back off before the next attempt
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Connection interrupted", ie);
            }
        }
    }
}
Sending a message
This method sends a PRODUCE request (PRODUCE:<topic>:<key>:<message>). The key is required because it drives the partitioning strategy (it determines which partition the message goes to).
public boolean send(String topic, String key, String message) throws IOException {
if (!connected) throw new IllegalStateException("Producer is not connected to broker");
if (topic == null || topic.trim().isEmpty()) throw new IllegalArgumentException("Topic cannot be null or empty");
if (key == null || key.trim().isEmpty()) throw new IllegalArgumentException("Key cannot be null or empty");
try {
out.println("PRODUCE:" + topic + ":" + key + ":" + message);
String response = in.readLine();
return response != null && response.startsWith("ACK");
} catch (IOException e) {
reconnect();
return send(topic, key, message);
}
}
Creating a Topic
Sends a CREATE-TOPIC request (CREATE-TOPIC:<topicName>).
public boolean createTopic(String topicName) throws IOException {
out.println("CREATE-TOPIC:" + topicName);
String response = in.readLine();
return response != null && response.startsWith("ACK");
}
disconnect method
public void disconnect() {
    out.println("DISCONNECT:"); // trailing colon so the broker's "DISCONNECT:" handler matches
    connected = false;
}
Here is the full implementation of the KafkaProducer class:
public class KafkaProducer {
private static final String HOST = "localhost";
private static final int PORT = 9092;
private static final int DEFAULT_RETRIES = 3;
private static final long RETRY_BACKOFF_MS = 1000;
private final Properties properties;
private Socket socket;
private PrintWriter out;
private BufferedReader in;
private boolean connected = false;
public KafkaProducer() {
this(new Properties());
}
public KafkaProducer(Properties properties) {
this.properties = properties;
}
public void connect() throws IOException {
if (connected) {
return;
}
int retries = 0;
while (retries < DEFAULT_RETRIES) {
try {
socket = new Socket(HOST, PORT);
out = new PrintWriter(socket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String welcome = in.readLine();
if (welcome != null && welcome.startsWith("CONNECTED")) {
connected = true;
System.out.println("Connected to KafkaLikeBroker");
System.out.println(welcome);
return;
}
} catch (IOException e) {
retries++;
if (retries >= DEFAULT_RETRIES) {
throw new IOException("Failed to connect after " + DEFAULT_RETRIES + " attempts", e);
}
try {
Thread.sleep(RETRY_BACKOFF_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Connection interrupted", ie);
}
}
}
}
public boolean send(String topic, String key, String message) throws IOException {
if (!connected) {
throw new IllegalStateException("Producer is not connected to broker");
}
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic cannot be null or empty");
}
if (key == null || key.trim().isEmpty()) {
throw new IllegalArgumentException("Key cannot be null or empty");
}
try {
// Send the message with key
out.println("PRODUCE:" + topic + ":" + key + ":" + message);
// Wait for acknowledgment
String response = in.readLine();
if (response != null && response.startsWith("ACK")) {
System.out.println("Message sent successfully: " + response);
return true;
} else {
System.err.println("Failed to send message: " + response);
return false;
}
} catch (IOException e) {
System.err.println("Error sending message: " + e.getMessage());
reconnect();
out.println("PRODUCE:" + topic + ":" + key + ":" + message);
String response = in.readLine();
return response != null && response.startsWith("ACK");
}
}
public boolean createTopic(String topicName) throws IOException {
if (!connected) {
throw new IllegalStateException("Producer is not connected to broker");
}
if (topicName == null || topicName.trim().isEmpty()) {
throw new IllegalArgumentException("Topic cannot be null or empty");
}
try {
// Send the message with key
out.println("CREATE-TOPIC:" + topicName);
// Wait for acknowledgment
String response = in.readLine();
if (response != null && response.startsWith("ACK")) {
System.out.println("TOPIC CREATED: " + response);
return true;
} else {
System.err.println("Failed to CREATE: " + response);
return false;
}
} catch (IOException e) {
System.err.println("Error sending message: " + e.getMessage());
reconnect();
out.println("CREATE-TOPIC:" + topicName);
String response = in.readLine();
return response != null && response.startsWith("ACK");
}
}
public void listTopics() throws IOException {
if (!connected) {
throw new IllegalStateException("Producer is not connected to broker");
}
out.println("LIST-TOPIC:");
String response = in.readLine();
if (response != null && response.startsWith("TOPIC-METADATA:")) {
System.out.println("Available Topics: " + response.substring(15));
} else {
System.err.println("Failed to list topics: " + response);
}
}
private void reconnect() throws IOException {
disconnect();
connect();
}
public void disconnect() {
if (!connected) {
return;
}
try {
if (out != null) {
out.println("DISCONNECT");
String response = in.readLine();
System.out.println(response);
}
} catch (IOException e) {
System.err.println("Error during disconnect: " + e.getMessage());
} finally {
connected = false;
try {
if (socket != null) socket.close();
if (out != null) out.close();
if (in != null) in.close();
} catch (IOException e) {
System.err.println("Error closing resources: " + e.getMessage());
}
System.out.println("Disconnected from broker.");
}
}
}
We have now implemented all the components of our Kafka-like system. We can test the full working implementation by starting the broker, then running the consumer and producer classes.
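A sketch of such an end-to-end test, using the classes from this post (start KafkaBroker in a separate terminal first; the topic and key names are made up):

import java.io.IOException;
import java.util.List;

public class EndToEndDemo {
    public static void main(String[] args) throws IOException {
        // Produce a few keyed messages.
        KafkaProducer producer = new KafkaProducer();
        producer.connect();
        producer.createTopic("orders");
        producer.send("orders", "user-42", "order_created");
        producer.send("orders", "user-42", "order_paid");
        producer.disconnect();

        // Consume them back from the same topic.
        KafkaConsumer consumer = new KafkaConsumer("group-A");
        consumer.connect("localhost", 9092);
        consumer.subscribe("orders");
        List<String> messages = consumer.poll(2000);
        messages.forEach(System.out::println);
        consumer.disconnect();
    }
}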
Summary
In this blog, we explored the internal workings of Kafka and how it operates behind the scenes. We discussed the roles of key components such as the broker, consumer, and producer, and explained how they interact to enable efficient message streaming.
Additionally, we examined the concepts of topics and partitions, detailing how messages are stored and managed within Kafka. To make our understanding practical, we also built a simplified Kafka-like application, demonstrating the core principles in action.
However, due to space constraints, we were unable to cover important topics like replication and permanent storage in this blog. These are critical aspects of Kafka that ensure data durability, fault tolerance, and high availability. We will cover these topics in detail in a future blog post. Stay tuned!
You can get the full working code on GitHub