Kafka 消费者详解

一、消费者工作原理

1. 消费方式

  • pull 模式:Kafka 采用该方式,Consumer 主动从 Broker 拉取数据。不足在于若没有数据,消费者会陷入循环中
  • push 模式:Broker 向 Consumer 推送数据。不足在于难以适应所有消费者的消费速率

2. 消费者组

  • 相同 groupid 的消费者组成一个消费者组
    • 为避免消费重复问题,每个分区的数据只能由消费者组中的一个消费者消费
    • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • 消费者组之间完全独立:一个消息可以由多个消费者组消费,一个消费者组可以消费多个消息

3. 消费者组初始化流程

  • coordinator辅助实现消费者组的初始化和分区的分配,每个 Broker 节点都有
  • 0.9 版本后,每个消费者的 offset(消费到哪了)由消费者提交到系统主题 __consumer_offsets 中保存(之前存放在 ZK)
  • 系统主题 __consumer_offsets 默认有 50 个分区,消费者会选择 hash(groupid) % 50 对应分区的 Leader 副本所在的 Broker 节点中的 coordinator 进行管理,同时消费者的 offset 也会存储到该分区中

  • 首先,消费者组中的所有 consumer 向 coordinator 发送 JoinGroup 请求
  • coordinator 选出一个 consumer 作为 Leader,并把要消费的 topic 信息以及注册的 consumer 信息发送给该 consumer
  • 该 consumer 会制定消费方案(如哪个 consumer 消费哪个分区),并把消费方案发送给 coordinator
  • coordinator 把消费方案下发给各个 consumer,开始消费
  • 触发消费者组再平衡的情况(将其任务分给其他消费者):
    • 每个 consumer 都会和 coordinator 保持心跳(heartbeat.interval.ms,默认 3s),一旦超时(session.timeout.ms,默认 45s),该消费者会被移除,并触发再平衡
    • 若消费者处理消息的时间过长(max.poll.interval.ms,默认 5min),会触发再平衡

4. 消费者组消费流程

  • 首先,消费者创建一个网络连接客户端与 Broker 通信
  • 消费者调用 sendFetches() 方法初始化抓取数据工作
    • fetch.min.bytes:每批次最小抓取数据大小,默认 1Byte
    • fetch.max.wait.ms:若一批数据未达到最小值的超时时间,默认 500ms
    • fetch.max.bytes:每批次最大抓取数据大小,默认 50M
  • 调用 send() 方法拉取数据,通过回调方法把结果放入指定队列
  • 消费者从队列中抓取数据,默认一次拉取 <=500 条数据(max.poll.records
  • 数据经过反序列化和拦截器处理后,最终由消费者处理

5. 消费者重要参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表
key.deserializer / value.deserializer 指定接收消息的 key 和 value 的反序列化类型
group.id 消费者所属的消费者组
enable.auto.commit 消费者自动周期性地向服务器提交偏移量,默认 true
auto.commit.interval.ms 消费者偏移量向 Kafka 提交的频率,默认 5s
auto.offset.reset 当 Kafka 中没有初始偏移量(消费者组第一次消费)或当前偏移量在服务器中不存在(数据被删除)时如何处理,默认 latest,自动重置偏移量为最新的偏移量
可选 earliest:自动重置偏移量到最早的偏移量;none:如果消费者组原来的偏移量不存在,则向消费者抛异常;anything:向消费者抛异常
offsets.topic.num.partitions __consumer_offsets 的分区数,默认 50
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该值需小于 session.timeout.ms 值的 1/3
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超时移除并触发再平衡
max.poll.interval.ms 消费者处理消息的最大时长,默认 5min。超时移除并触发再平衡
fetch.min.bytes 消费者获取服务器端一批消息最小的字节数。默认 1Byte
fetch.max.wait.ms 若一批数据未达到最小值的超时时间,默认 500ms
fetch.max.bytes 消费者每批次最大抓取数据大小,默认 50M。若批次数据大小大于该值仍可拉取,受 message.max.bytes 和 max.message.bytes 影响
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认 500 条
partition.assignment.strategy 消费者分区分配策略,默认 Range + CooperativeSticky,可选:Range、RoundRobin、Sticky、CooperativeSticky

二、消费者 API

1. 订阅主题

  • 在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092,hadoop102:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 消费者组ID
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList("test"));
while (true) {
// 1s拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(System.out::println);
}
// ConsumerRecord(topic = test, partition = 2, leaderEpoch = 2, offset = 14, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)

2. 订阅分区

1
2
3
4
5
6
7
// 订阅分区
consumer.assign(Collections.singletonList(new TopicPartition("test", 0)));
while (true) {
// 1s拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(System.out::println);
}

3. 消费者组订阅

  • 开启 3 个消费者,使用同一个 group id,订阅同一个主题
  • 生产者发送数据,可以看到每个消费者只处理一个分区的数据

三、分区分配与再平衡

  • Kafka 消费时有四种主流的分区分配策略(partition.assignment.strategy,默认 Range + CooperativeSticky):Range、RoundRobin、Sticky、CooperativeSticky。可以同时使用多个分区分配策略

1. Range 与再平衡

  • Range 是对单个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。保持均匀的同时排在前面的消费者多消费
    • 如:一个 topic 有 7 个分区(0-6)和 3 个消费者(C0-C2),7 / 3 = 2 … 1,因此 C0 消费 0, 1, 2,C1 消费 3, 4,C2 消费 5, 6
    • 如:一个 topic 有 8 个分区(0-7)和 3 个消费者(C0-C2),8 / 3 = 2 … 2,因此 C0 消费 0, 1, 2,C1 消费 3, 4, 5,C2 消费 6, 7
  • 如果 topic 过多,排在前面的消费者消费压力会比后面的大,容易产生数据倾斜
  • 再平衡:
    • 如果停止掉 C0,C0 45s 内需要消费的数据会在 45s 后被整体分配到 C1 或 C2
    • 45s 后,会按 Range 策略再分配:C1 消费 0, 1, 2, 3,C2 消费 4, 5, 6

2. RoundRobin 与再平衡

  • RoundRobin 轮询分区策略针对集群中所有 topic 而言。会把所有 partition 和所有 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者
    • 如:所有 topic 共有 7 个分区(0-6)和 3 个消费者(C0-C2),则 C0 消费 0, 3, 6,C1 消费 1, 4,C2 消费 2, 5
  • 再平衡:
    • 如果停止掉 C0,C0 45s 内需要消费的数据会在 45s 后被轮询分配给其他消费者,如 C1 消费 0, 1, 4, 6,C2 消费 2, 3, 5
    • 45s 后,会按 RoundRobin 策略再分配:C1 消费 0, 2, 4, 6,C2 消费 1, 3, 5
1
2
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

3. Sticky 与再平衡

  • 粘性分区:可以理解为分配的结果带有“粘性”。即在执行一次新的分配之前,会考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销
  • 粘性分区是 Kafka 从 0.11.x 版本开始引入的分配策略,首先会尽量均衡的分配分区到消费者上面,当同一消费者组内的消费者出现问题时,会尽量保持原有分配的分区不变化
    • 如:一个 topic 有 7 个分区(0-6)和 3 个消费者(C0-C2),可能的情况为 C0 消费 0, 1,C1 消费 2, 3, 5,C2 消费 4, 6
  • 再平衡:
    • 如果停止掉 C0,C0 45s 内需要消费的数据会在 45s 后被均匀随机分配
    • 45s 后,会按 Sticky 策略再分配,且尽量保持原有分配的分区不变:如 C1 消费 2, 3, 5,C2 消费 0, 1, 4, 6
1
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

四、offset

1. offset 维护位置

  • Kafka 0.9 版本前,consumer 将 offset 保存在 ZK 中。0.9 版本后,保存在主题 __consumer_offsets
  • __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 [<group.id>,<topic>,<partition>],value 是当前 offset 的值
  • 每隔一段时间,Kafka 内部会对这个 topic 进行 compact 压缩,保留最新数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ vim config/consumer.properties
exclude.internal.topics=false # 默认true,表示不能消费系统主题
$ xsync config/consumer.properties

# 创建主题test2
$ bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic test2 --partitions 2 --replication-factor 2
# 消费主题test2
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic test2 --group group2
# 生产消息到test2
$ bin/kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic test2
>hello
>world
# 消费主题__consumer_offsets
$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop101:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
...
[group2,test2,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, expireTimestamp=None)
[group2,test2,0]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0], metadata=, expireTimestamp=None)

2. 提交 offset

  • 自动提交:专注业务逻辑
    • enable.auto.commit:是否开启自动提交 offset 功能,默认 true
    • auto.commit.interval.ms:consumer 自动提交 offset 的时间间隔,默认 5s
  • 手动提交:让开发人员把握提交的时机,有以下两种方式,提交的都是这批数据最高的偏移量
    • commitSync():同步提交,阻塞当前线程,必须等待提交完毕,再去消费下一批数据,并且会自动失败重试
    • commitAsync():异步提交,发送完提交 offset 请求后,就开始消费下一批数据,没有重试机制,可能提交失败
1
2
3
4
5
6
7
8
9
// 关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(System.out::println);
consumer.commitSync(); // 同步提交
// consumer.commitAsync(); // 异步提交
}

3. 消费指定 offset

  • auto.offset.reset:当 Kafka 中没有初始偏移量(消费者组第一次消费)或当前偏移量在服务器中不存在(数据被删除)时如何处理
    • earliest:自动重置偏移量到最早的偏移量(–from-beginning)
    • latest:默认,自动重置偏移量为最新的偏移量
    • none:如果消费者组原来的偏移量不存在,则向消费者抛异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList("test"));

Set<TopicPartition> assignment = new HashSet<>();
// 获取消费者分区分配信息(需等待分区分配方案制定完成)
while (assignment.size() == 0) {
consumer.poll(Duration.ofSeconds(1)); // 加速
assignment = consumer.assignment();
}
// 遍历所有分区,并指定offset从600的位置开始消费
for (TopicPartition tp: assignment) {
consumer.seek(tp, 600);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(System.out::println);
}

// 每次执行完需要修改消费者组名

4. 指定时间消费

  • 把时间转换为对应的 offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList("test"));

Set<TopicPartition> assignment= new HashSet<>();
// 获取消费者分区分配信息
while (assignment.size() == 0) {
consumer.poll(Duration.ofSeconds(1));
assignment = consumer.assignment();
}
// 封装集合存储,每个分区对应一天前的数据
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从一天前开始消费的每个分区的offset
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition tp : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(System.out::println);
}

五、生产经验

1. 消费者事务

  • 重复消费已经消费了数据,但是 offset 没提交
    • 如:开启自动提交,在上一次提交后,下一次提交前消费者宕机,则期间消费的数据会重复
  • 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费
    • 如:设置手动提交,消费者消费数据后提交,但消费数据的结果未落盘时消费者宕机
  • 如果想完成消费者的精准一次消费,需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定,即下游消费者必须支持事务

2. 消费者提高吞吐量

  • 增加 topic 的分区数,同时增加消费者组的消费者数量
  • 提高每批次拉取的数量(max.poll.records,默认 500 条)及大小上限(fetch.max.bytes,默认 50M)