一、消费者工作原理
1. 消费方式
- pull 模式:Kafka 采用该方式,Consumer 主动从 Broker 拉取数据。不足在于若没有数据,消费者会陷入循环中
- push 模式:Broker 向 Consumer 推送数据。不足在于难以适应所有消费者的消费速率
2. 消费者组
- 相同
groupid
的消费者组成一个消费者组- 为避免消费重复问题,每个分区的数据只能由消费者组中的一个消费者消费
- 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
- 消费者组之间完全独立:一个消息可以由多个消费者组消费,一个消费者组可以消费多个消息
3. 消费者组初始化流程
- coordinator:辅助实现消费者组的初始化和分区的分配,每个 Broker 节点都有
- 0.9 版本后,每个消费者的
offset
(消费到哪了)由消费者提交到系统主题__consumer_offsets
中保存(之前存放在 ZK) - 系统主题
__consumer_offsets
默认有 50 个分区,消费者会选择hash(groupid) % 50
对应分区的 Leader 副本所在的 Broker 节点中的 coordinator 进行管理,同时消费者的 offset 也会存储到该分区中
- 首先,消费者组中的所有 consumer 向 coordinator 发送 JoinGroup 请求
- coordinator 选出一个 consumer 作为 Leader,并把要消费的 topic 信息以及注册的 consumer 信息发送给该 consumer
- 该 consumer 会制定消费方案(如哪个 consumer 消费哪个分区),并把消费方案发送给 coordinator
- coordinator 把消费方案下发给各个 consumer,开始消费
- 触发消费者组再平衡的情况(将其任务分给其他消费者):
- 每个 consumer 都会和 coordinator 保持心跳(
heartbeat.interval.ms
,默认 3s),一旦超时(session.timeout.ms
,默认 45s),该消费者会被移除,并触发再平衡 - 若消费者处理消息的时间过长(
max.poll.interval.ms
,默认 5min),会触发再平衡
- 每个 consumer 都会和 coordinator 保持心跳(
4. 消费者组消费流程
- 首先,消费者创建一个网络连接客户端与 Broker 通信
- 消费者调用 sendFetches() 方法初始化抓取数据工作
fetch.min.bytes
:每批次最小抓取数据大小,默认 1Bytefetch.max.wait.ms
:若一批数据未达到最小值的超时时间,默认 500msfetch.max.bytes
:每批次最大抓取数据大小,默认 50M
- 调用 send() 方法拉取数据,通过回调方法把结果放入指定队列
- 消费者从队列中抓取数据,默认一次拉取 <=500 条数据(
max.poll.records
) - 数据经过反序列化和拦截器处理后,最终由消费者处理
5. 消费者重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表 |
key.deserializer / value.deserializer | 指定接收消息的 key 和 value 的反序列化类型 |
group.id | 消费者所属的消费者组 |
enable.auto.commit | 消费者自动周期性地向服务器提交偏移量,默认 true |
auto.commit.interval.ms | 消费者偏移量向 Kafka 提交的频率,默认 5s |
auto.offset.reset | 当 Kafka 中没有初始偏移量(消费者组第一次消费)或当前偏移量在服务器中不存在(数据被删除)时如何处理,默认 latest,自动重置偏移量为最新的偏移量 可选 earliest:自动重置偏移量到最早的偏移量;none:如果消费者组原来的偏移量不存在,则向消费者抛异常;anything:向消费者抛异常 |
offsets.topic.num.partitions | __consumer_offsets 的分区数,默认 50 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该值需小于 session.timeout.ms 值的 1/3 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超时移除并触发再平衡 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认 5min。超时移除并触发再平衡 |
fetch.min.bytes | 消费者获取服务器端一批消息最小的字节数。默认 1Byte |
fetch.max.wait.ms | 若一批数据未达到最小值的超时时间,默认 500ms |
fetch.max.bytes | 消费者每批次最大抓取数据大小,默认 50M。若批次数据大小大于该值仍可拉取,受 message.max.bytes 和 max.message.bytes 影响 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认 500 条 |
partition.assignment.strategy | 消费者分区分配策略,默认 Range + CooperativeSticky,可选:Range、RoundRobin、Sticky、CooperativeSticky |
二、消费者 API
1. 订阅主题
- 在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id
1 | Properties properties = new Properties(); |
2. 订阅分区
1 | // 订阅分区 |
3. 消费者组订阅
- 开启 3 个消费者,使用同一个 group id,订阅同一个主题
- 生产者发送数据,可以看到每个消费者只处理一个分区的数据
三、分区分配与再平衡
- Kafka 消费时有四种主流的分区分配策略(
partition.assignment.strategy
,默认 Range + CooperativeSticky):Range、RoundRobin、Sticky、CooperativeSticky。可以同时使用多个分区分配策略
1. Range 与再平衡
- Range 是对单个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。保持均匀的同时排在前面的消费者多消费
- 如:一个 topic 有 7 个分区(0-6)和 3 个消费者(C0-C2),7 / 3 = 2 … 1,因此 C0 消费 0, 1, 2,C1 消费 3, 4,C2 消费 5, 6
- 如:一个 topic 有 8 个分区(0-7)和 3 个消费者(C0-C2),8 / 3 = 2 … 2,因此 C0 消费 0, 1, 2,C1 消费 3, 4, 5,C2 消费 6, 7
- 如果 topic 过多,排在前面的消费者消费压力会比后面的大,容易产生数据倾斜
- 再平衡:
- 如果停止掉 C0,C0 45s 内需要消费的数据会在 45s 后被整体分配到 C1 或 C2
- 45s 后,会按 Range 策略再分配:C1 消费 0, 1, 2, 3,C2 消费 4, 5, 6
2. RoundRobin 与再平衡
- RoundRobin 轮询分区策略针对集群中所有 topic 而言。会把所有 partition 和所有 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者
- 如:所有 topic 共有 7 个分区(0-6)和 3 个消费者(C0-C2),则 C0 消费 0, 3, 6,C1 消费 1, 4,C2 消费 2, 5
- 再平衡:
- 如果停止掉 C0,C0 45s 内需要消费的数据会在 45s 后被轮询分配给其他消费者,如 C1 消费 0, 1, 4, 6,C2 消费 2, 3, 5
- 45s 后,会按 RoundRobin 策略再分配:C1 消费 0, 2, 4, 6,C2 消费 1, 3, 5
1 | // 修改分区分配策略 |
3. Sticky 与再平衡
- 粘性分区:可以理解为分配的结果带有“粘性”。即在执行一次新的分配之前,会考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销
- 粘性分区是 Kafka 从 0.11.x 版本开始引入的分配策略,首先会尽量均衡的分配分区到消费者上面,当同一消费者组内的消费者出现问题时,会尽量保持原有分配的分区不变化
- 如:一个 topic 有 7 个分区(0-6)和 3 个消费者(C0-C2),可能的情况为 C0 消费 0, 1,C1 消费 2, 3, 5,C2 消费 4, 6
- 再平衡:
- 如果停止掉 C0,C0 45s 内需要消费的数据会在 45s 后被均匀随机分配
- 45s 后,会按 Sticky 策略再分配,且尽量保持原有分配的分区不变:如 C1 消费 2, 3, 5,C2 消费 0, 1, 4, 6
1 | properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor"); |
四、offset
1. offset 维护位置
- Kafka 0.9 版本前,consumer 将 offset 保存在 ZK 中。0.9 版本后,保存在主题
__consumer_offsets
中 - __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是
[<group.id>,<topic>,<partition>]
,value 是当前 offset 的值 - 每隔一段时间,Kafka 内部会对这个 topic 进行 compact 压缩,保留最新数据
1 | $ vim config/consumer.properties |
2. 提交 offset
- 自动提交:专注业务逻辑
enable.auto.commit
:是否开启自动提交 offset 功能,默认 trueauto.commit.interval.ms
:consumer 自动提交 offset 的时间间隔,默认 5s
- 手动提交:让开发人员把握提交的时机,有以下两种方式,提交的都是这批数据最高的偏移量
commitSync()
:同步提交,阻塞当前线程,必须等待提交完毕,再去消费下一批数据,并且会自动失败重试commitAsync()
:异步提交,发送完提交 offset 请求后,就开始消费下一批数据,没有重试机制,可能提交失败
1 | // 关闭自动提交 |
3. 消费指定 offset
auto.offset.reset
:当 Kafka 中没有初始偏移量(消费者组第一次消费)或当前偏移量在服务器中不存在(数据被删除)时如何处理- earliest:自动重置偏移量到最早的偏移量(–from-beginning)
- latest:默认,自动重置偏移量为最新的偏移量
- none:如果消费者组原来的偏移量不存在,则向消费者抛异常
1 | KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); |
4. 指定时间消费
- 把时间转换为对应的 offset
1 | KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); |
五、生产经验
1. 消费者事务
- 重复消费:已经消费了数据,但是 offset 没提交
- 如:开启自动提交,在上一次提交后,下一次提交前消费者宕机,则期间消费的数据会重复
- 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费
- 如:设置手动提交,消费者消费数据后提交,但消费数据的结果未落盘时消费者宕机
- 如果想完成消费者的精准一次消费,需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定,即下游消费者必须支持事务
2. 消费者提高吞吐量
- 增加 topic 的分区数,同时增加消费者组的消费者数量
- 提高每批次拉取的数量(
max.poll.records
,默认 500 条)及大小上限(fetch.max.bytes
,默认 50M)