Kafka Knowledge Summary: Broker Principles


This article introduces the Kafka broker workflow, including the controller election process; partition replica leader election and the handling of leader and follower failures; how to adjust partition replicas in a production environment; Kafka's file storage mechanism and log cleanup policies; and finally the principles of page buffering (the page cache) and zero copy as used in Kafka.

1. Workflow

This part gives a general overview of the Kafka broker workflow and of the role ZooKeeper plays in it, including which important data is stored in ZooKeeper.

1.1. The role of zookeeper

ZooKeeper plays an important role in Kafka. Kafka uses ZooKeeper for metadata management: it stores broker registration information as well as topic and partition information, supports leader election, and in older Kafka versions also stores consumer offsets. The stored content can be browsed with the ZooKeeper client PrettyZoo.
Let's take a look at the role of some important nodes.

Broker node

The broker node information that Kafka stores in ZooKeeper is as follows.
/brokers/ids/[0...n]: each online broker registers its information in an ephemeral node, which is deleted automatically when the broker goes offline:

{
  "listener_security_protocol_map" : { ... },
  "endpoints" : [ "PLAINTEXT://" ],
  "jmx_port" : -1,
  "features" : { },
  "host" : "",
  "timestamp" : "1646557610504",
  "port" : 9091,
  "version" : 5
}

/brokers/seqid: Used to help generate broker IDs. When the user does not configure a broker ID, ZooKeeper is used to generate a globally unique one automatically.

/brokers/topics/{topicName}: A persistent node that stores the partition replica assignment of the topic. The leader and the ISR queue of a partition are recorded in /brokers/topics/{topicName}/partitions/0/state:

{
  "controller_epoch" : 5,
  "leader" : 3,
  "version" : 1,
  "leader_epoch" : 4,
  "isr" : [ 2, 3 ]
}

Here, _transaction_offsets is the node where transaction information is stored.

Consumers node

Before version 0.9, this node was used to store consumer offsets; from version 0.9 onward, offsets are stored in a Kafka topic instead.

/controller and /controller_epoch nodes

/controller: Stores the brokerId of the controller and related information. The controller is the leader among the brokers; it must be distinguished from a replica leader, since it is the leader of all brokers in the Kafka cluster.

/controller_epoch: Used to solve the split-brain problem. It stores an integer value (the epoch number, also known as a fencing token).

Other nodes

/config/topics: stores dynamically modified topic-level configuration information

/config/clients: Stores dynamically modified client-level configuration information

/config/changes: Store the corresponding information when dynamically modifying the configuration

/admin/delete_topics: Save the information of the topic to be deleted when the topic is deleted

/isr_change_notification: Save the corresponding path to notify when the Kafka replica ISR list changes

1.2. Leader election of kafka broker

1.2.1 Types of leader election

First we need to clarify what "leader election" means, because Kafka has several election mechanisms that are easy to confuse. Kafka holds elections in three places:

  • Controller election among the brokers
  • Leader election among the replicas of a partition
  • Leader election among consumers

This article describes controller election and partition leader election. Consumer leader election will be covered later, when consumers are discussed.

1.2.2 Broker election process

A Kafka cluster contains many brokers. One of them must be elected as the leader, called the controller, and the others are followers. The controller has very important responsibilities: creating and deleting topics, adding partitions and assigning partition leaders; managing the brokers of the cluster, including additions, shutdowns and failure handling; leader partition rebalancing (auto.leader.rebalance.enable=true, introduced later); and partition leader election.

The controller election process and its failure handling can be roughly described as follows:

  1. The first broker started in the cluster creates the ephemeral node /controller in ZooKeeper and thereby becomes the controller. When other brokers start, they also try to create the ephemeral node /controller, but find that it already exists. After receiving the resulting exception, each of them creates a watch in ZooKeeper so that it is notified when the controller changes.
  2. If the controller loses its connection to ZooKeeper because of network problems, or exits abnormally, the other brokers are notified of the change through their watches and try to create the ephemeral node /controller. One broker succeeds; as above, the others receive an exception, which tells them that the controller has already been determined, so they can only create watches again.
  3. If the failed broker was the controller, the newly elected controller checks whether any partition leader replicas were located on the failed broker. If so, it initiates leader election for those partitions, elects new partition leaders, and then updates the ISR data.
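The create-or-watch race in the steps above can be sketched in a few lines. This is only an in-memory illustration under stated assumptions: a ConcurrentHashMap stands in for ZooKeeper, and the class and method names (ControllerElectionSketch, tryBecomeController, sessionExpired) are hypothetical, not Kafka or ZooKeeper APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the ephemeral /controller node; not a ZooKeeper client. */
class ControllerElectionSketch {
    private static final String CONTROLLER_PATH = "/controller";
    private final ConcurrentMap<String, Integer> znodes = new ConcurrentHashMap<>();

    /** Returns true if brokerId created /controller, false if the node already existed. */
    boolean tryBecomeController(int brokerId) {
        return znodes.putIfAbsent(CONTROLLER_PATH, brokerId) == null;
    }

    /** Models the ephemeral node disappearing when the controller's session ends. */
    void sessionExpired(int brokerId) {
        znodes.remove(CONTROLLER_PATH, brokerId);
    }

    /** Current controller id, or null while no controller is registered. */
    Integer currentController() {
        return znodes.get(CONTROLLER_PATH);
    }

    public static void main(String[] args) {
        ControllerElectionSketch zk = new ControllerElectionSketch();
        System.out.println(zk.tryBecomeController(0)); // first broker creates the node
        System.out.println(zk.tryBecomeController(1)); // node exists, broker 1 would only watch
        zk.sessionExpired(0);                          // controller disconnects
        System.out.println(zk.tryBecomeController(1)); // broker 1 retries after the notification
    }
}
```

The atomic putIfAbsent plays the role of "first come, first served" node creation; a real broker would react to a watch notification instead of calling sessionExpired directly.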

1.2.3. The split-brain problem

What is a split brain

Literally, split-brain means that one brain is split in two, so that the system has more than one head. The problem tends to appear in highly available distributed systems. In short, the leader appears to be dead because of network or other problems, which triggers a leader election; if the old leader is in fact still alive, there are now two leaders, which causes a series of problems.

The controller is equivalent to the master of the entire Kafka cluster and is responsible for many important tasks (listed above). Brokers compete to register the ephemeral node /controller in ZooKeeper, first come, first served. Because ZooKeeper ties the lifetime of an ephemeral node to the session, if the controller's broker stays disconnected for longer than the session timeout, the node is removed and a re-election is triggered.

So what happens when a split brain occurs?

As described above, the controller is mainly used to manage topics. After a split brain, operations such as creating topics and adding partitions report errors; reads and writes on existing topics are not affected, because they only need partition metadata, which can be obtained from any broker.

When does split-brain occur?

When a GC pause on the controller lasts longer than the ZooKeeper session timeout, or when the controller suffers a network failure.

Kafka's solution

To solve the controller split-brain problem, ZooKeeper holds a persistent node /controller_epoch that stores an integer epoch number (also known as a fencing token). Every time a new controller is elected in the cluster, a larger epoch number is written. If a broker receives a message that carries an epoch smaller than the current value, it ignores the message.
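The epoch check can be illustrated with a minimal sketch. It assumes each controller request carries the epoch of the controller that sent it; the class EpochFencingSketch and its accept method are hypothetical names used only for illustration.

```java
/** Sketch of how a broker could fence off requests from a stale controller. */
class EpochFencingSketch {
    private int highestSeenEpoch = 0;

    /** Accepts a request only if its epoch is not older than any epoch seen so far. */
    boolean accept(int controllerEpoch) {
        if (controllerEpoch < highestSeenEpoch) {
            return false; // sent by a stale controller: ignore it
        }
        highestSeenEpoch = controllerEpoch;
        return true;
    }

    public static void main(String[] args) {
        EpochFencingSketch broker = new EpochFencingSketch();
        System.out.println(broker.accept(5)); // current controller
        System.out.println(broker.accept(6)); // newly elected controller
        System.out.println(broker.accept(5)); // old controller waking up after a long GC pause
    }
}
```

Because the epoch only ever grows, an old controller that resumes after a pause cannot override decisions made by its successor.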

1.2.4. Herd effect

In early Kafka versions, if a broker hosting many partitions went down, many watches fired at once, causing a large number of adjustments across the cluster and heavy network congestion; this herd effect risked overloading ZooKeeper. Later, Kafka introduced the controller (that is, the leader of the brokers) to manage the state of partition replicas. When the leader replica of a partition fails, the controller elects a new leader replica for that partition. When a change in a partition's ISR set is detected, the controller notifies all brokers to update their metadata.

In a distributed system built on ZooKeeper, both split-brain and the herd effect are hard to avoid.

1.3. Trigger leader election

In current Kafka clusters, only the controller needs to register the corresponding listeners in ZooKeeper; the other brokers rarely watch ZooKeeper for data changes, but every broker still watches the /controller node. When the data of the /controller node changes, each broker updates the activeControllerId held in its own memory.

When the /controller node is deleted, the brokers in the cluster hold an election. If a broker was the controller before the node was deleted, it must first step down before the election (close the corresponding resources, such as shutting down the state machines and deregistering the corresponding listeners). If there is a special need, you can manually delete the /controller node to trigger a new round of election. Shutting down the broker that is the controller, or manually writing a different brokerid into the /controller node, also triggers a new round of election.

2. Leader and follower

2.1. Replica of kafka

The role of Kafka replicas is to improve data reliability. The default number of replicas is 1, and production environments generally configure 2 to ensure reliability; more replicas are not always better, because each extra replica consumes disk space, adds network traffic, and reduces efficiency.

Kafka replicas are divided into a leader and followers: the leader handles reads and writes, while followers are only responsible for synchronizing data. There are three concepts related to replicas:

  • ISR: the set of replicas that stay in sync with the leader (the leader itself included)
  • OSR: the follower replicas that lag too far behind the leader
  • AR: all replicas of a partition are collectively called the AR (Assigned Replicas), and AR = ISR + OSR. The AR set of a partition is fixed when the partition is assigned; as long as no reassignment happens, the order of replicas within it stays the same, whereas the order of replicas in the partition's ISR set may change.

The ISR was also introduced in the previous article. If a follower has not sent a fetch request to the leader or synchronized data for too long, it is removed from the ISR. The time threshold is set by the replica.lag.time.max.ms parameter, and the default is 30s.

If the leader fails, a new leader is elected from the ISR.
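The lag rule can be sketched as a simple time comparison. This assumes the leader remembers, per follower, the last time that follower was fully caught up; the class IsrSketch, its method and the timestamps are illustrative, not Kafka's internal code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: decide which followers stay in the ISR based on how long they have lagged. */
class IsrSketch {
    /** Returns the follower ids (ascending) whose last catch-up is within maxLagMs of nowMs. */
    static List<Integer> inSyncFollowers(Map<Integer, Long> lastCaughtUpMs, long nowMs, long maxLagMs) {
        List<Integer> inSync = new ArrayList<>();
        // TreeMap gives a stable, sorted iteration order for the result
        for (Map.Entry<Integer, Long> e : new TreeMap<>(lastCaughtUpMs).entrySet()) {
            if (nowMs - e.getValue() <= maxLagMs) {
                inSync.add(e.getKey());
            }
        }
        return inSync;
    }

    public static void main(String[] args) {
        // follower 1 caught up 5s ago, follower 2 caught up 40s ago; threshold is 30s
        Map<Integer, Long> lastCaughtUp = Map.of(1, 95_000L, 2, 60_000L);
        System.out.println(inSyncFollowers(lastCaughtUp, 100_000L, 30_000L)); // prints [1]
    }
}
```

Followers that drop out of the returned list correspond to the OSR described above.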

2.2. Leader election process

The election of a partition leader is carried out by Kafka's broker leader (called the controller in the rest of this article).

A leader election is required when a partition is created (creating a topic or adding partitions both create partitions) or when a partition comes online (for example, the original leader replica has gone offline and the partition must elect a new leader to keep serving requests). The election walks through the AR set in order and picks the first surviving replica that is also in the ISR.

Leader election is also needed when partitions are reassigned (described below). In that case, the first surviving replica that is in the current ISR is picked from the reassigned AR list.

Furthermore, when a broker node is shut down, the leader replicas located on that node go offline, so the corresponding partitions need a leader election. The first surviving replica that is in the current ISR is picked from the AR list, with the extra condition that it is not on the node being shut down.
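The selection rule described above (first replica in AR order that is alive and in the ISR) can be sketched as follows. The class and method names are hypothetical; a node that is shutting down is modeled simply by leaving it out of liveBrokers.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;

/** Sketch of picking a partition leader from the assigned replicas. */
class LeaderSelectionSketch {
    /** Returns the first replica in AR order that is both alive and in the ISR, if any. */
    static OptionalInt electLeader(List<Integer> assignedReplicas, Set<Integer> isr, Set<Integer> liveBrokers) {
        for (int replica : assignedReplicas) {
            if (liveBrokers.contains(replica) && isr.contains(replica)) {
                return OptionalInt.of(replica);
            }
        }
        return OptionalInt.empty(); // no eligible replica: the partition stays without a leader
    }

    public static void main(String[] args) {
        // AR = [1, 2, 3]; broker 1 is down, so broker 2 is the first eligible replica
        System.out.println(electLeader(List.of(1, 2, 3), Set.of(2, 3), Set.of(2, 3)));
    }
}
```

Because the AR order is stable, the same replica tends to be preferred as leader whenever it is healthy.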

2.3. Unclean leader election

Kafka also provides the unclean.leader.election.enable parameter, which specifies whether replicas outside the ISR are allowed to become leader. It defaults to false since Kafka 0.11 (earlier versions defaulted to true). An empty ISR means that the leader and all in-sync followers are down; if the parameter is true, a replica that is not in the ISR can then be chosen as the new leader. Such a replica may lag far behind the old leader, so there is a risk of data loss. In production it is recommended to keep this parameter set to false.

2.4. Leader and follower failure process

2.4.1. LEO and HW

Follower and leader failures can occur in production, so how does Kafka handle them? Before walking through the process, let's first understand two concepts, LEO and HW.

  • LEO (log end offset): the offset after the last message of each replica, that is, the latest offset + 1
  • HW (high watermark): the smallest LEO among all replicas in the ISR

LEO and HW exist because data is first written to the leader and then pulled by the followers. Followers synchronize at different speeds, so the replicas end up at different offsets. Kafka takes the smallest LEO among these replicas as the HW, and only messages below the HW are visible to consumers.
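As a small worked example of the two definitions, the following sketch computes an LEO from the last written offset and the HW as the minimum LEO. The class name and the sample offsets are illustrative only.

```java
import java.util.Collection;
import java.util.List;

/** Sketch of the LEO and HW definitions. */
class HighWatermarkSketch {
    /** LEO is the offset the next message will receive: last written offset + 1. */
    static long logEndOffset(long lastOffset) {
        return lastOffset + 1;
    }

    /** HW is the smallest LEO among the given in-sync replicas. */
    static long highWatermark(Collection<Long> isrLogEndOffsets) {
        if (isrLogEndOffsets.isEmpty()) {
            throw new IllegalArgumentException("at least one replica is required");
        }
        long hw = Long.MAX_VALUE;
        for (long leo : isrLogEndOffsets) {
            hw = Math.min(hw, leo);
        }
        return hw;
    }

    public static void main(String[] args) {
        // leader has written up to offset 7, followers up to offsets 5 and 6
        List<Long> leos = List.of(logEndOffset(7), logEndOffset(5), logEndOffset(6));
        System.out.println(highWatermark(leos)); // prints 6
    }
}
```

In this example the messages at offsets 6 and 7 exist on the leader but sit above the HW, because not every in-sync replica has them yet.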

2.4.2. Follower failure process

What happens if the follower on Broker1 fails? First, that follower is removed from the ISR, while the leader and the other followers keep receiving data unaffected, and the corresponding LEO and HW keep advancing. After the follower on Broker1 recovers, it reads the last HW recorded on its local disk, truncates the part of its log above that HW, and synchronizes from the HW onward from the leader. Once the LEO of the follower on Broker1 is greater than or equal to the HW of the partition, the follower has caught up with the leader and is added to the ISR again.

2.4.3. Leader failure process

That covers follower failure, so what if the leader fails? If the leader on Broker0 fails, it is first removed from the ISR, and a new leader is then elected from the ISR. To keep the data consistent across replicas, the other followers truncate the part of their log files above the HW and then synchronize from the new leader. Note that this only guarantees consistency between replicas; it does not guarantee that data is neither lost nor duplicated.
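Both failure cases share the same truncate-then-resync step, which can be sketched with plain lists. This is a simplification under stated assumptions: the list index stands in for the message offset, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a follower truncating to the HW and then catching up from the leader. */
class FollowerRecoverySketch {
    /** Drops every entry at or above the high watermark; the list index stands in for the offset. */
    static List<String> truncateToHighWatermark(List<String> log, int highWatermark) {
        int end = Math.min(highWatermark, log.size());
        return new ArrayList<>(log.subList(0, end));
    }

    /** Copies the leader's entries starting at the follower's current LEO. */
    static List<String> catchUp(List<String> followerLog, List<String> leaderLog) {
        List<String> result = new ArrayList<>(followerLog);
        for (int offset = result.size(); offset < leaderLog.size(); offset++) {
            result.add(leaderLog.get(offset));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> follower = List.of("a", "b", "c", "x"); // "x" was written above HW = 3
        List<String> leader = List.of("a", "b", "c", "d", "e");
        List<String> truncated = truncateToHighWatermark(follower, 3);
        System.out.println(catchUp(truncated, leader)); // prints [a, b, c, d, e]
    }
}
```

The entry "x" illustrates why consistency is not the same as durability: it is discarded during truncation even though it was once written to a replica.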

2.5. Partition Replica Adjustment

From the previous article we know that Kafka distributes partition replicas across the nodes as evenly as possible, so that read and write throughput is balanced across machines. When some brokers go down, however, leaders become concentrated on a few brokers, which puts excessive read and write pressure on them. Even after a failed broker is restored, its former leaders are now followers and cannot share the load, so the cluster load stays unbalanced.

2.5.1 Leader Partition Automatic Balance

To solve this problem, Kafka has an automatic balancing mechanism, controlled by the following parameters:

  • auto.leader.rebalance.enable: automatic leader partition balancing, default is true
  • leader.imbalance.per.broker.percentage: the ratio of unbalanced leaders allowed per broker, default is 10%; if a broker exceeds this value, the controller triggers leader balancing
  • leader.imbalance.check.interval.seconds: the interval at which leader balance is checked, default is 300 seconds

In production, however, automatic balancing is usually not enabled, because triggering leader partition rebalancing costs performance; alternatively, you can raise the threshold parameter that triggers it.
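The threshold check can be sketched as follows. It assumes the imbalance ratio of a broker is the share of partitions for which the broker is the preferred (first AR) replica but is not currently the leader; the class and method names are hypothetical and the real controller logic may differ in detail.

```java
/** Sketch of the per-broker leader imbalance check. */
class LeaderImbalanceSketch {
    /** Share of a broker's preferred-leader partitions that it does not currently lead. */
    static double imbalanceRatio(int preferredLeaderCount, int notCurrentlyLeading) {
        if (preferredLeaderCount == 0) {
            return 0.0; // nothing to balance for this broker
        }
        return (double) notCurrentlyLeading / preferredLeaderCount;
    }

    /** True when the ratio exceeds the configured percentage (10 by default). */
    static boolean shouldRebalance(int preferredLeaderCount, int notCurrentlyLeading, int thresholdPercent) {
        return imbalanceRatio(preferredLeaderCount, notCurrentlyLeading) > thresholdPercent / 100.0;
    }

    public static void main(String[] args) {
        // the broker is preferred leader for 10 partitions but leads only 8 of them
        System.out.println(shouldRebalance(10, 2, 10)); // prints true
    }
}
```

Raising thresholdPercent makes rebalancing rarer, which matches the production advice above.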

2.5.2 Manually adjust replica allocation

Servers may differ in performance, or some servers may be short of disk space, so more replicas may need to be stored on nodes with better performance and larger disks. How can the distribution of partition replicas be adjusted manually in production?

First, create a test topic named test-assign.
The following demonstrates how to change the replica assignment of its partitions. First, create an assign-replicas.json file with the following contents:

{
    "version": 1,
    "partitions": [
        {"topic": "test-assign", "partition": 0, "replicas": [1, 2]},
        {"topic": "test-assign", "partition": 1, "replicas": [1, 2]},
        {"topic": "test-assign", "partition": 2, "replicas": [1, 2]}
    ]
}

Then execute the command:

bin/ --zookeeper localhost:2181 --reassignment-json-file assign-replicas.json --execute

bin/ --zookeeper localhost:2181 --reassignment-json-file assign-replicas.json --verify

Finally, look at the replica distribution of this topic:

bin/ --describe --zookeeper localhost:2181 --topic test-assign

2.5.3. Increase the replication factor

In production, when a topic becomes more important, you may want to add replicas to it. The following shows how.

Create a JSON file named add-relication-factor.json:

{
    "version": 1,
    "partitions": [
        {"topic": "test-assign", "partition": 0, "replicas": [3, 2, 1]},
        {"topic": "test-assign", "partition": 1, "replicas": [1, 3, 2]},
        {"topic": "test-assign", "partition": 2, "replicas": [2, 1, 3]}
    ]
}

Execute the replica storage plan:

bin/ --zookeeper localhost:2181 --reassignment-json-file add-relication-factor.json --execute

3. File storage mechanism

3.1. Storage structure

In Kafka, a topic is a logical concept, while a partition exists physically. Each partition corresponds to one log, which stores the data produced by producers; produced data is continuously appended to the end of the log. To keep the log from growing so large that locating data becomes inefficient, Kafka uses segmentation and indexing: each partition is divided into multiple segments, and each segment consists of an .index file, a .log file, a .timeindex file, and other files. These files are located in one folder, named by the rule: topic name + partition number.
In the file structure of a topic, one topic corresponds to multiple partitions, and one partition corresponds to one log (Log).

Kafka introduces the log segment (LogSegment), which divides the log into multiple smaller files. The log is stored in the folder configured by log.dirs, and each LogSegment consists of three files: the offset index file (.index suffix), the timestamp index file (.timeindex suffix) and the message data file (.log suffix). Note that there is also a leader-epoch-checkpoint file, which stores the Leader Epoch values (used to keep replica data consistent).

The directory structure is as follows:

├── long-topic-0
    ├── 00000000000000000000.index
    ├── 00000000000000000000.log
    ├── 00000000000000000000.timeindex
    ├── 00000000000000000113.index
    ├── 00000000000000000113.log
    ├── 00000000000000000113.timeindex
    └── leader-epoch-checkpoint

Segment file naming rule: the first segment of a partition starts from 0, and each subsequent segment file is named after the offset of its first message (the last offset of the previous segment plus 1). The offset is a 64-bit value (long type); the name is 20 digits long, and unused leading digits are padded with 0.
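The naming rule amounts to zero-padding the base offset to 20 digits, as in this short sketch. The class and method names are illustrative; only the 20-digit padding and the file suffixes come from the text above.

```java
import java.util.Locale;

/** Sketch of deriving segment file names from a base offset. */
class SegmentNameSketch {
    /** Pads the base offset with leading zeros to 20 digits and appends the suffix. */
    static String fileName(long baseOffset, String suffix) {
        return String.format(Locale.ROOT, "%020d%s", baseOffset, suffix);
    }

    public static void main(String[] args) {
        // the three files of the segment whose first message has offset 113
        System.out.println(fileName(113, ".log"));
        System.out.println(fileName(113, ".index"));
        System.out.println(fileName(113, ".timeindex"));
    }
}
```

Fixed-width names sort lexicographically in the same order as the offsets, which makes it easy to find the segment that contains a given offset.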

By default, once a .log file reaches 1 GB, log rolling creates a new segment to record further messages. The segment size is controlled by the log.segment.bytes parameter.

The .index and .timeindex files are allocated 10 MB when they are first created and are trimmed to their actual size when log rolling happens, which is why the index files of earlier segments are only a few hundred KB.

How to view the contents of the log file:

./bin/ --files 00000000000000000000.log  --print-data-log

3.2. Correspondence between log, timeindex and index files

Each time 4 KB of data has been written to the .log file (configurable via log.index.interval.bytes), one index entry is written to the .index file. The .index file is therefore a sparse index: it does not create an entry for every message.

The .log file is written sequentially, and each record generally consists of message + actual offset + position. An entry in the index file consists of relative offset (4 bytes) + position (4 bytes).

When Kafka looks up the message for a given offset, it binary-searches the .index file for the nearest entry at or below that offset, and then scans the .log file forward from that entry's position until it finds the message.

The timestamp index file is used to query messages in a certain time range. Its entry structure is timestamp (8 bytes) + relative offset (4 bytes). To use it, first find the offset corresponding to the time range, then look up the position in the .index file, and finally scan the .log file; this process therefore also relies on the .index file.
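The offset lookup can be sketched as a floor binary search over the sparse index. The arrays below are a simplified in-memory stand-in for the .index file, and the class name, method names and sample numbers are hypothetical.

```java
/** Sketch of locating a scan start position through a sparse offset index. */
class SparseIndexSketch {
    /** Returns the slot of the last entry whose relative offset is <= target, or -1 if none. */
    static int floorEntry(int[] relativeOffsets, int target) {
        int lo = 0;
        int hi = relativeOffsets.length - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (relativeOffsets[mid] <= target) {
                found = mid;   // candidate, but a later entry may still qualify
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }

    /** Byte position in the .log file from which the forward scan should start. */
    static int startPosition(long baseOffset, int[] relativeOffsets, int[] positions, long targetOffset) {
        int slot = floorEntry(relativeOffsets, (int) (targetOffset - baseOffset));
        return slot < 0 ? 0 : positions[slot];
    }

    public static void main(String[] args) {
        int[] relativeOffsets = {0, 35, 70};   // relative to base offset 113
        int[] positions = {0, 4096, 8192};     // byte positions in the .log file
        System.out.println(startPosition(113, relativeOffsets, positions, 160)); // prints 4096
    }
}
```

Storing offsets relative to the segment's base offset is what lets each entry fit in 4 bytes even though absolute offsets are 64-bit.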

3.3. File Cleanup Policy

As described above, message data is continuously appended to the end of the log, so the log keeps growing while disk space is limited. Kafka handles this through the log.cleanup.policy parameter, which has two values: delete, the default, which deletes log segments (LogSegment) that no longer qualify under a retention policy; and compact, which keeps only the last message for each key.

3.3.1. delete: Deletion

The default log retention time in Kafka is 7 days, and it can be changed through the following parameters.

  • log.retention.hours: in hours, lowest priority, default 168 (7 days)
  • log.retention.minutes: in minutes
  • log.retention.ms: in milliseconds, highest priority
  • log.retention.check.interval.ms: the interval between retention checks, default is 5 minutes
  • file.delete.delay.ms: the delay before files are actually deleted
  • log.retention.bytes: when set to -1, the retained log size is unlimited (size-based retention is disabled); when set to 1G, for example, it is the maximum size of the retained log

There are three specific log retention policies:

  • Time-based policy
    The log deletion task periodically checks whether any log segment has been retained longer than the configured threshold, in order to find the set of log segment files that can be deleted. Note the priority of the retention parameters: log.retention.ms > log.retention.minutes > log.retention.hours. By default only log.retention.hours is configured, with a value of 168, that is, 7 days.
    Whether a log segment has expired is not simply calculated from the file's modification time but from the largest timestamp in the segment. Kafka first looks up the timestamp index file of the segment and reads its last entry; if that timestamp is greater than 0, it is used; otherwise the last modification time is used.
    When deleting, the segments to be deleted are first removed from the skip list of log segments maintained by the Log object, so that no thread can read them any more. Then the .deleted suffix is added to all files of those segments, including the index files. Finally, a delayed task named delete-file deletes the files with the .deleted suffix. By default it runs after a delay of 1 minute, which can be configured through file.delete.delay.ms.
  • Log-size-based policy
    The log deletion task periodically checks whether the current log size exceeds the configured threshold (log.retention.bytes, default -1, meaning unlimited), in order to find the set of log segment files that can be deleted.
  • Log-start-offset-based policy
    This policy checks whether the baseOffset of the next log segment is less than or equal to logStartOffset; if so, the current log segment can be deleted.
    A word about logStartOffset: in general, the logStartOffset of a log equals the baseOffset of its first log segment, but this is not absolute. The value of logStartOffset can be changed by a DeleteRecordsRequest (for example through the kafka-delete-records.sh script), by log cleaning, by truncation, and so on.
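The time-based check can be sketched as a scan from the oldest segment. It assumes each segment is summarized by its largest timestamp and that the scan stops at the first segment still within retention; special handling of the active segment is left out. The class and method names are hypothetical.

```java
/** Sketch of the time-based retention check over segments ordered from oldest to newest. */
class RetentionSketch {
    /** Counts how many of the oldest segments have exceeded the retention time. */
    static int countExpired(long[] largestTimestampMs, long nowMs, long retentionMs) {
        int expired = 0;
        for (long largest : largestTimestampMs) {
            if (nowMs - largest <= retentionMs) {
                break; // this segment is still within retention, so newer ones are not examined
            }
            expired++;
        }
        return expired;
    }

    public static void main(String[] args) {
        long[] largestTimestamps = {1_000L, 2_000L, 9_000L};
        // with a 5s retention at time 10s, the first two segments have expired
        System.out.println(countExpired(largestTimestamps, 10_000L, 5_000L)); // prints 2
    }
}
```

Using the largest record timestamp rather than the file modification time keeps the decision stable even if the files are copied or touched.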

3.3.2. compact: Compaction

Log compaction keeps only the last version of the value among messages with the same key. If the application only cares about the latest value for each key, you can enable Kafka's log cleaning feature, and Kafka will periodically merge messages with the same key, keeping only the latest value.
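The effect of compaction on a sequence of keyed messages can be sketched with a map that keeps the latest value per key. This only shows the end result, not how Kafka's cleaner rewrites segments; the class name and the sample records are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the result of log compaction: only the latest value per key survives. */
class CompactionSketch {
    /** Each record is a {key, value} pair, given in log order. */
    static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            latest.remove(record[0]);          // re-inserting moves the key to its latest position
            latest.put(record[0], record[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = List.of(
                new String[] {"k1", "v1"},
                new String[] {"k2", "v2"},
                new String[] {"k1", "v3"});
        System.out.println(compact(log)); // prints {k2=v2, k1=v3}
    }
}
```

The older value v1 disappears because a newer message with the same key k1 exists later in the log.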

3.4. Reasons for kafka to read and write efficiently

Kafka can read and write quickly for the following reasons:

  1. Kafka is a distributed cluster that uses partitioning, so operations run in parallel
  2. A sparse index is used when reading data, so the data to consume can be located quickly
  3. Data is written to disk sequentially
  4. Page buffering and zero copy

4. Page Buffering and Zero Copy

4.1. Page Buffering

Kafka makes heavy use of the PageCache, which is one of the important reasons it achieves high throughput.

First, consider reads. When a process wants to read the contents of a file on disk, the operating system first checks whether the data page is already in the PageCache. If it is, the data is returned directly, which avoids a disk I/O operation. If it is not, the operating system issues a read request to the disk, stores the page it reads in the PageCache, and then returns the data to the process, on the same principle as using Redis as a cache.

Writes work in a similar way. When a process needs to write data to disk, the operating system checks whether the data page already exists in the PageCache; if not, it adds the page to the PageCache and then writes the data into that page. The modified page becomes a dirty page, and the operating system writes dirty pages to disk at an appropriate time to keep the data consistent.

4.2. Zero Copy

Zero copy does not mean that nothing is copied at all; it means reducing the number of unnecessary copies, and it is typically applied to I/O reads and writes. An ordinary application I/O path goes through four copies:

  • Data is copied by DMA from the disk to the kernel's Read Buffer;
  • From the kernel-mode Read Buffer to a user-mode application Buffer;
  • From the user-mode Buffer to the kernel-mode Socket Buffer;
  • From the Socket Buffer to the NIC Buffer of the network card.

From this process we can see that the two copies between kernel mode and user mode are wasted work, and that switching between the two modes is also expensive. With zero copy, once data has been copied by DMA from the disk into the kernel cache (page cache), the operating system shares the kernel cache with the user layer in order to save a CPU copy. User-layer reads and writes then access the shared memory directly, and the copy into the socket buffer becomes a kernel-to-kernel CPU copy, which is faster.

The transferTo() method of FileChannel in Java NIO can perform zero-copy transfers. This implementation depends on the sendfile() implementation of the underlying operating system:

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

The underlying call on Linux is:

#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

Note, however, that zero copy depends heavily on the underlying system, so whether a zero-copy system call is actually made depends on whether the specific operating system implements it.

Let's take a look at a zero-copy example with Java NIO:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class ZeroCopy {

    public static void main(String[] args) {
        File source = new File("G:/");
        File target = new File("G:/");
        nioZeroCopy(source, target);
    }

    public static void nioZeroCopy(File source, File target) {
        // try-with-resources closes both channels even if the transfer fails
        try (
                FileChannel sourceChannel = new FileInputStream(source).getChannel();
                FileChannel targetChannel = new FileOutputStream(target).getChannel()
        ) {
            // transferTo may move fewer bytes than requested, so loop until everything is sent
            for (long count = sourceChannel.size(); count > 0; ) {
                long transfer = sourceChannel.transferTo(sourceChannel.position(), count, targetChannel);
                sourceChannel.position(sourceChannel.position() + transfer);
                count -= transfer;
            }
        } catch (IOException e) {
            System.out.println("Exception: " + e.getMessage());
        }
    }
}
