一、概述
1. Kafka 定义
- Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
- Kafka 是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用
使用案例:
- 网站在前端埋点记录用户操作行为,发送到日志服务器
- 可通过 Flume 时刻监控日志变化,发送到 Hadoop 等大数据平台进行数据分析
- Hadoop 正常读写速度受硬盘影响,约 100M/s。平常 Flume 采集速度小于 100M/s,但在特定场景,其速度会大于 200M/s。为避免造成阻塞,在中间引入 Kafka 集群作为缓冲
2. 消息队列
- 目前企业中比较常见的消息队列产品主要有 Kafka、ActiveMQ、RabbitMQ、RocketMQ 等
- 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ 等
(1) 传统消息队列的应用场景
- 缓存/消峰:控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况(如双十一秒杀)
- 解耦:允许独立的扩展或修改队列两端的处理过程,只要确保它们遵守同样的接口约束(如厂家、超市和顾客)
- 异步通信:允许用户把一个消息放入队列,但并不立即处理它,而是在需要的时候再去处理(如登录后发送短信通知)
(2) 消息队列的两种模式
- 点对点模式:消费者主动拉取数据,消息收到后清除消息(一对一)
- 发布/订阅模式:可以有多个 topic 主题,消费者消费数据之后,不删除数据,且每个消费者相互独立,都可以消费到数据
- 消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息
3. Kafka 基础架构
- Producer:消息生产者,向 Kafka Broker 发消息的客户端
- Consumer:消息消费者,向 Kafka Broker 取消息的客户端
- Consumer Group(CG):消费者组,由多个 consumer 组成
- Broker:一台 Kafka 服务器就是一个 Broker
- Topic:可以理解为一个队列
- Partition:为了实现扩展性,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列,可以分布在不同的 Broker 上
- Replica:副本。一个 topic 的每个 partition 都有若干个副本,一个 Leader 和若干个 Follower
- Leader:主副本,生产者、消费者只针对 Leader 副本操作
- Follower:从副本,实时从 Leader 同步数据,当 Leader 发生故障时,某个 Follower 会成为新的 Leader
二、 集群部署
1. 集群规划
hadoop100 | hadoop101 | hadoop102 |
---|---|---|
ZK、Kafka | ZK、Kafka | ZK、Kafka |
2. ZooKeeper 集群部署
- 下载地址:Apache ZooKeeper
1 | ## 准备资源 |
- 集群脚本:
/usr/local/bin/zk.sh
1 |
|
3. Kafka 集群部署
- 下载地址:Apache Kafka
1 | ## 准备资源 |
- 集群脚本:
/usr/local/bin/kf.sh
1 |
|
4. 启动
1 | # 启动ZooKeeper集群 |
先停止 Kafka,等待其所有节点进程停止后,再停止 ZooKeeper:Kafka 将一些信息存储到了 ZooKeeper,停止前需要先和 ZooKeeper 通信
三、Kafka 命令行操作
1. 主题命令行操作
1 | # 查看操作主题命令参数 |
- 核心参数
参数 | 描述 |
---|---|
--bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 的主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--create | 创建主题 |
--delete | 删除主题 |
--alter | 修改主题 |
--list | 查看所有主题 |
--describe | 查看主题详细描述 |
--partitions <Integer: # of partitions> | 设置分区数 |
--replication-factor <Integer: replication factor> | 设置分区副本 |
--config <String: name=value> | 更新系统默认的配置 |
2. 生产者命令行操作
1 | # 查看操作生产者命令参数 |
- 核心参数
参数 | 描述 |
---|---|
--bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 的主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
3. 消费者命令行操作
1 | # 查看操作消费者命令参数 |
- 核心参数
参数 | 描述 |
---|---|
--bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 的主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--from-beginning | 从头开始消费 |
--group <String: consumer group id> | 指定消费者组名称 |
四、其他
1. Kafka 配置文件
server.properties
1 | # broker的全局唯一编号,不能重复,只能是数字 |