Learn Kafka by rebuilding it from scratch

43 min read
#system-design

Kafka is a distributed event streaming platform that powers real-time data pipelines and applications. While many use Kafka for its scalability and fault tolerance, understanding its internals can be challenging. The best way to truly grasp Kafka’s architecture is to build a simplified version of it.

In this blog, I will do my best to break down Kafka’s internals by constructing a simple implementation of it. This will give you hands-on experience with fundamental concepts like brokers, topics, partitions, producers, consumers, and consumer groups. I will use Java for coding, and I will also provide a Node.js version of the implementation.

Before diving into building the internals of Kafka, let's first understand how Kafka works.

Understanding Kafka’s Core Components

Kafka consists of three main components:

  1. Brokers – Servers that store and manage messages.
  2. Producers – Send messages to Kafka topics.
  3. Consumers – Read messages from Kafka topics.

Kafka Brokers

Kafka brokers are servers that manage message storage and retrieval. They play a crucial role in ensuring data availability and reliability.

Here's how data flows in a broker:

  • Message Reception – Brokers receive messages from producers.
  • Partition Management – Messages are distributed across partitions within topics.
  • Data Storage – Messages are persisted to disk for a configured retention period.
  • Consumer Coordination – Brokers track which messages have been read by consumers.
  • Load Balancing – Distribute workload across multiple brokers for scalability.

A Kafka cluster typically consists of multiple brokers working together to handle high volumes of data efficiently.

Topics and Partitions

What is a Topic?

Unlike traditional messaging systems that use a point-to-point model, where each message is consumed by only one receiver, Kafka follows a publish-subscribe (pub-sub) model, where multiple consumers can read from the same topic independently.

A topic is a logical grouping of messages that belong to a particular category. Producers publish messages to a topic, and consumers subscribe to a topic to retrieve relevant data.

What are Partitions?

Each topic is divided into partitions, allowing Kafka to:

  • Scale horizontally – Messages are distributed across multiple partitions, enabling parallel processing.
  • Ensure fault tolerance – If the broker hosting a partition fails, a replica on another broker can take over.
  • Maintain order – Messages within a partition maintain strict ordering.

Each partition acts as a separate, ordered log file stored on a broker. Every message within a partition is assigned a unique, incremental offset.
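
To make offsets concrete, here is an illustrative view of a single partition's log (the topic name and messages are hypothetical):

Partition 0 of topic "orders"
-----------------------------
offset 0 | {"orderId": 101, "status": "created"}
offset 1 | {"orderId": 102, "status": "created"}
offset 2 | {"orderId": 101, "status": "shipped"}
(the next message is appended at offset 3)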

When producers send messages, Kafka automatically assigns them to partitions based on the partitioning strategy.

Producers and Consumers

Producers (Message Senders)

Producers are responsible for creating and sending messages to Kafka topics. Kafka uses different partitioning strategies to decide how messages are distributed across partitions:

  1. Key-based Partitioning (if a key is provided)

    • The producer assigns a key to each message.
    • Kafka uses a hash function (hash(key) % num_partitions) to determine which partition the message should go to (see the sketch after this list).
    • Ensures that messages with the same key always go to the same partition, maintaining order for that key.
    • Example: In an e-commerce system, messages for a specific user ID are always routed to the same partition.
  2. Round-Robin Partitioning (Default if no key is provided)

    • Messages are distributed evenly across all partitions.
    • Ensures load balancing but does not guarantee order across partitions.
    • Example: If logs are being sent without a key, they are spread across all partitions for better throughput.
  3. Custom Partitioning (User-defined logic)

    • Developers can implement a custom partitioning strategy using a Partitioner class.
    • Useful for specific use cases, such as geographical routing or priority-based messaging.
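
To make strategy 1 (key-based partitioning) concrete, here is a minimal Java sketch of how a producer might pick a partition from a key. This is an illustration only: the real Kafka default partitioner hashes keys with murmur2 and uses a "sticky" strategy for keyless messages, while this sketch uses hashCode() and a random pick.

import java.util.concurrent.ThreadLocalRandom;

class PartitionChooser {
    // Same key -> same partition, which preserves per-key ordering.
    static int choosePartition(String key, int numPartitions) {
        if (key == null) {
            // Keyless message: spread across partitions (real Kafka round-robins/sticky-batches).
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // The modulo keeps the result in (-n, n); Math.abs maps it into [0, n).
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        // "user-42" maps to the same partition on every call, so that user's events stay ordered.
        System.out.println(choosePartition("user-42", 4));
        System.out.println(choosePartition("user-42", 4)); // identical output
    }
}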

To optimize performance, messages are batched before sending.

Consumers (Message Receivers)

Consumers subscribe to topics and retrieve messages from partitions. Each consumer tracks its position using an offset, which is a unique identifier for each message.

Kafka supports consumer groups, where multiple consumers work together to share the workload of processing messages from a topic. This allows for scalability, fault tolerance, and parallel processing.

How Consumer Groups Work

  • Each consumer group is identified by a group ID.
  • Within a group, each consumer is assigned a subset of partitions from the topic.
  • No two consumers in the same group read the same partition—each partition is assigned exclusively to one consumer.
  • If the number of consumers exceeds the number of partitions, some consumers will remain idle (since each partition can only be assigned to one consumer at a time).
  • If a consumer fails or disconnects, Kafka automatically reassigns its partitions to the remaining consumers. This is called rebalancing.

Consider a system with three consumers in a group:

| Consumer | Assigned Partition |
| --- | --- |
| Consumer 1 | Reads from Partition 0 |
| Consumer 2 | Reads from Partition 1 |
| Consumer 3 | Reads from Partition 2 |

If Consumer 2 crashes, Kafka redistributes Partition 1 among the remaining consumers:

| Consumer | Assigned Partitions After Failure |
| --- | --- |
| Consumer 1 | Reads from Partitions 0 and 1 |
| Consumer 3 | Reads from Partition 2 |

When Consumer 2 comes back online, Kafka rebalances again and may assign Partition 1 back to it.

Kafka consumers always belong to a consumer group, which determines how messages are read from partitions.

Consider a scenario where we have three consumers in the same consumer group: one topic with three partitions and three consumers, all in group-A.

| Consumer Group | Consumer | Assigned Partition |
| --- | --- | --- |
| group-A | Consumer 1 | Reads from Partition 0 |
| group-A | Consumer 2 | Reads from Partition 1 |
| group-A | Consumer 3 | Reads from Partition 2 |

In this case:

  • Each partition is assigned to one consumer in the group.
  • Consumers in the same group divide the work; each message is processed by only one consumer in the group.
  • If a consumer fails, Kafka reassigns its partition to another active consumer in the same group.
  • Messages are not duplicated—each message is consumed only once per group.

In this case, data is evenly distributed among multiple consumers to scale processing.

Now consider a different case where the three consumers belong to different consumer groups. Scenario: we have one topic with three partitions and three consumers, but each belongs to a different consumer group (group-A, group-B, group-C).

| Consumer Group | Consumer | Assigned Partitions |
| --- | --- | --- |
| group-A | Consumer 1 | Reads all partitions (0, 1, 2) |
| group-B | Consumer 2 | Reads all partitions (0, 1, 2) |
| group-C | Consumer 3 | Reads all partitions (0, 1, 2) |

In this case:

  • Since each consumer belongs to a different group, each group gets a full copy of all messages.
  • Consumers in different groups do not compete for messages; they each process all messages independently.
  • This allows multiple applications to consume the same Kafka topic without interfering with each other.

Key Takeaways About Consumer Groups

| Scenario | Partition Sharing | Message Duplication |
| --- | --- | --- |
| Same consumer group | Consumers share partitions (each message processed by only one consumer) | ❌ No duplication |
| Different consumer groups | Each group reads all partitions (each message is processed by every group) | ✅ Messages are duplicated across groups |

How Kafka Works

Kafka follows a Producer → Broker → Consumer pipeline.

  1. Producer Sends Messages

    • The producer connects to a Kafka broker.
    • Messages are published to a topic.
    • Kafka assigns messages to partitions.
    • Messages are sent in batches to improve efficiency.
  2. Broker Stores Messages

    • Kafka brokers store messages persistently.
    • Each message gets an incremental offset.
    • Data is replicated across multiple brokers for fault tolerance.
    • Brokers retain messages for a configurable period (default: 7 days).
  3. Consumer Reads Messages

    • Consumers subscribe to topics and request data from brokers.
    • Kafka keeps track of the consumer's offset (position in the log).
    • Consumers can auto-commit or manually commit offsets to track progress.
    • If a consumer crashes, Kafka rebalances partitions to active consumers.

How Kafka Handles Message Ordering

Kafka guarantees message order within a partition, but not across partitions.

Partition-Level Ordering

  • Each partition maintains a strict order.
  • Messages in a partition are appended sequentially.
  • Consumers read messages in the same order they were produced.

Global Ordering Across Partitions

  • If a topic has multiple partitions, Kafka does not guarantee global ordering.
  • Messages from different partitions may be processed in parallel, leading to out-of-order delivery.

Key-Based Partitioning

  • Messages with the same key are always sent to the same partition.
  • Ensures messages related to a specific key are processed in order.

Consumer Group Behavior

  • A consumer group distributes partitions among multiple consumers.
  • Each partition is assigned to only one consumer at a time.
  • Messages within a partition are read sequentially, but messages across partitions may be out of order.

Rebalancing Impact

  • When a new consumer joins or leaves a group, Kafka reassigns partitions.
  • During rebalancing, there may be a short delay in message processing.
  • However, partition-level order is preserved.

Replication and Ordering

  • Kafka follows a leader-follower replication model.
    • Producers write messages to the leader partition.
    • Followers replicate the data for fault tolerance.
  • If a leader broker fails, a follower is promoted as the new leader.
  • A short delay may occur during leader election, but message order remains intact.

Now, let's summarize everything we have covered so far.

| Component | Description |
| --- | --- |
| Broker | Stores and manages messages in partitions. |
| Producer | Sends messages to Kafka topics. |
| Consumer | Reads messages from topics. |
| Partition | A segment of a topic that enables parallel processing. |
| Offset | A unique identifier for each message within a partition. |
| Consumer Group | A set of consumers that share the workload of processing messages from a topic. |

Building TopicManager and Partition Classes

Now that we have a solid understanding of how Kafka works and the flow of data from a producer to a consumer through topics and partitions, it's time to translate this knowledge into code.

In this section, we will design and implement two fundamental classes:

  1. TopicManager – This class will be responsible for managing topics, partitions, producers, and consumers.
  2. Partition – This class will represent an individual partition, where messages are stored and retrieved.

Why Do We Need a TopicManager?

In Kafka, a topic is a logical grouping of messages, while partitions allow for parallel processing and scalability. Our TopicManager class will act as a central hub, handling:

  • Creating and managing topics
  • Distributing messages across partitions
  • Registering producers and consumers
  • Coordinating message retrieval for consumers

By building this class, we are essentially simulating Kafka's topic and partition management system, giving us deeper insight into how Kafka efficiently handles data flow.

The Role of the Partition Class

Each topic in Kafka is divided into multiple partitions to achieve fault tolerance and parallel processing. The Partition class will:

  • Store messages in an ordered sequence
  • Maintain an offset to track message positions
  • Support reading and writing operations

In the next steps, we will implement these classes step by step, ensuring that our design mimics how Kafka works under the hood.

Defining Variables for TopicManager

Before implementing the logic, let's first define the key data structures that will help us manage topics and partitions efficiently:

private final Map<String, List<Partition>> topicPartitions;
final Map<String, Map<String, List<Integer>>> consumerGroupAssignments;
private final Map<String, Set<String>> topicSubscriptions;
private final Map<String, Map<String, Map<Integer, ReentrantLock>>> consumerPartitionLocks;
private final int defaultPartitionCount;

Understanding Each Variable

  1. topicPartitions (Map<String, List<Partition>>)

    • Stores all topics and their corresponding partitions.
    • Helps us quickly retrieve partitions when producers send messages or consumers read them.
  2. consumerGroupAssignments (Map<String, Map<String, List<Integer>>>)

    • Key: Consumer Group ID → Value: Map of Consumer ID → Assigned Partition IDs
    • Ensures that no two consumers within the same group read the same partition.
  3. topicSubscriptions (Map<String, Set<String>>)

    • Tracks which consumer groups are subscribed to which topics.
    • Helps during consumer rebalancing when new consumers join.
  4. consumerPartitionLocks (Map<String, Map<String, Map<Integer, ReentrantLock>>>)

    • Prevents multiple consumers from reading the same partition at the same time.
    • Ensures message order and prevents race conditions.
  5. defaultPartitionCount (int)

    • Defines how many partitions a topic will have by default.
    • Helps distribute messages evenly across partitions.

Implementing the Constructor

Now that we've defined our variables, let's initialize them inside our constructor. We'll use ConcurrentHashMap to ensure thread safety in a multi-threaded environment (just like Kafka does).

class TopicManager {
    private final Map<String, List<Partition>> topicPartitions;
    final Map<String, Map<String, List<Integer>>> consumerGroupAssignments;
    private final Map<String, Set<String>> topicSubscriptions;
    private final Map<String, Map<String, Map<Integer, ReentrantLock>>> consumerPartitionLocks;
    private final int defaultPartitionCount;

    public TopicManager(int partitionCount) {
        this.topicPartitions = new ConcurrentHashMap<>();
        this.consumerGroupAssignments = new ConcurrentHashMap<>();
        this.topicSubscriptions = new ConcurrentHashMap<>();
        this.consumerPartitionLocks = new ConcurrentHashMap<>();
        this.defaultPartitionCount = partitionCount;
    }
}

We initialize each data structure with ConcurrentHashMap for thread safety. The defaultPartitionCount is set via the constructor, allowing us to configure the number of partitions per topic. Now that we've built the TopicManager, let's create an instance of it and set up our default partitions:

TopicManager topicManager = new TopicManager(4); // Every new topic will get 4 partitions

Every topic we create will have 4 partitions unless specified otherwise. A higher partition count allows for better parallelism when multiple consumers process data. However, having too many partitions increases overhead, so we need to find a balance.

Defining Variables for the Partition Class

In Kafka, a partition is a fundamental unit of storage within a topic. It ensures that messages are stored in an ordered sequence and allows parallel processing by multiple consumers.

Now, let's build the Partition class, which will be responsible for:

  • Storing messages in order
  • Tracking consumer offsets for efficient retrieval
  • Ensuring thread safety when multiple producers and consumers interact

Before writing the constructor, let's first define the core variables needed to manage a partition effectively:

private final String topic;
private final int partitionId;
private final List<String> messages;
private final Map<String, Integer> groupOffsets;
private final ReentrantLock lock;

Understanding Each Variable of the Partition Class

  1. topic (String)
    • Stores the name of the topic to which this partition belongs.
    • Helps in identifying which topic this partition is associated with.
  2. partitionId (int)
    • Represents the partition ID within a topic.
    • Each topic consists of multiple partitions, and each partition has a unique ID for easy identification.
  3. messages (List<String>)
    • Stores all messages added to this partition.
    • Messages are added in sequential order, ensuring they can be retrieved in the same order.
  4. groupOffsets ( Map<String, Integer>)
    • Key: Consumer Group ID → Value: Last Read Offset
    • Tracks where each consumer group left off, ensuring that they start reading from the correct position.
  5. lock (ReentrantLock)
    • Ensures thread safety when multiple producers or consumers interact with this partition.
    • Prevents race conditions when adding or consuming messages.

Implementing the Constructor

Now that we've defined the variables, let's initialize them in our constructor:

class Partition {
    private final String topic;
    private final int partitionId;
    private final List<String> messages;
    private final Map<String, Integer> groupOffsets;
    private final ReentrantLock lock;

    public Partition(String topic, int partitionId) {
        this.topic = topic;
        this.partitionId = partitionId;
        this.messages = new ArrayList<>();
        this.groupOffsets = new HashMap<>();
        this.lock = new ReentrantLock();
    }
}
  • this.topic = topic; → Associates the partition with a specific topic.
  • this.partitionId = partitionId; → Assigns a unique partition ID for easy management.
  • this.messages = new ArrayList<>(); → Initializes an empty list to store messages in order.
  • this.groupOffsets = new HashMap<>(); → Creates a map to track each consumer group's last read position.
  • this.lock = new ReentrantLock(); → Ensures safe access to partition data in a multi-threaded environment.

Now that we've built the Partition class, let's create an instance of it:

Partition partition = new Partition("topic12", 1);

A new partition is created for topic12 with partition ID 1. It is empty at first, but producers can add messages, and consumers can retrieve them.

Adding Methods to the Partition Class

Now that we have defined our Partition class, it's time to implement the core functionality:

  • Adding messages into the partition
  • Consuming messages sequentially
  • Tracking consumer progress using offsets
  • Ensuring thread safety to handle multiple producers and consumers

Step 1: Implementing addMessage - Storing Messages in the Partition

When a producer sends a message, it needs to be added to the correct partition. Since multiple producers might write to the same partition simultaneously, we use a lock to ensure safe modification.

public void addMessage(String message) {
    lock.lock();  // Acquire lock to ensure thread safety
    try {
        messages.add(message);  // Add the message to the partition's message list
    } finally {
        lock.unlock();  // Release the lock to allow other operations
    }
}
  • The lock.lock() ensures that only one producer at a time can modify the partition.
  • The message is added to the end of the list, maintaining order.
  • The lock.unlock() releases the lock after adding the message, allowing other operations to proceed.

Step 2: Implementing consumeMessage - Retrieving Messages for Consumers

Now, let's implement message consumption. A consumer reads messages from the partition and updates its offset, so it doesn’t read the same message again.

public String consumeMessage(String groupId) {
    lock.lock();  // Ensure only one consumer modifies the offset at a time
    try {
        int offset = groupOffsets.getOrDefault(groupId, 0);  // Get the last read position
        if (offset >= messages.size()) {  // If there are no new messages, return null
            return null;
        }
        String message = messages.get(offset);  // Retrieve the next unread message
        groupOffsets.put(groupId, offset + 1);  // Update the offset to mark it as read
        return message;
    } finally {
        lock.unlock();  // Release lock after consumption
    }
}
  • Retrieve the consumer group’s offset to find the last read position.
  • If the offset exceeds the total messages, return null (no new messages).
  • Otherwise, return the next unread message and increment the offset so the consumer doesn’t read the same message again.

Step 3: Implementing hasMessages - Checking for Unread Messages

Before a consumer tries to consume a message, it should check whether there are unread messages. This method prevents unnecessary polling and optimizes consumer behavior.

public boolean hasMessages(String groupId) {
    lock.lock();  // Ensure safe access to offsets
    try {
        int offset = groupOffsets.getOrDefault(groupId, 0);  // Get the last read position
        return offset < messages.size();  // Return true if there are unread messages
    } finally {
        lock.unlock();  // Release lock
    }
}
  • Retrieve the current offset of the consumer group.
  • Compare it with messages.size() → If the offset is less, there are unread messages.
  • This helps in optimizing consumer polling, preventing unnecessary retries.

Final Partition Class

Now, let's put everything together into the final version of our Partition class:

class Partition {
    private final String topic;
    private final int partitionId;
    private final List<String> messages;
    private final Map<String, Integer> groupOffsets;
    private final ReentrantLock lock;

    public Partition(String topic, int partitionId) {
        this.topic = topic;
        this.partitionId = partitionId;
        this.messages = new ArrayList<>();
        this.groupOffsets = new HashMap<>();
        this.lock = new ReentrantLock();
    }

    public void addMessage(String message) {
        lock.lock();
        try {
            messages.add(message);
        } finally {
            lock.unlock();
        }
    }

    public String consumeMessage(String groupId) {
        lock.lock();
        try {
            int offset = groupOffsets.getOrDefault(groupId, 0);
            if (offset >= messages.size()) {
                return null;
            }
            String message = messages.get(offset);
            groupOffsets.put(groupId, offset + 1);
            return message;
        } finally {
            lock.unlock();
        }
    }

    public boolean hasMessages(String groupId) {
        lock.lock();
        try {
            int offset = groupOffsets.getOrDefault(groupId, 0);
            return offset < messages.size();
        } finally {
            lock.unlock();
        }
    }

    public int getPartitionId() {
        return partitionId;
    }
}
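
Before moving on, here is a quick sanity check of the Partition class. The main method below is hypothetical (not part of the original design), but it shows the key property: each consumer group tracks its own offset independently.

public class PartitionDemo {
    public static void main(String[] args) {
        Partition partition = new Partition("orders", 0);
        partition.addMessage("order-101");
        partition.addMessage("order-102");

        // group-A reads the first message...
        System.out.println(partition.consumeMessage("group-A")); // order-101

        // ...while group-B starts from offset 0 on its own.
        System.out.println(partition.consumeMessage("group-B")); // order-101

        // group-A continues from its own offset.
        System.out.println(partition.consumeMessage("group-A")); // order-102
        System.out.println(partition.hasMessages("group-A"));    // false
        System.out.println(partition.hasMessages("group-B"));    // true
    }
}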

Adding Methods to the TopicManager Class

The TopicManager class is the core of our messaging system. It handles topics, partitions, and message distribution while ensuring that consumers receive messages efficiently and fairly.

We'll go through each method one by one, understanding what they do and why they are important.

Creating a Topic (createTopic method)

The first method is createTopic. It creates a new topic if it doesn't exist and initializes the default number of partitions.

    public void createTopic(String topic) {
        topicPartitions.putIfAbsent(topic, new ArrayList<>());
        if (topicPartitions.get(topic).isEmpty()) {
            List<Partition> partitions = new ArrayList<>();
            for (int i = 0; i < defaultPartitionCount; i++) {
                partitions.add(new Partition(topic, i));
            }
            topicPartitions.put(topic, partitions);
        }
    }

Adding Messages to a Topic (addMessage method)

The addMessage method adds a message to a topic. It uses key-based hashing (Math.abs(key.hashCode() % defaultPartitionCount)) so that messages with the same key always go to the same partition.

    public void addMessage(String topic, String key, String message) {
        createTopic(topic); // First, we ensure the topic exists by calling createTopic(topic).
        List<Partition> partitions = topicPartitions.get(topic);

        // Use a consistent hashing mechanism to ensure same keys go to same partitions
        int partition = Math.abs(key.hashCode() % defaultPartitionCount);
        partitions.get(partition).addMessage(message);
    }
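
As a quick illustration (with a hypothetical topic and keys), messages that share a key always land in the same partition, so their relative order is preserved:

TopicManager topicManager = new TopicManager(4);

// Both messages hash "user-42" to the same partition index,
// so OrderCreated is stored before OrderShipped in that partition.
topicManager.addMessage("orders", "user-42", "OrderCreated");
topicManager.addMessage("orders", "user-42", "OrderShipped");

// A different key may land in a different partition.
topicManager.addMessage("orders", "user-7", "OrderCreated");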

Consuming Messages from a Topic (consumeMessage method)

consumeMessage finds the partitions assigned to the given consumer and reads the next available message from one of those partitions. First, we look up the consumer's partitions using getAssignedPartitions(topic, groupId, consumerId). Then we acquire locks on those partitions to ensure that no two consumers read the same message at the same time, and finally we read the next available message from the assigned partitions.

    public String consumeMessage(String topic, String groupId, String consumerId) {
        if (!topicPartitions.containsKey(topic)) {
            return null;
        }

        List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);
        Map<Integer, ReentrantLock> partitionLocks = getConsumerPartitionLocks(topic, groupId, consumerId);

        // Iterate over assigned partitions in order and block until message is read
        for (Integer partitionId : assignedPartitions) {
            ReentrantLock partitionLock = partitionLocks.get(partitionId);
            partitionLock.lock();  // Ensure message order by blocking
            try {
                Partition partition = topicPartitions.get(topic).get(partitionId);
                if (partition.hasMessages(groupId)) {
                    return partition.consumeMessage(groupId);
                }
            } finally {
                partitionLock.unlock();
            }
        }
        return null;
    }

Managing Partition Locks (getConsumerPartitionLocks method)

getConsumerPartitionLocks returns the locks for the partitions assigned to a specific consumer. It ensures that only one consumer reads a partition at a time.

    private Map<Integer, ReentrantLock> getConsumerPartitionLocks(String topic, String groupId, String consumerId) {
        consumerPartitionLocks.putIfAbsent(topic, new ConcurrentHashMap<>());
        Map<String, Map<Integer, ReentrantLock>> topicLocks = consumerPartitionLocks.get(topic);

        String consumerKey = groupId + "-" + consumerId;
        topicLocks.putIfAbsent(consumerKey, new ConcurrentHashMap<>());

        Map<Integer, ReentrantLock> partitionLocks = topicLocks.get(consumerKey);
        List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);

        // Ensure we have locks for all assigned partitions
        for (Integer partitionId : assignedPartitions) {
            partitionLocks.putIfAbsent(partitionId, new ReentrantLock());
        }

        return partitionLocks;
    }

Getting Partition Assignments (getAssignedPartitions method)

getAssignedPartitions returns which partitions a consumer is assigned to. If the consumer has no assignments yet, it triggers a rebalance.

    public List<Integer> getAssignedPartitions(String topic, String groupId, String consumerId) {
        consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
        Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
        groupAssignments.putIfAbsent(consumerId, new ArrayList<>());

        // Keep track of topic subscriptions for the group
        topicSubscriptions.putIfAbsent(topic, new HashSet<>());
        topicSubscriptions.get(topic).add(groupId);

        // If empty or we explicitly need a rebalance
        if (groupAssignments.get(consumerId).isEmpty()) {
            rebalanceGroup(topic, groupId);
        }

        return groupAssignments.get(consumerId);
    }

Subscribing to a Topic (subscribeToTopic method)

subscribeToTopic allows a consumer to join a topic. It ensures that partitions are rebalanced among all consumers in the group.

    public void subscribeToTopic(String topic, String groupId, String consumerId) {
        // Create topic if it doesn't exist
        createTopic(topic);

        // Register this group for the topic
        topicSubscriptions.putIfAbsent(topic, new HashSet<>());
        topicSubscriptions.get(topic).add(groupId);

        // Ensure consumer is in the group
        consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
        consumerGroupAssignments.get(groupId).putIfAbsent(consumerId, new ArrayList<>());

        // Force rebalance for this topic and group
        rebalanceGroup(topic, groupId);
    }

Rebalancing a Group (rebalanceGroup method)

When new consumers join, we need to redistribute partitions evenly. rebalanceGroup distributes partitions evenly among the consumers in a group, ensuring fair partition assignment when membership changes.

    public void rebalanceGroup(String topic, String groupId) {
        Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
        List<String> consumers = new ArrayList<>(groupAssignments.keySet());
        Collections.sort(consumers);

        // Get actual partitions for the topic (not the default!)
        int numPartitions = topicPartitions.get(topic).size();

        // Clear existing assignments for this topic only
        for (String consumer : consumers) {
            // Remove only assignments for this topic
            groupAssignments.put(consumer, new ArrayList<>());
        }

        // Assign partitions to consumers in round-robin
        int consumerIndex = 0;
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            if (consumers.isEmpty()) break;
            String consumer = consumers.get(consumerIndex % consumers.size());
            groupAssignments.get(consumer).add(partitionId);
            consumerIndex++;
        }
        System.out.println("REBALANCE for topic " + topic + " and group " + groupId);
    }
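
To see the round-robin assignment in action, here is a hypothetical walk-through with three partitions and two consumers in one group:

TopicManager manager = new TopicManager(3);
manager.createTopic("orders");

// Each subscription triggers a rebalance for the group.
manager.subscribeToTopic("orders", "group-A", "consumer-1");
manager.subscribeToTopic("orders", "group-A", "consumer-2");

// Round-robin over the sorted consumer IDs:
// consumer-1 -> [0, 2], consumer-2 -> [1]
System.out.println(manager.getAssignedPartitions("orders", "group-A", "consumer-1"));
System.out.println(manager.getAssignedPartitions("orders", "group-A", "consumer-2"));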

Final TopicManager Class

Now, let's put everything together into the final version of our TopicManager class:

class TopicManager {
    private final Map<String, List<Partition>> topicPartitions;
    final Map<String, Map<String, List<Integer>>> consumerGroupAssignments;
    private final Map<String, Set<String>> topicSubscriptions; // Track topic -> groups that are subscribed
    private final Map<String, Map<String, Map<Integer, ReentrantLock>>> consumerPartitionLocks;
    private final int defaultPartitionCount;

    public TopicManager(int partitionCount) {
        this.topicPartitions = new ConcurrentHashMap<>();
        this.consumerGroupAssignments = new ConcurrentHashMap<>();
        this.topicSubscriptions = new ConcurrentHashMap<>();
        this.consumerPartitionLocks = new ConcurrentHashMap<>();
        this.defaultPartitionCount = partitionCount;
    }

    public void createTopic(String topic) {
        topicPartitions.putIfAbsent(topic, new ArrayList<>());
        if (topicPartitions.get(topic).isEmpty()) {
            List<Partition> partitions = new ArrayList<>();
            for (int i = 0; i < defaultPartitionCount; i++) {
                partitions.add(new Partition(topic, i));
            }
            topicPartitions.put(topic, partitions);
        }
    }

    public String listTopics() {
        StringBuilder result = new StringBuilder();
        for (Map.Entry<String, List<Partition>> entry : topicPartitions.entrySet()) {
            String topic = entry.getKey();
            List<Partition> partitions = entry.getValue();
            result.append(topic).append(" (Partitions: ").append(partitions.size()).append(") [");
            for (Partition partition : partitions) {
                result.append("P").append(partition.getPartitionId()).append(", ");
            }
            if (!partitions.isEmpty()) {
                result.setLength(result.length() - 2);
            }
            result.append("]\n");
        }
        return result.toString().trim();
    }

    public void addMessage(String topic, String key, String message) {
        createTopic(topic);
        List<Partition> partitions = topicPartitions.get(topic);

        // Use a consistent hashing mechanism to ensure same keys go to same partitions
        int partition = Math.abs(key.hashCode() % defaultPartitionCount);
        partitions.get(partition).addMessage(message);
    }

    public String consumeMessage(String topic, String groupId, String consumerId) {
        if (!topicPartitions.containsKey(topic)) {
            return null;
        }

        List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);
        Map<Integer, ReentrantLock> partitionLocks = getConsumerPartitionLocks(topic, groupId, consumerId);

        // Iterate over assigned partitions in order and block until message is read
        for (Integer partitionId : assignedPartitions) {
            ReentrantLock partitionLock = partitionLocks.get(partitionId);
            partitionLock.lock();  // Ensure message order by blocking
            try {
                Partition partition = topicPartitions.get(topic).get(partitionId);
                if (partition.hasMessages(groupId)) {
                    return partition.consumeMessage(groupId);
                }
            } finally {
                partitionLock.unlock();
            }
        }
        return null;
    }

    private Map<Integer, ReentrantLock> getConsumerPartitionLocks(String topic, String groupId, String consumerId) {
        consumerPartitionLocks.putIfAbsent(topic, new ConcurrentHashMap<>());
        Map<String, Map<Integer, ReentrantLock>> topicLocks = consumerPartitionLocks.get(topic);

        String consumerKey = groupId + "-" + consumerId;
        topicLocks.putIfAbsent(consumerKey, new ConcurrentHashMap<>());

        Map<Integer, ReentrantLock> partitionLocks = topicLocks.get(consumerKey);
        List<Integer> assignedPartitions = getAssignedPartitions(topic, groupId, consumerId);

        // Ensure we have locks for all assigned partitions
        for (Integer partitionId : assignedPartitions) {
            partitionLocks.putIfAbsent(partitionId, new ReentrantLock());
        }

        return partitionLocks;
    }

    public List<Integer> getAssignedPartitions(String topic, String groupId, String consumerId) {
        consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
        Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
        groupAssignments.putIfAbsent(consumerId, new ArrayList<>());

        // Keep track of topic subscriptions for the group
        topicSubscriptions.putIfAbsent(topic, new HashSet<>());
        topicSubscriptions.get(topic).add(groupId);

        // If empty or we explicitly need a rebalance
        if (groupAssignments.get(consumerId).isEmpty()) {
            rebalanceGroup(topic, groupId);
        }

        return groupAssignments.get(consumerId);
    }

    public void subscribeToTopic(String topic, String groupId, String consumerId) {
        // Create topic if it doesn't exist
        createTopic(topic);

        // Register this group for the topic
        topicSubscriptions.putIfAbsent(topic, new HashSet<>());
        topicSubscriptions.get(topic).add(groupId);

        // Ensure consumer is in the group
        consumerGroupAssignments.putIfAbsent(groupId, new ConcurrentHashMap<>());
        consumerGroupAssignments.get(groupId).putIfAbsent(consumerId, new ArrayList<>());

        // Force rebalance for this topic and group
        rebalanceGroup(topic, groupId);
    }

    public void rebalanceGroup(String topic, String groupId) {
        Map<String, List<Integer>> groupAssignments = consumerGroupAssignments.get(groupId);
        List<String> consumers = new ArrayList<>(groupAssignments.keySet());
        Collections.sort(consumers);

        // Get actual partitions for the topic (not the default!)
        int numPartitions = topicPartitions.get(topic).size();

        // Clear existing assignments for this topic only
        for (String consumer : consumers) {
            // Remove only assignments for this topic
            groupAssignments.put(consumer, new ArrayList<>());
        }

        // Assign partitions to consumers in round-robin
        int consumerIndex = 0;
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            if (consumers.isEmpty()) break;
            String consumer = consumers.get(consumerIndex % consumers.size());
            groupAssignments.get(consumer).add(partitionId);
            consumerIndex++;
        }

        // Debug output
        System.out.println("REBALANCE for topic " + topic + " and group " + groupId);
        for (String consumer : consumers) {
            System.out.println("  Consumer " + consumer + " assigned: " + groupAssignments.get(consumer));
        }
    }
}

Now that our TopicManager and Partition classes are ready, we can move on to building the KafkaBroker, KafkaConsumer, and KafkaProducer classes. Before diving into their implementation, we first need to understand how the producer, broker, and consumer talk to each other.

Deep Dive into Kafka Producer, Broker, and Consumer Communication

Kafka follows a distributed messaging system where producers, brokers, and consumers communicate efficiently using the Kafka protocol over optimized TCP connections. Let’s break it down in more detail.

Producer-to-Broker Communication (Push Model with Optimizations)

The producer is responsible for sending messages to the broker. However, unlike a pure push model, Kafka producers follow an optimized batching mechanism to enhance performance. Here’s how it works:

  1. Persistent TCP Connection

    • A Kafka producer establishes a long-lived TCP connection with the Kafka broker to reduce the overhead of repeated connections.
    • The connection is maintained across multiple messages to optimize resource usage.
  2. Message Batching and Linger Time

    • Kafka does not send every message immediately; instead, it batches messages to optimize network throughput.
    • The linger.ms configuration controls how long the producer should wait before sending a batch.
    • If a batch fills up before linger.ms expires, it is sent immediately.
  3. Acknowledgments (acks)

Kafka producers can send messages in three different acknowledgment modes:

  • acks=0: Fire-and-forget (no broker acknowledgment, fastest but unreliable).
  • acks=1: The leader broker acknowledges receipt (default, moderate reliability).
  • acks=all: The leader and all in-sync replicas acknowledge (highest reliability, but slower).
  4. Partitioning and Routing

    • Producers decide which partition a message should go to based on partition keys or Kafka’s internal partitioning strategy.
    • Once the partition is determined, the producer sends the message to the correct broker responsible for that partition.
  5. Error Handling and Retries

    • If a message fails due to a network issue or a broker being unavailable, the producer can retry sending messages based on the retries configuration.
    • The retry.backoff.ms setting controls the time between retries.
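
For reference, here is roughly how these settings appear when configuring the real Kafka producer client. This is a sketch assuming the official org.apache.kafka:kafka-clients library; the broker address and topic name are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // broker address (placeholder)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");             // leader + in-sync replicas must acknowledge
props.put("linger.ms", "5");          // wait up to 5 ms to fill a batch
props.put("batch.size", "16384");     // maximum batch size in bytes
props.put("retries", "3");            // retry transient send failures
props.put("retry.backoff.ms", "100"); // pause between retries

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("orders", "user-42", "OrderCreated"));
}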

Consumer-to-Broker Communication (Polling Model with Long Polling)

Kafka consumers do not receive messages automatically. Instead, they poll the broker at regular intervals to fetch available messages.

  1. Long-Lived TCP Connection

    • Just like producers, consumers maintain a persistent TCP connection with brokers to reduce connection overhead.
  2. Polling Mechanism (poll())

    • The consumer must call poll() periodically to fetch new messages.
    • If no new messages are available, the broker waits before responding (this is called long polling), reducing the number of empty responses.
  3. Long Polling (fetch.min.bytes & fetch.max.wait.ms)

    • Consumers avoid unnecessary requests by waiting for at least one message before returning data.
    • The fetch.min.bytes setting ensures the broker accumulates a certain amount of data before responding.
    • The fetch.max.wait.ms setting defines the maximum wait time before the broker responds, even if the data size is small.
  4. Consumer Groups and Partition Assignment

    • Consumers belong to consumer groups to balance the workload across multiple instances.
    • Each partition is assigned to only one consumer in the group at a time to prevent duplicate processing.
  5. Heartbeat and Rebalancing

    • Consumers send heartbeats (heartbeat.interval.ms) to let the broker know they are alive.
    • If a consumer fails to send a heartbeat within session.timeout.ms, it is removed from the group, and Kafka triggers a rebalance to redistribute partitions among active consumers.
  6. Commit Offset Mechanism

    • Consumers keep track of the last processed message offset in Kafka.
    • Offsets can be committed automatically (enable.auto.commit=true) or manually to prevent message loss.
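
Likewise, here is a sketch of how these consumer-side settings map onto the real Kafka consumer client (again assuming org.apache.kafka:kafka-clients; the topic, group, and address are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-A"); // consumer group for partition sharing
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.min.bytes", "1024");       // broker waits for at least 1 KB...
props.put("fetch.max.wait.ms", "500");      // ...or 500 ms, whichever comes first
props.put("heartbeat.interval.ms", "3000"); // heartbeat cadence
props.put("session.timeout.ms", "10000");   // missed heartbeats -> rebalance
props.put("enable.auto.commit", "false");   // commit offsets manually

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
    }
    consumer.commitSync(); // manual offset commit
}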

Summary of Communication Flow

  1. Producers push messages to brokers over a persistent TCP connection, using batching, acknowledgments, and retries.
  2. Brokers store messages in partitions and manage leader-follower replication for fault tolerance.
  3. Consumers pull messages using a polling model with long polling, ensuring efficient data retrieval.
  4. Heartbeat mechanisms and rebalancing handle consumer failures dynamically.

With this understanding, we can now proceed to implement the KafkaBroker, KafkaConsumer, and KafkaProducer classes.

KafkaBroker Class (Main Server)

This class initializes the Kafka-like broker and manages client connections for producers and consumers. Upon execution, it starts listening for incoming TCP connections on port 9092. Additionally, it creates a global TopicManager instance (as defined in the previous section) with three default partitions. You can customize this default partition count based on your requirements.

public class KafkaBroker {
    private static final int PORT = 9092;
    private static final TopicManager topicManager = new TopicManager(3);

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(PORT);
        System.out.println("Broker is running on port " + PORT);

        while (true) {
            try {
                Socket clientSocket = serverSocket.accept();
                System.out.println("New client connected: " + clientSocket);
                new ClientHandler(clientSocket, topicManager).start();
            } catch (IOException e) {
                System.err.println("Error accepting client connection: " + e.getMessage());
            }
        }
    }
}

Starting the Server:

ServerSocket serverSocket = new ServerSocket(PORT);
System.out.println("Broker is running on port " + PORT);

Creates a server socket → Listens for incoming connections on PORT 9092.

Handling Clients (Producers & Consumers)

while (true) {
    try {
        Socket clientSocket = serverSocket.accept();
        System.out.println("New client connected: " + clientSocket);
        new ClientHandler(clientSocket, topicManager).start();
    } catch (IOException e) {
        System.err.println("Error accepting client connection: " + e.getMessage());
    }
}

serverSocket.accept() → Waits for a client (producer/consumer) to connect. Creates a new thread (ClientHandler) to handle that client.

Why use threads?

Each client needs independent processing. So, we run a new ClientHandler for each client.
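
As a design note, thread-per-client is the simplest model and is what we use here. A bounded thread pool is a common alternative when many clients connect; a hypothetical variant of the accept loop (reusing the serverSocket and topicManager from above) might look like this:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Cap concurrent client handlers at 32 threads (illustrative number).
ExecutorService pool = Executors.newFixedThreadPool(32);

while (true) {
    Socket clientSocket = serverSocket.accept();
    // ClientHandler extends Thread (which implements Runnable),
    // so the pool can run it without spawning one thread per client.
    pool.submit(new ClientHandler(clientSocket, topicManager));
}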

ClientHandler Class (Handles Each Client)

Each connected client (producer/consumer) is handled by this class.

This ClientHandler class is a thread that handles communication between a client and a broker server.

public class ClientHandler extends Thread {
    private final Socket socket;
    private final TopicManager topicManager;

    public ClientHandler(Socket socket, TopicManager topicManager) {
        this.socket = socket;
        this.topicManager = topicManager;
    }

    public void run() {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {

            out.println("CONNECTED: Welcome to KafkaLikeBroker");

            String input;
            String currentGroupId = null;
            String currentConsumerId = null;

            while ((input = in.readLine()) != null) {
                if (input.startsWith("REGISTER:")) {
                    String[] parts = input.split(":", 3);
                    currentGroupId = parts[1];
                    currentConsumerId = parts[2];

                    // Add consumer to group and trigger rebalance
                    topicManager.consumerGroupAssignments
                            .computeIfAbsent(currentGroupId, k -> new ConcurrentHashMap<>())
                            .putIfAbsent(currentConsumerId, new ArrayList<>());

                    out.println("REGISTERED: Consumer " + currentConsumerId + " in group " + currentGroupId);
                } else if (input.startsWith("SUBSCRIBE:")) {
                    String[] parts = input.split(":", 2);
                    if (parts.length < 2) {
                        out.println("ERROR: Invalid subscribe format. Use SUBSCRIBE:topic");
                        continue;
                    }

                    if (currentGroupId == null || currentConsumerId == null) {
                        out.println("ERROR: Consumer must be registered before subscribing");
                        continue;
                    }

                    String topic = parts[1];

                    // Handle subscription and trigger rebalance
                    topicManager.subscribeToTopic(topic, currentGroupId, currentConsumerId);

                    out.println("SUBSCRIBED: to topic " + topic + " with group " + currentGroupId);

                } else if (input.startsWith("PRODUCE:")) {
                    String[] parts = input.split(":", 4);
                    if (parts.length < 4) {
                        out.println("ERROR: Invalid produce format. Use PRODUCE:topic:key:message");
                        continue;
                    }
                    String topic = parts[1];
                    String key = parts[2];
                    String message = parts[3];

                    topicManager.addMessage(topic, key, message);
                    out.println("ACK: Message stored in topic " + topic);
                } else if (input.startsWith("CONSUME:")) {
                    String[] parts = input.split(":", 4);
                    if (parts.length < 4) {
                        out.println("ERROR: Invalid consume format. Use CONSUME:topic:groupId:consumerId");
                        continue;
                    }
                    String topic = parts[1];
                    String groupId = parts[2];
                    String consumerId = parts[3];

                    String message = topicManager.consumeMessage(topic, groupId, consumerId);
                    out.println("MESSAGE: " + (message != null ? message : "NO_MESSAGES"));
                } else if (input.startsWith("DISCONNECT:")) {
                    String[] parts = input.split(":", 3);
                    if (parts.length >= 3) {
                        currentGroupId = parts[1];
                        currentConsumerId = parts[2];
                    }
                    out.println("DISCONNECTED: Goodbye!");
                    socket.close();
                    break;
                } else if(input.startsWith("CREATE-TOPIC:")) {
                    String[] parts = input.split(":", 2);

                    String topicName = parts[1];
                    System.out.println("TOPIC NAME from createTopic: "+topicName);
                    topicManager.createTopic(topicName);
                    out.println("ACK: TOPIC CREATED");
                } else if(input.startsWith("LIST-TOPIC:")) {
                    String topicMetaData = topicManager.listTopics();
                    if (topicMetaData.isEmpty()) {
                        out.println("TOPIC-METADATA: No topics available");
                    } else {
                        out.println("TOPIC-METADATA: " + topicMetaData);
                    }
                } else {
                    out.println("ERROR: Unknown command");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Let's discuss each part of this ClientHandler class in detail:

Constructor

public ClientHandler(Socket socket, TopicManager topicManager) {
    this.socket = socket;
    this.topicManager = topicManager;
}

Whenever we create an object of this class, it stores the socket (the client connection) and the topicManager (which manages topics and messages).

Handling Client Requests

try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
  • in (BufferedReader) → Reads incoming data from the client via the socket's input stream. It wraps an InputStreamReader, which converts byte streams into character streams, making it easier to handle text-based communication.
  • out (PrintWriter) → Sends data to the client via the socket's output stream. The true parameter enables automatic flushing, ensuring that messages are immediately sent without requiring an explicit flush() call. This makes communication more efficient, especially in interactive applications.

Understanding Commands

In our Kafka implementation, the producer and consumer will communicate with the broker using a command-based protocol. Each client (producer or consumer) sends commands in a structured format:

COMMAND:argument1:argument2:...
  • COMMAND → Specifies the action to be performed, such as PRODUCE, SUBSCRIBE, or CONSUME.
  • argument1, argument2, ... → Provide additional details required for the command, such as the topic name, message payload, or consumer group ID.

This format ensures a standardized and efficient way for clients to interact with the broker, making message transmission and processing straightforward.
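
For example, a producer publishing to an orders topic would send a single line like this (hypothetical values), and the broker would reply with an acknowledgment:

PRODUCE:orders:user-42:OrderCreated
→ ACK: Message stored in topic orders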

For each command, the TopicManager handles the underlying operations behind the scenes. It processes the command, manages topics, and ensures proper message flow between producers, consumers, and the broker.

  • When a producer sends a message, TopicManager routes it to the appropriate topic.
  • When a consumer subscribes or fetches messages, TopicManager retrieves the relevant data efficiently.

Register Consumer

if (input.startsWith("REGISTER:")) {
    String[] parts = input.split(":", 3);
    currentGroupId = parts[1];
    currentConsumerId = parts[2];

    topicManager.consumerGroupAssignments
        .computeIfAbsent(currentGroupId, k -> new ConcurrentHashMap<>())
        .putIfAbsent(currentConsumerId, new ArrayList<>());

    out.println("REGISTERED: Consumer " + currentConsumerId + " in group " + currentGroupId);
}

A consumer registers itself with a consumer group using the REGISTER:<groupId>:<consumerId> command. The broker stores this consumer inside consumerGroupAssignments.

Subscribe to a Topic

A consumer subscribes to a topic using the SUBSCRIBE:<topic> command. The broker associates this consumer with the topic.

if (input.startsWith("SUBSCRIBE:")) {
    String[] parts = input.split(":", 2);
    if (currentGroupId == null || currentConsumerId == null) {
        out.println("ERROR: Consumer must be registered before subscribing");
        continue;
    }
    String topic = parts[1];
    topicManager.subscribeToTopic(topic, currentGroupId, currentConsumerId);
    out.println("SUBSCRIBED: to topic " + topic + " with group " + currentGroupId);
}

Produce a Message

A producer sends a message to a topic using the PRODUCE:topic:key:message command.

if (input.startsWith("PRODUCE:")) {
    String[] parts = input.split(":", 4);
    String topic = parts[1];
    String key = parts[2];
    String message = parts[3];

    topicManager.addMessage(topic, key, message);
    out.println("ACK: Message stored in topic " + topic);
}

Consume a Message

A consumer requests a message from a topic using the CONSUME:topic:groupId:consumerId command.

if (input.startsWith("CONSUME:")) {
    String[] parts = input.split(":", 4);
    String topic = parts[1];
    String groupId = parts[2];
    String consumerId = parts[3];

    String message = topicManager.consumeMessage(topic, groupId, consumerId);
    out.println("MESSAGE: " + (message != null ? message : "NO_MESSAGES"));
}

Create a Topic

if (input.startsWith("CREATE-TOPIC:")) {
    String[] parts = input.split(":", 2);
    String topicName = parts[1];
    topicManager.createTopic(topicName);
    out.println("ACK: TOPIC CREATED");
}

KafkaConsumer Class

The KafkaConsumer is responsible for:

  • Connecting to the broker.
  • Subscribing to a topic.
  • Polling messages from the broker.
  • Handling connection and disconnection.

Our consumer class mainly sends the REGISTER, SUBSCRIBE, CONSUME, and DISCONNECT commands to the broker.

Fields (Instance Variables)

private Socket socket;
private PrintWriter out;
private BufferedReader in;
private final String consumerId;
private final String groupId;
private String subscribedTopic;
private static final int DEFAULT_POLL_TIMEOUT = 1000;

Explanation of Each Variable:

  • socket → Represents the TCP connection between the consumer and the broker, enabling communication over the network.
  • out → A PrintWriter used for sending commands and messages from the consumer to the broker.
  • in → A BufferedReader used to receive and process responses from the broker.
  • consumerId → A unique identifier assigned to each consumer, typically generated using a UUID, ensuring distinct identification within the system.
  • groupId → Specifies the consumer group to which this consumer belongs, allowing multiple consumers to coordinate message consumption.
  • subscribedTopic → Stores the name of the topic that the consumer is subscribed to, determining which messages it will receive.
  • DEFAULT_POLL_TIMEOUT → Defines the default time (in milliseconds) a consumer waits while polling for new messages from the broker before timing out.

Constructor

public KafkaConsumer(String groupId) {
    this.consumerId = UUID.randomUUID().toString();
    this.groupId = groupId;
}

When creating an instance of KafkaConsumer, a unique consumer ID is generated (via UUID), and we pass a groupId to support consumer groups.

Connecting to the Broker

public void connect(String host, int port) {
    try {
        socket = new Socket(host, port);
        out = new PrintWriter(socket.getOutputStream(), true);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        // Send consumer registration
        out.println("REGISTER:" + groupId + ":" + consumerId);
        String response = in.readLine();
        System.out.println("Broker Response: " + response);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

The connect method establishes a TCP connection with the broker and sends a REGISTER request using the command REGISTER:<groupId>:<consumerId>.

Subscribing to a Topic

public void subscribe(String topic) {
    if (socket == null || out == null) {
        System.out.println("ERROR: Consumer is not connected to broker!");
        return;
    }

    out.println("SUBSCRIBE:" + topic);
    try {
        String response = in.readLine();
        System.out.println("Subscription Response: " + response);
    } catch (IOException e) {
        System.out.println("ERROR: Failed to subscribe: " + e.getMessage());
        return;
    }

    this.subscribedTopic = topic;
}

This method sends a SUBSCRIBE request (SUBSCRIBE:<topic>), and the broker responds to confirm the subscription.

Polling for Messages

public List<String> poll(long timeoutMs) {
    if (subscribedTopic == null) {
        throw new IllegalStateException("No topic subscribed. Please subscribe to a topic first.");
    }

    if (socket == null || out == null) {
        throw new IllegalStateException("Consumer is not connected to broker!");
    }

    List<String> records = new ArrayList<>();
    long startTime = System.currentTimeMillis();

    try {
        while (System.currentTimeMillis() - startTime < timeoutMs) { // Keep requesting messages from the broker until timeoutMs expires
            out.println("CONSUME:" + subscribedTopic + ":" + groupId + ":" + consumerId);
            String response = in.readLine();

            if (response != null && !response.contains("NO_MESSAGES")) {
                String message = response.replace("MESSAGE: ", "");
                records.add(message);
            } else {
                Thread.sleep(100);
            }
        }
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }

    return records;
}

The poll method is a message consumption mechanism that fetches messages from a broker. It continuously requests messages from a subscribed topic within a given timeout period. If no messages are available, it waits briefly (100ms) and retries until the timeout expires.
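
For example, a minimal consumption loop might look like this (a sketch; it assumes the broker from earlier is running and an "orders" topic already has messages):

KafkaConsumer consumer = new KafkaConsumer("order-processors");
consumer.connect("localhost", 9092);
consumer.subscribe("orders");

// Drain whatever arrives in three 1-second poll windows
for (int i = 0; i < 3; i++) {
    for (String record : consumer.poll(1000)) {
        System.out.println("Consumed: " + record);
    }
}
consumer.disconnect();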

disconnect

Sends a DISCONNECT message. Closes the socket.

public void disconnect() {
    try {
        if (out != null) {
            out.println("DISCONNECT:" + groupId + ":" + consumerId);
        }
        if (socket != null) socket.close();
        System.out.println("Consumer disconnected from broker.");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Here is the full implementation of the consumer class:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class KafkaConsumer {
    private Socket socket;
    private PrintWriter out;
    private BufferedReader in;
    private final String consumerId;
    private final String groupId;
    private String subscribedTopic;
    private static final int DEFAULT_POLL_TIMEOUT = 1000; // milliseconds
    public KafkaConsumer(String groupId) {
        this.consumerId = UUID.randomUUID().toString();
        this.groupId = groupId;
    }
    public void connect(String host, int port) {
        try {
            socket = new Socket(host, port);
            out = new PrintWriter(socket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            // Send consumer registration
            out.println("REGISTER:" + groupId + ":" + consumerId);
            String response = in.readLine();
            System.out.println("Broker Response: " + response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void subscribe(String topic) {
        if (socket == null || out == null) {
            System.out.println("ERROR: Consumer is not connected to broker!");
            return;
        }

        // Send SUBSCRIBE command to the broker
        out.println("SUBSCRIBE:" + topic);
        try {
            String response = in.readLine();
            System.out.println("Subscription Response: " + response);
        } catch (IOException e) {
            System.out.println("ERROR: Failed to subscribe: " + e.getMessage());
            return;
        }

        this.subscribedTopic = topic;
    }
    public List<String> poll() {
        return poll(DEFAULT_POLL_TIMEOUT);
    }
    public List<String> poll(long timeoutMs) {
        if (subscribedTopic == null) {
            throw new IllegalStateException("No topic subscribed. Please subscribe to a topic first.");
        }

        if (socket == null || out == null) {
            throw new IllegalStateException("Consumer is not connected to broker!");
        }

        List<String> records = new ArrayList<>();
        long startTime = System.currentTimeMillis();

        try {
            while (System.currentTimeMillis() - startTime < timeoutMs) {
                out.println("CONSUME:" + subscribedTopic + ":" + groupId + ":" + consumerId);
                String response = in.readLine();

                if (response != null && !response.contains("NO_MESSAGES")) {
                    String message = response.replace("MESSAGE: ", "");
                    records.add(message);
                } else {
                    // If no messages, wait a bit before trying again
                    Thread.sleep(100);
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        return records;
    }
    public void disconnect() {
        try {
            if (out != null) {
                out.println("DISCONNECT:" + groupId + ":" + consumerId);
            }
            if (socket != null) socket.close();
            System.out.println("Consumer disconnected from broker.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // Getter for consumerId
    public String getConsumerId() {
        return consumerId;
    }
}

KafkaProducer Class

The KafkaProducer is responsible for:

  • Connecting to the broker.
  • Sending messages to a topic.
  • Creating topics.
  • Listing available topics.
  • Handling connection retries.

Our producer class mainly sends the PRODUCE, CREATE-TOPIC, LIST-TOPIC, and DISCONNECT commands to the broker.
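
Concretely, every command is a single newline-delimited string. For example (the topic and key values here are illustrative):

PRODUCE:orders:user-42:order created
CREATE-TOPIC:orders
LIST-TOPIC:
DISCONNECT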

Fields (Instance Variables) of KafkaProducer

private static final String HOST = "localhost";
private static final int PORT = 9092;
private static final int DEFAULT_RETRIES = 3;
private static final long RETRY_BACKOFF_MS = 1000;

private final Properties properties;
private Socket socket;
private PrintWriter out;
private BufferedReader in;
private boolean connected = false;

Explanation of Each Variable:

  • HOST & PORT: Defaults to localhost:9092.
  • DEFAULT_RETRIES: Max retries for connection.
  • RETRY_BACKOFF_MS: Delay before retrying.
  • properties: Stores configuration settings.
  • connected: Tracks connection status.

Connecting to the Broker

The connect method establishes the connection to the broker. If an attempt fails, it retries up to 3 times (DEFAULT_RETRIES), waiting RETRY_BACKOFF_MS between attempts, before giving up.

public void connect() throws IOException {
    if (connected) return;

    int retries = 0;
    while (retries < DEFAULT_RETRIES) {
        try {
            socket = new Socket(HOST, PORT);
            out = new PrintWriter(socket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            String welcome = in.readLine();
            if (welcome != null && welcome.startsWith("CONNECTED")) {
                connected = true;
                System.out.println("Connected to KafkaLikeBroker");
                return;
            }
        } catch (IOException e) {
            retries++;
            try {
                Thread.sleep(RETRY_BACKOFF_MS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Connection interrupted", ie);
            }
        }
    }
}
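
A common refinement, which real Kafka clients expose through settings like retry.backoff.ms, is exponential backoff instead of a fixed delay. Here is a minimal sketch assuming the same fields as above; the connectWithBackoff name and the 30-second cap are illustrative, not part of our producer:

// Sketch only: exponential backoff between connection attempts.
private Socket connectWithBackoff() throws IOException, InterruptedException {
    long backoffMs = RETRY_BACKOFF_MS;
    for (int attempt = 1; attempt <= DEFAULT_RETRIES; attempt++) {
        try {
            return new Socket(HOST, PORT);
        } catch (IOException e) {
            if (attempt == DEFAULT_RETRIES) throw e;     // out of attempts
            Thread.sleep(backoffMs);                     // wait before retrying
            backoffMs = Math.min(backoffMs * 2, 30_000); // double, capped at 30s
        }
    }
    throw new IllegalStateException("unreachable");
}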

Sending a message

This method sends a PRODUCE request (PRODUCE:<topic>:<key>:<message>). The key is required because the broker's partitioning strategy uses it to decide which partition the message lands in.

public boolean send(String topic, String key, String message) throws IOException {
   if (!connected) throw new IllegalStateException("Producer is not connected to broker");
   if (topic == null || topic.trim().isEmpty()) throw new IllegalArgumentException("Topic cannot be null or empty");
   if (key == null || key.trim().isEmpty()) throw new IllegalArgumentException("Key cannot be null or empty");

   try {
       out.println("PRODUCE:" + topic + ":" + key + ":" + message);
       String response = in.readLine();
       return response != null && response.startsWith("ACK");
   } catch (IOException e) {
       // On a broken connection, reconnect once and retry the request
       reconnect();
       out.println("PRODUCE:" + topic + ":" + key + ":" + message);
       String response = in.readLine();
       return response != null && response.startsWith("ACK");
   }
}
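
On the broker side, key-based partitioning conceptually reduces to the hash(key) % num_partitions rule described earlier. A sketch of what that looks like in Java (the selectPartition helper is illustrative, not part of the producer above):

// Illustrative only: map a message key to a partition index.
// Masking the sign bit avoids negative results from hashCode().
static int selectPartition(String key, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}

Because the same key always hashes to the same index, per-key ordering is preserved within a partition.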

Creating a Topic

Sends a CREATE-TOPIC request (CREATE-TOPIC:<topicName>).

public boolean createTopic(String topicName) throws IOException {
    out.println("CREATE-TOPIC:" + topicName);
    String response = in.readLine();
    return response != null && response.startsWith("ACK");
}

disconnect method

public void disconnect() {
    out.println("DISCONNECT");
    connected = false;
}

Here is the full implementation of the KafkaProducer class:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Properties;

public class KafkaProducer {
    private static final String HOST = "localhost";
    private static final int PORT = 9092;
    private static final int DEFAULT_RETRIES = 3;
    private static final long RETRY_BACKOFF_MS = 1000;

    private final Properties properties;
    private Socket socket;
    private PrintWriter out;
    private BufferedReader in;
    private boolean connected = false;

    public KafkaProducer() {
        this(new Properties());
    }

    public KafkaProducer(Properties properties) {
        this.properties = properties;
    }

    public void connect() throws IOException {
        if (connected) {
            return;
        }

        int retries = 0;
        while (retries < DEFAULT_RETRIES) {
            try {
                socket = new Socket(HOST, PORT);
                out = new PrintWriter(socket.getOutputStream(), true);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                String welcome = in.readLine();
                if (welcome != null && welcome.startsWith("CONNECTED")) {
                    connected = true;
                    System.out.println("Connected to KafkaLikeBroker");
                    System.out.println(welcome);
                    return;
                }
            } catch (IOException e) {
                retries++;
                if (retries >= DEFAULT_RETRIES) {
                    throw new IOException("Failed to connect after " + DEFAULT_RETRIES + " attempts", e);
                }
                try {
                    Thread.sleep(RETRY_BACKOFF_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Connection interrupted", ie);
                }
            }
        }
    }

    public boolean send(String topic, String key, String message) throws IOException {
        if (!connected) {
            throw new IllegalStateException("Producer is not connected to broker");
        }

        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("Topic cannot be null or empty");
        }

        if (key == null || key.trim().isEmpty()) {
            throw new IllegalArgumentException("Key cannot be null or empty");
        }

        try {
            // Send the message with key
            out.println("PRODUCE:" + topic + ":" + key + ":" + message);

            // Wait for acknowledgment
            String response = in.readLine();
            if (response != null && response.startsWith("ACK")) {
                System.out.println("Message sent successfully: " + response);
                return true;
            } else {
                System.err.println("Failed to send message: " + response);
                return false;
            }
        } catch (IOException e) {
            System.err.println("Error sending message: " + e.getMessage());
            reconnect();
            out.println("PRODUCE:" + topic + ":" + key + ":" + message);
            String response = in.readLine();
            return response != null && response.startsWith("ACK");
        }
    }

    public boolean createTopic(String topicName) throws IOException {
        if (!connected) {
            throw new IllegalStateException("Producer is not connected to broker");
        }

        if (topicName == null || topicName.trim().isEmpty()) {
            throw new IllegalArgumentException("Topic cannot be null or empty");
        }

        try {
            // Send the CREATE-TOPIC command
            out.println("CREATE-TOPIC:" + topicName);

            // Wait for acknowledgment
            String response = in.readLine();
            if (response != null && response.startsWith("ACK")) {
                System.out.println("TOPIC CREATED: " + response);
                return true;
            } else {
                System.err.println("Failed to CREATE: " + response);
                return false;
            }
        } catch (IOException e) {
            System.err.println("Error sending message: " + e.getMessage());
            reconnect();
            out.println("CREATE-TOPIC:" + topicName);
            String response = in.readLine();
            return response != null && response.startsWith("ACK");
        }
    }

    public void listTopics() throws IOException {
        if (!connected) {
            throw new IllegalStateException("Producer is not connected to broker");
        }

        out.println("LIST-TOPIC:");

        String response = in.readLine();
        if (response != null && response.startsWith("TOPIC-METADATA:")) {
            System.out.println("Available Topics: " + response.substring(15));
        } else {
            System.err.println("Failed to list topics: " + response);
        }
    }

    private void reconnect() throws IOException {
        disconnect();
        connect();
    }

    public void disconnect() {
        if (!connected) {
            return;
        }

        try {
            if (out != null) {
                out.println("DISCONNECT");
                String response = in.readLine();
                System.out.println(response);
            }
        } catch (IOException e) {
            System.err.println("Error during disconnect: " + e.getMessage());
        } finally {
            connected = false;
            try {
                if (socket != null) socket.close();
                if (out != null) out.close();
                if (in != null) in.close();
            } catch (IOException e) {
                System.err.println("Error closing resources: " + e.getMessage());
            }
            System.out.println("Disconnected from broker.");
        }
    }
}

We have now implemented all the components of Kafka. We can test the full working setup by starting the broker, then the producer and consumer classes.
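
For instance, a smoke test along these lines (a sketch assuming the broker from earlier is listening on localhost:9092) exercises the whole path:

public class Demo {
    public static void main(String[] args) throws Exception {
        // Producer side: create a topic and publish two keyed messages
        KafkaProducer producer = new KafkaProducer();
        producer.connect();
        producer.createTopic("orders");
        producer.send("orders", "user-42", "order created");
        producer.send("orders", "user-42", "order paid");

        // Consumer side: join a group, subscribe, and poll for two seconds
        KafkaConsumer consumer = new KafkaConsumer("order-processors");
        consumer.connect("localhost", 9092);
        consumer.subscribe("orders");
        for (String record : consumer.poll(2000)) {
            System.out.println("Consumed: " + record);
        }

        consumer.disconnect();
        producer.disconnect();
    }
}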

Summary

In this blog, we explored the internal workings of Kafka, diving into how it operates behind the scenes. We discussed the roles and functionalities of key components such as the broker, consumer, and producer, explaining how they interact to enable efficient message streaming.

Additionally, we examined the concepts of topics and partitions, detailing how messages are stored and managed within Kafka. To put our understanding into practice, we also built a simplified Kafka-like application, demonstrating the core principles in action.

However, due to space constraints, we were unable to cover important topics like replication and permanent storage in this blog. These are critical aspects of Kafka that ensure data durability, fault tolerance, and high availability. We will cover these topics in detail in a future blog post. Stay tuned!

You can get the full working code on GitHub.