一、系统架构
1. 作业管理器(JobManager)
- JobManager 是 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的 JobManager 所控制执行
- JobManger 包含3个不同的组件:
(1) JobMaster
- JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job)
- 多个 Job 可以同时运行在一个 Flink 集群中,每个 Job 对应一个 JobMaster
- 作业提交时,JobMaster 会接收到要执行的应用,然后把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”(Execution Graph),它包含了所有可以并发执行的任务
- JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上
- 在运行过程中,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调
(2) 资源管理器(ResourceManager)
- ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个
- 资源主要是指 TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行
(3) 分发器(Dispatcher)
- Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件
- Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息
- Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉
2. 任务管理器(TaskManager)
- TaskManager 是 Flink 中的工作进程,数据流的具体计算就是它来做的,Flink 集群中至少有一个
- 每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 是资源调度的最小单位,slot 的数量限制了 TaskManager 能够并行处理的任务数量
- 启动之后,TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用,JobMaster 就可以分配任务来执行了
- 在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager 交换数据
二、核心概念
1. 并行度(Parallelism)
- 当要处理的数据量非常大时,我们可以把一个算子操作“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一个算子(operator)任务就被拆分成了多个并行的子任务(subtasks),再将它们分发到不同节点,就真正实现了并行计算
- 在 Flink 执行过程中,每一个算子可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行
- 一个特定算子的子任务的个数被称为并行度(parallelism)。包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务
- 一个程序中,不同的算子可能具有不同的并行度。一般一个流程序的并行度,可以认为是其所有算子中最大的并行度
- 可以通过以下方法来设置并行度(优先级由高到低):
(1) 代码中设置
- 在算子后调用
setParallelism()
方法,来设置当前算子的并行度。该方法只针对当前算子的并行度
1 | stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); |
- 直接调用执行环境的
setParallelism()
方法,全局设定并行度。该方法针当前代码中所有算子的默认并行度
1 | env.setParallelism(2); |
由于 keyBy 不是算子,所以无法对 keyBy 设置并行度
(2) 提交应用时设置
- 在使用
flink run
命令提交应用时,可以增加-p
参数来指定当前应用程序执行的并行度
1 | $ bin/flink run –p 2 –c com.lb.flink.Main ./flink-test.jar |
- Web UI 上提交作业时也可以在对应输入框中直接设置并行度
(3) 配置文件中设置
- 可以直接在集群的配置文件
flink-conf.yaml
中更改默认并行度 - 该设置对于整个集群上提交的所有作业有效,默认值为 1
1 | parallelism.default: 2 |
- 在 IDEA 开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数
2. 算子链(Operator Chain)
- 一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类
- 一对一(One-to-one,Forwarding):
- 该模式的数据流会维护分区以及元素的顺序。比如 source 算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序
- map、filter、flatMap 等算子都是这种对应关系,类似于 Spark 中的窄依赖
- 重分区(Redistributing):
- 该模式的数据流的分区会发生改变。比如 map 和 keyBy/window 算子之间,以及 keyBy/window 和 Sink 算子之间
- 每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程。类似于 Spark 中的 shuffle
- 合并算子链:
- 在 Flink 中,并行度相同的一对一(one-to-one)算子操作,可以直接链接在一起形成一个“大”的任务(task),每个 task 会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)
- 算子链可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量
- Flink 默认会按照算子链的原则进行链接合并,如果想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置
- 禁用算子链适用于拆分两个复杂的算子,使其在不同的线程中运行。也可用来定位出问题的算子
1 | // 全局禁用算子链 |
3. 任务槽(Task Slots)
- Flink 中每一个 TaskManager 都是一个 JVM 进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)
- TaskManager 的计算资源是有限的,并行的任务数就需要有一定的限制,因此需要对每个任务运行所占用的资源做出明确的划分
- 每个任务槽(task slot)其实表示了 TaskManager 拥有的计算资源的一个固定大小的子集。假如一个 TaskManager 有三个 slot,那么它会将管理的内存平均分成三份,每个 slot 独占一份
- slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用中,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争
- 可以在 Flink 配置文件中设置每个 TaskManager 的 slot 数量
taskmanager.numberOfTaskSlots
,默认 1 个
(1) 任务槽的共享
- 默认情况下,Flink 是允许子任务共享 slot 的,且 slot 中的子任务是同时执行的
- 同一个作业下,对于不同算子的并行子任务,可以放到同一个 slot 上执行;对于相同算子的并行子任务,需要分配到不同的 slot 上
- 好处:
- 将资源密集型和非密集型的任务同时放到一个 slot 中,它们就可以自行分配对资源占用的比例,从而保证最重的任务平均分配给所有的 TaskManager
- slot 共享允许我们保存完整的作业管道。这样即使某个 TaskManager 出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行
- 只有属于同一个 slot 共享组的子任务,才会开启 slot 共享(默认都是 default 组)。不同组之间的任务是完全隔离的,必须分配到不同的 slot 上
- 如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,可以手动指定其 slot 共享组。此时总共需要的 slot 数量,就是各个 slot 共享组最大并行度的总和
1 | // 指定当前算子的共享组 |
(2) 任务槽和并行度的关系
- 任务槽是静态的概念,是指 TaskManager 具有的并发执行能力;而并行度是动态概念,也就是 TaskManager 运行程序时实际使用的并发能力
- 假设一共有 3 个 TaskManager,每一个 TaskManager 中的 slot 数量设置为 3 个,那么一共有 9 个 task slot,表示集群最多能执行 9 个同一算子的并行子任务
- 整个流处理程序的并行度就是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量
- 只有 slot 数量能满足程序最大并行度时,程序才能执行
三、作业提交流程
1. Standalone 会话模式作业提交流程
2. 逻辑流图/作业流图/执行流图/物理流图
- 逻辑流图(StreamGraph)
- 根据用户通过 DataStream API 编写的代码生成的最初的 DAG 图,用来表示程序的拓扑结构。这一步一般在客户端完成
- 作业流图(JobGraph)
- StreamGraph 经过优化后生成的就是作业流图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分
- 主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph 一般也是在客户端生成的,在作业提交时传递给 JobMaster
- 执行流图(ExecutionGraph)
- JobMaster 收到 JobGraph 后,会根据它来生成执行流图(ExecutionGraph)。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构
- 与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式
- 物理流图(Physical Graph)
- JobMaster 生成执行流图后,会将它分发给 TaskManager;各个 TaskManager 会根据执行图部署任务,最终的物理执行过程也会形成一张图,一般就叫作物理流图(Physical Graph)
- 物理流图只是具体执行层面的图,并不是一个具体的数据结构
- 物理流图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理流图,TaskManager 就可以对传递来的数据进行处理计算了