- 源算子和输出算子在大多数情况下不需要自己实现,Flink 官方提供了一部分框架的连接器
- Apache Kafka (source/sink)
- Apache Cassandra (source/sink)
- Amazon DynamoDB (sink)
- Amazon Kinesis Data Streams (source/sink)
- Amazon Kinesis Data Firehose (sink)
- DataGen (source)
- Elasticsearch (sink)
- Opensearch (sink)
- FileSystem (sink)
- RabbitMQ (source/sink)
- Google PubSub (source/sink)
- Hybrid Source (source)
- Apache Pulsar (source)
- JDBC (sink)
- MongoDB (source/sink)
- 除 Flink 官方之外,Apache Bahir 框架也实现了一些其他第三方系统与 Flink 的连接器
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
一、执行环境(Execution Environment)
1. 创建执行环境
执行环境就是 StreamExecutionEnvironment 类的对象
getExecutionEnvironment
:它会根据当前运行的上下文得到正确的结果,是最简单的方式。如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar 包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境createLocalEnvironment
:返回一个本地执行环境,可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数createRemoteEnvironment
:返回集群执行环境,需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包路径
在获取到程序执行环境后,还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等
2. 执行模式
- 从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API
- 流执行模式(Streaming):默认,一般用于需要持续实时处理的无界数据流
- 批执行模式(Batch):专门用于批处理的执行模式
- 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择执行模式
1 | # 提交作业时配置(常用) |
3. 触发程序执行
- 输出(Sink)操作并不代表程序已经结束。因为当 main() 方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中,这时并没有真正处理数据,因为数据可能还没来
- Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”
- 需要显式地调用执行环境的
execute()
方法,来触发程序执行。该方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)
二、源算子(Source)
- Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)
- Flink 1.12 以前,旧的添加 source 的方式,是调用执行环境的
addSource()
方法。Flink 1.12 开始,主要使用流批统一的新 Source 架构fromSource()
1. 从集合中读取数据
- 将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试
1 | DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3)); |
2. 从文件读取数据
- 读取文件,首先需要添加文件连接器依赖
1 | <dependency> |
- 参数可以是目录,也可以是文件,还可以从 HDFS 目录下读取,使用路径
hdfs://...
- 相对路径是从系统属性
user.dir
获取路径,IDEA 下是 project 的根目录,standalone 模式下是集群节点根目录
1 | FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build(); |
3. 从 Socket 读取数据
- 不论从集合还是文件,读取的其实都是有界数据。在流处理的场景中,数据往往是无界的
- 这种方式由于吞吐量小、稳定性较差,一般用于测试
1 | env.socketTextStream("localhost", 7777); |
4. 从 Kafka 读取数据
- Flink 官方提供了连接工具
flink-connector-kafka
,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction - Flink 官方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本只支持 0.10.0 版本以上的 Kafka
- 引入 Kafka 连接器的依赖
1 | <dependency> |
- 消费数据:
1 | KafkaSource<String> kafkaSource = KafkaSource.<String>builder() |
5. 从数据生成器读取数据
- Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等
- Flink 1.17 提供了新的 Source 写法,需要导入依赖
1 | <dependency> |
- 生成数据:
1 | DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>( |
三、输出算子(Sink)
- Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的,主要就是用来实现与外部系统连接、并将数据提交写入
- Flink 1.12 以前,Sink 算子的创建是通过调用
DataStream.addSink()
方法实现的。Flink 1.12 开始,同样重构了 Sink 架构,改为使用sinkTo()
DataStream.print()
就是一个输出算子,它会将结果输出到控制台
1. 输出到文件
- Flink 提供了一个流式文件系统的连接器
FileSink
,它可以写入 Flink 支持的文件系统 - FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:
- 行编码:FileSink.forRowFormat(basePath, rowEncoder)
- 批量编码:FileSink.forBulkFormat(basePath, bulkWriterFactory)
1 | // 每个目录中,都有`并行度个数`的文件在写入 |
2. 输出到 Kafka
- 输出无 key 的 record
1 | // 如果是精准一次,必须开启checkpoint |
- 实现带 key 的 record(自定义序列化器)
1 | env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); |
3. 输出到 MySQL
- 添加 MySQL 连接器依赖
1 | <dependency> |
- maven 配置中如果有
<mirrorOf>*</mirrorOf>
,表示所有包都要从该仓库中找,此时在 pom 文件中指定的repositories
会无效。可将配置改为<mirrorOf>*,!repository_id</mirrorOf>
,排除 pom 中的仓库
- 输出到 MySQL
1 | SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink( |
4. 自定义 Sink 输出
- 与 Source 类似,Flink 提供了通用的 SinkFunction 接口和对应的 RichSinkFunction 抽象类,只要实现它,通过简单地调用 DataStream.addSink() 方法就可以自定义写入任何外部存储
- 自定义 Sink 想要实现状态一致性并不容易,所以并不常用
1 | public static class MySink extends RichSinkFunction<String> { |