Kafka Broker 详解

一、Broker 工作原理

1. ZK 中存储的 Kafka 信息

1
2
3
4
5
6
7
8
$ cd /opt/app/zookeeper-3.7.0
$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
[zk: localhost:2181(CONNECTED) 1] ls /kafka/brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":""}
  • 大致目录结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
kafka
├── admin
│   ├── delete_topics
├── brokers
│   ├── ids # 记录正常工作的服务器
│   │   ├── ...
│   ├── seqid
│   ├── topics
│   │   ├── test # 主题
│   │   │   ├── partitions
│   │   │   │   ├── 0
│   │   │   │   │   ├── state # 记录每一个主题下的分区对应的Leader和ISR队列等
│   │   │   │   ├── ...
│   │   ├── ...
├── cluster
│   ├── id
├── config
│   ├── brokers
│   ├── changes
│   ├── clients
│   ├── ips
│   ├── topics
│   │   ├── ...
│   ├── users
├── consumers # 0.9版本前保存offset信息(通信压力大,消费者每次消费都要通信);0.9版本后offset存储在kafka主题中
├── controller # 辅助Leader选举(仅辅助)
├── controller_epoch
├── feature
├── isr_change_notification
├── latest_producer_id_block
├── log_dir_event_notification

2. Broker 工作流程

流程

  • 首先,每台 Broker 节点在启动时向 ZK 注册,表示当前节点可用
  • Broker 中的 Controller 模块向 ZK 注册,第一个注册的将负责管理集群 Broker 的上下线,所有 Topic 的分区副本分配和 Leader 选举工作
  • 注册的 Controller 监听 Broker 节点的变化,若 Leader 副本所在节点宕机,注册的 Controller 会根据 ZK 中的信息重新选举 Leader,之后更新 ZK 信息
  • 注册的 Controller 进行 Leader 选举,选举规则为在 ISR 中存活为前提,按照 AR 中的顺序轮询(如 AR:[1,0,2], ISR:[2,0],则选 0)
  • 注册的 Controller 在 ZK 中维护 Leader、ISR 等信息
  • 未注册的 Controller 从 ZK 中同步信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 查看现有分区情况(Replicas即AR)
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --topic test --describe
Topic: test TopicId: D7jWqskZSf-F-MC39chP5A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 2,0,1 Isr: 1,0,2
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
# 停止掉hadoop102的kafka
# 查看新的分区情况
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --topic test --describe
Topic: test TopicId: D7jWqskZSf-F-MC39chP5A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 0,2
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2
Topic: test Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 0,2
# 开启hadoop102的kafka
# 查看新的分区情况
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --topic test --describe
Topic: test TopicId: D7jWqskZSf-F-MC39chP5A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 0,2,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
Topic: test Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 0,2,1

3. Broker 重要参数

参数名称 描述
replica.lag.time.max.ms 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,就会被移除 ISR。默认 30s
auto.leader.rebalance.enable 自动 Leader 再平衡。默认 true
leader.imbalance.per.broker.percentage 每个 Broker 允许的不平衡的 Leader 的比率,超过该值则控制器会触发 Leader 再平衡。默认 10%
leader.imbalance.check.interval.seconds 检查 Leader 负载是否平衡的间隔时间。默认 300s
log.segment.bytes log 日志划分成 Segment 块的大小,默认 1G
log.index.interval.bytes 每当向 log 文件写入了 4KB 大小的日志,就往 index 文件里面记录一个索引。默认 4KB
log.retention.hours 数据保存的时间,默认 7 天
log.retention.minutes 数据保存的时间,分钟级别,优先级更高,默认关闭
log.retention.ms 数据保存的时间,毫秒级别,优先级最高,默认关闭
log.retention.check.interval.ms 检查数据是否超时的间隔,默认 5min
log.retention.bytes 若超过设置的所有日志总大小,删除最早的 Segment。默认 -1,表示无穷大
log.cleanup.policy 日志清理策略。默认 delete,表示所有数据启用删除策略;若设为 compact,表示所有数据启用压缩策略
num.io.threads 负责写磁盘的线程数,参数值要占总核数的 50%。默认 8
num.replica.fetchers 副本拉取线程数,参数值占总核数 50% 的 1/3
num.network.threads 数据传输线程数,参数值占总核数 50% 的 2/3。默认 3
log.flush.interval.messages 强制页缓存刷写到磁盘的条数。默认 long 的最大值,一般不改,交给系统管理
log.flush.interval.ms 每隔多久,刷数据到磁盘。默认 null,一般不改,交给系统管理

二、Kafka 副本

1. 副本概述

  • Kafka 默认 1 个副本,生产环境一般配置为 2 个,来保证数据可靠性。配置太多副本会增加磁盘存储空间,增加网络上的数据传输,降低效率
  • Kafka 中的副本分为 Leader 和 Follower,生产者和消费者只与 Leader 交互(Hadoop 副本等价),Follower 主动找 Leader 进行同步数据
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas),且 AR = ISR + OSR
    • ISR:表示和 Leader 保持同步的 Leader 和 Follower 集合。如果 Follower 长时间(replica.lag.time.max.ms,默认 30s)未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR 到 OSR。Leader 发生故障之后,就会从 ISR 中选举新的 Leader
    • OSR:表示 Follower 与 Leader 副本同步时,延迟过多的副本

2. Leader 和 Follower 故障处理细节

  • LEO(Log End Offset):副本的最后一个 offset + 1
  • HW(High Watermark):所有副本中最小的 LEO
  • 消费者能见到的最大 offect 为 HW - 1

(1) Follower 故障处理细节

  • Follower 故障后从 ISR 队列中移除,期间 Leader 节点正常接收数据
  • 该 Follower 恢复后,读取本地磁盘记录的 HW,将 log 文件大于等于该 HW 的部分截取掉,从该 HW 开始向 Leader 同步
  • 等该 Follower 的 LEO 大于等于现在的 HW 时,重新加入 ISR 队列

(2) Leader 故障处理细节

  • Leader 故障后从 ISR 队列中移除,控制器从 ISR 中选出一个新的 Leader
  • 为保证多个副本之间的数据一致性(不保证数据不丢失或不重复),其余的 Follower 会先将各自 log 文件高于 HW 的部分截取掉,然后从新的 Leader 同步数据

3. 分区副本分配

  • 尽可能错开,保证负载均衡及数据可靠性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 16 --replication-factor 3 --topic test2
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic: test2 TopicId: 6JMWRKElSUm_9ZAq7sMi3w PartitionCount: 16 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test2 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test2 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test2 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: test2 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

Topic: test2 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: test2 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: test2 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test2 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Topic: test2 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: test2 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test2 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test2 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0

Topic: test2 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test2 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test2 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: test2 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

4. 调整分区副本分配

  • 添加新 Broker:
    • 克隆 hadoop101,修改 IP 和主机名为 hadoop104
    • 删除 Kafka 目录下的 datas 和 logs 目录(有整个集群的唯一标识),并修改 broker.id 配置
    • 启动 hadoop104 的 Kafka
  • 调整分区副本分配:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
## 之前创建的主题仍保留原有配置(不包含新节点)
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --topic test --describe
Topic: test TopicId: D7jWqskZSf-F-MC39chP5A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

## 重新分配
# 1. 制作要操作主题的配置文件
$ vim topics-to-move.json
{
"topics": [
{"topic": "test"}
],
"version": 1
}
# 2. 生成负载均衡计划
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
# 3. 创建副本存储计划配置文件(可手动调整)
$ vim increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
# 4. 执行副本存储计划
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute
# 5. 验证副本存储计划
$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test-0 is complete.
Reassignment of partition test-1 is complete.
Reassignment of partition test-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic test

## 查看结果
$ bin/kafka-topics.sh --bootstrap-server hadoop100:9092 --topic test --describe
Topic: test TopicId: D7jWqskZSf-F-MC39chP5A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,3,0 Isr: 0,2,3
Topic: test Partition: 1 Leader: 1 Replicas: 3,0,1 Isr: 1,0,3
Topic: test Partition: 2 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2
  • 退役节点:
    • 对旧节点上的主题进行重新分配
    • 停止旧节点

5. Leader 再平衡

  • 正常情况下,Kafka 会自动把 Leader 均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些 Broker 宕机,会导致 Leader 过于集中在几台 Broker 上,造成集群负载不均衡
  • Kafka 默认开启 Leader 负载平衡(auto.leader.rebalance.enable,默认 true,建议关闭)。每过 leader.imbalance.check.interval.seconds(默认 300s)时间间隔进行检查,当 Broker 超过其允许的不平衡 Leader 的比率(leader.imbalance.per.broker.percentage,默认 10%),控制器会触发 Leader 的平衡
1
2
3
4
5
6
7
8
9
$ bin/kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic test3
Topic: test3 TopicId: 6JMWRKElSUm_9ZAq7sMi3w PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test3 Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1,0
Topic: test3 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,1,2
Topic: test3 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# 假设集群中只有test3一个主题
# 对于Broker0节点,分区0的AR优先副本是0节点,但0节点却不是Leader节点,因此不平衡数加1,AR副本总数为3
# 所以Broker0节点的不平衡率为1/3>10%,需要再平衡
# 同理,Broker1不需要,Broker2需要

三、Kafka 文件存储

1. 文件存储机制

  • Topic 是逻辑上的概念,而 Partition 是物理上的概念。每个 Partition 对应于一个 log 文件(虚拟),该 log 文件中存储的就是 Producer 生产的数据,会被不断追加到该 log 文件末端
  • 为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片索引机制,将每个 Partition 分为多个 Segment,块大小由 log.segment.bytes(默认 1G)指定
  • 每个 Segment 包括:.log 日志文件、.index 偏移量索引文件和 .timeindex 时间戳索引文件等,文件以当前 Segment 的第一条消息的 offset 命名。这些文件位于一个文件夹下,文件夹命名规则为 <topic名称>-<分区号>,如 test-0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ ll data/
drwxr-xr-x. 2 root root 204 13:58 test-0/
drwxr-xr-x. 2 root root 204 13:58 test-1/
drwxr-xr-x. 2 root root 204 13:58 test-2/
......
$ ll data/test-0/
-rw-r--r--. 1 root root 10485760 14:28 00000000000000000000.index
-rw-r--r--. 1 root root 1486 15:19 00000000000000000000.log
-rw-r--r--. 1 root root 10485756 14:28 00000000000000000000.timeindex
-rw-r--r--. 1 root root 8 14:48 leader-epoch-checkpoint
-rw-r--r--. 1 root root 43 14:28 partition.metadata

$ cd data/test-0/
$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 0 position: 0
$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping ./00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 position: 0 size: 70 crc: 2351331103 ...
baseOffset: 1 lastOffset: 10 count: 10 position: 70 size: 191 crc: 437570606 ...
baseOffset: 11 lastOffset: 13 count: 3 position: 261 size: 100 crc: 1110070785 ...
......

2. 稀疏索引

  • index 为稀疏索引,大约每往 log 文件写入 4KB 数据(log.index.interval.bytes),就会往 index 文件写入一条索引
  • index 文件中保存的 offset 为相对值,这样能确保 offset 的值所占空间不会过大
  • 如何定位到 offset=600 的记录?

3. 文件清理策略

  • Kafka 中默认的日志保存时间为 7 天(log.retention.hours),默认检查周期为 5min(log.retention.check.interval.ms
  • 日志清理策略(log.cleanup.policy
    • delete:日志删除,默认
      • 基于时间:默认打开,以 Segment 中所有记录中的最大时间戳作为该文件的时间戳,超时删该 Segment
      • 基于大小:默认关闭,若超过设置的所有日志总大小(log.retention.bytes,默认 -1,表示无穷大),则删除最早的 Segment
    • compact:日志压缩,对于相同 key 的不同 value 值,只保留最后一个版本
      • 压缩后的 offset 可能是不连续的,当从 offset 消费消息时,可能会拿到比这个 offset 大的 offset 对应的消息,并从这个位置开始消费
      • 这种策略只适合特殊场景,如消息的 key 是用户 id,value 是用户资料,这样即使不连续也是最新的

四、高效读写数据

  • Kafka 本身是分布式集群,并且采用分区技术,对海量数据切割存储,提高了生产者和消费者的并行度
  • 读数据采用稀疏索引,可以快速定位到要消费的数据
  • 顺序写磁盘:以追加的形式写 Segment
  • 页缓存:Kafka 重度依赖底层操作系统提供的页缓存 PageCache 功能。当写操作时,操作系统只是将数据写入 PageCache(落盘由内核决定)。当读操作时,先从 PageCache 中查找,如果找不到,再从磁盘中加载到 PageCache
  • 零拷贝:Kafka 的数据加工处理操作交由生产者和消费者处理。Kafka Broker 应用层不关心存储的数据,所以就不用走应用层,传输效率高