一、概述
1. 简介
- Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。核心目标是数据流上的有状态计算(Stateful Computations over Data Streams)
- 有状态的流处理:把流处理需要的额外数据保存成一个状态,然后针对这条数据进行处理,并且更新状态
- 状态在内存中:优点速度快;缺点可靠性差
- 状态在分布式系统中:优点可靠性高;缺点速度慢
- 官网:https://flink.apache.org/
2. Flink 的发展历史
- 在德语中,Flink 一词表示快速、灵巧。项目的 logo 是一只彩色的松鼠
- Flink 起源于 Stratosphere 项目,它是由 3 所地处柏林的大学和欧洲其他一些大学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授 Volker Markl 领衔开发。2014 年 4 月,Stratosphere 的代码捐赠给了 Apache 软件基金会,Flink 就是在此基础上被重新设计出来的
- 2014 年 8 月,Flink 第一个版本 0.6 正式发布,与此同时 Fink 的几位核心开发者创办 Data Artisans 公司
- 2014 年 12 月,Flink 项目完成孵化
- 2015 年 4 月,Flink 发布了里程碑式的重要版本 0.9.0
- 2019 年 1 月,长期对 Flink 投入研发的阿里巴巴,以 9000 万欧元的价格收购了 Data Artisans 公司
- 2019 年 8 月,阿里巴巴将内部版本 Blink 开源,合并入 Flink 1.9.0 版本
3. Flink 的特点
- 高吞吐和低延迟:每秒处理数百万个事件,毫秒级延迟
- 结果的准确性:Flink 提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果
- 精确一次(exactly-once)的状态一致性保证
- 可以连接到最常用的外部系统,如 Kafka、Hive、JDBC、HDFS、Redis 等
- 高可用:本身高可用的设置,加上与 K8s、YARN 和 Mesos 的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink 能做到以极少的停机时间 7×24 全天候运行
4. Flink 的应用场景
- 电商和市场营销:实时数据报表、广告投放、实时推荐
- 物联网(IOT):传感器实时数据采集和显示、实时报警,交通运输业
- 物流配送和服务业:订单状态实时更新、通知信息推送
- 银行和金融业:实时结算和通知推送,实时检测异常行为
5. Flink 和 Spark Streaming
- Spark 以批处理为根本
- 数据模型:Spark 采用 RDD 模型,Spark Streaming 的 DStream 实际上就是一组组小批数据 RDD 的集合
- 运行时架构:Spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个
- Flink 以流处理为根本
- 数据模型:Flink 基本数据模型是数据流,以及事件(Event)序列
- 运行时架构:Flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
Flink | Spark Streaming | |
---|---|---|
计算模型 | 流计算 | 微批处理 |
时间语义 | 事件时间、处理时间 | 处理时间 |
窗口 | 多、灵活 | 少、不灵活(窗口必须是批次的整数倍) |
状态 | 有 | 没有 |
流式SQL | 有 | 没有 |
6. Flink 的分层 API
- 底层 API(处理函数):对最原始数据加工处理。底层 API 与 DataStream API 相集成,可以处理复杂的计算
- 核心 API:DataStream API(流处理)和 DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括 map、flatmap 等),连接(joins),聚合(aggregations),窗口(windows)操作等。Flink 1.12 以后,DataStream API 已经实现真正的流批一体,DataSet API 已经过时
- Table API:以表为中心的声明式编程,其中表可能会动态变化。Table API 遵循关系模型:表有二维数据结构,类似于关系数据库中的表;同时 API 提供可比较的操作,例如 select、project、join、group-by、aggregate 等。可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用
- SQL API:这一层在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行
二、快速上手
1. 创建项目
- 创建一个 Maven 工程
- 添加项目依赖(IDEA 启动时,配置包含 Provided 的依赖)
1 | <properties> |
- 数据准备
1 | $ cat input/word.txt |
2. 批处理 WordCount
- 批处理对数据的处理转换,是看作数据集来进行操作的
- Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API。所以从 Flink 1.12 开始,官方推荐直接使用 DataStream API,可以在提交任务时通过将执行模式设为 BATCH 来进行批处理(
-Dexecution.runtime-mode=BATCH
)
1 | public class BatchWordCount { |
3. 流处理 WordCount
1 | public class StreamWordCount { |
4. 读取 Socket 文本流
1 | // nc -lk 7777 |
- 由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的。这时就需要显式地提供类型信息,才能使应用程序正常工作
- Flink 具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器
5. 使用本地 Web UI
- 在开发环境,可以使用
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
创建一个带本地 Web UI 的项目,用于本地测试
1 | <dependency> |
6. Flink 支持的数据类型
- Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation 类是 Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器
- 对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部对不同的类型进行了划分,这些类型可以在 Types 工具类中找到
- 基本类型:所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger
- 数组类型:包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)
- 复合数据类型:
- Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是 Java API 的一部分。最多 25 个字段,也就是从 Tuple0~Tuple25,不支持空字段
- Scala 样例类及 Scala 元组:不支持空字段
- 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段
- POJO:Flink 自定义的类,似于 Java Bean 模式的类
- 辅助类型:Option、Either、List、Map 等
- 泛型类型(GENERIC):Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的
- 在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让代码的可读性大大增加。在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型
- Flink 对 POJO 类型的要求如下:
- 类是公有(public)的
- 有一个无参的构造方法
- 所有属性都是可访问的
- 所有属性的类型都是可序列化的
- 类型提示(Type Hints)
- Flink 具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的。这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能
- 为了解决这类问题,Java API 提供了专门的“类型提示”(type hints),它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息
1 | // 对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long> |