Flink 源算子与输出算子

一、执行环境(Execution Environment)

1. 创建执行环境

  • 执行环境就是 StreamExecutionEnvironment 类的对象

    • getExecutionEnvironment:它会根据当前运行的上下文得到正确的结果,是最简单的方式。如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar 包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境
    • createLocalEnvironment:返回一个本地执行环境,可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数
    • createRemoteEnvironment:返回集群执行环境,需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包路径
  • 在获取到程序执行环境后,还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等

2. 执行模式

  • 从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API
    • 流执行模式(Streaming):默认,一般用于需要持续实时处理的无界数据流
    • 批执行模式(Batch):专门用于批处理的执行模式
    • 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择执行模式
1
2
3
4
5
6
# 提交作业时配置(常用)
bin/flink run -Dexecution.runtime-mode=BATCH ......

# 代码中配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

3. 触发程序执行

  • 输出(Sink)操作并不代表程序已经结束。因为当 main() 方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中,这时并没有真正处理数据,因为数据可能还没来
  • Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”
  • 需要显式地调用执行环境的 execute() 方法,来触发程序执行。该方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)

二、源算子(Source)

  • Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)
  • Flink 1.12 以前,旧的添加 source 的方式,是调用执行环境的 addSource() 方法。Flink 1.12 开始,主要使用流批统一的新 Source 架构 fromSource()

1. 从集合中读取数据

  • 将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试
1
2
DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));
DataStreamSource<Integer> source = env.fromElements(1, 2, 3);

2. 从文件读取数据

  • 读取文件,首先需要添加文件连接器依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
  • 参数可以是目录,也可以是文件,还可以从 HDFS 目录下读取,使用路径 hdfs://...
  • 相对路径是从系统属性 user.dir 获取路径,IDEA 下是 project 的根目录,standalone 模式下是集群节点根目录
1
2
FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();
env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

3. 从 Socket 读取数据

  • 不论从集合还是文件,读取的其实都是有界数据。在流处理的场景中,数据往往是无界的
  • 这种方式由于吞吐量小、稳定性较差,一般用于测试
1
env.socketTextStream("localhost", 7777);

4. 从 Kafka 读取数据

  • Flink 官方提供了连接工具 flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction
  • Flink 官方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本只支持 0.10.0 版本以上的 Kafka
  • 引入 Kafka 连接器的依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
  • 消费数据:
1
2
3
4
5
6
7
8
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("debian201:9092,debian202:9092,debian203:9092")
.setTopics("mytopic")
.setGroupId("mygroup")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

5. 从数据生成器读取数据

  • Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等
  • Flink 1.17 提供了新的 Source 写法,需要导入依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
  • 生成数据:
1
2
3
4
5
6
7
8
9
10
11
12
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() { // 输入类型固定为Long
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE, // count,如果有n个并行度,会将count分成n份
RateLimiterStrategy.perSecond(10), // 每秒生成10条数据(每个并行度)
Types.STRING
);
env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource")

三、输出算子(Sink)

  • Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的,主要就是用来实现与外部系统连接、并将数据提交写入
  • Flink 1.12 以前,Sink 算子的创建是通过调用 DataStream.addSink() 方法实现的。Flink 1.12 开始,同样重构了 Sink 架构,改为使用 sinkTo()
  • DataStream.print() 就是一个输出算子,它会将结果输出到控制台

1. 输出到文件

  • Flink 提供了一个流式文件系统的连接器 FileSink,它可以写入 Flink 支持的文件系统
  • FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:
    • 行编码:FileSink.forRowFormat(basePath, rowEncoder)
    • 批量编码:FileSink.forBulkFormat(basePath, bulkWriterFactory)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 每个目录中,都有`并行度个数`的文件在写入
env.setParallelism(2);
// 开启checkpoint,否则文件一直都是`.inprogress`状态
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

FileSink<String> fieSink = FileSink
// 输出行式存储的文件,指定路径、指定编码
.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>())
// 配置文件名的前缀、后缀
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("log-")
.withPartSuffix(".log")
.build()
)
// 按照目录分桶:每个小时一个目录
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// 文件滚动策略:1min或1MB
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(1))
.withMaxPartSize(new MemorySize(1024 * 1024))
.build()
)
.build();

stream.sinkTo(fieSink);

2. 输出到 Kafka

  • 输出无 key 的 record
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 如果是精准一次,必须开启checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定kafka的地址和端口
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
// 指定Topic名称、序列化器
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("mytopic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 写到kafka的一致性级别:精准一次、至少一次等
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精准一次,必须设置事务的前缀
.setTransactionalIdPrefix("trans-")
// 如果是精准一次,必须设置事务超时时间:大于checkpoint间隔,小于15min
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();

stream.sinkTo(kafkaSink);
  • 实现带 key 的 record(自定义序列化器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
.setRecordSerializer(
new KafkaRecordSerializationSchema<String>() {
@Nullable
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("mytopic", key, value);
}
}
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("trans-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();

sensorDS.sinkTo(kafkaSink);

3. 输出到 MySQL

  • 添加 MySQL 连接器依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.17-SNAPSHOT</version>
</dependency>

<!-- 官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,在pom文件中指定仓库路径 -->
<repositories>
<repository>
<id>apache-snapshots</id>
<name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>
  • maven 配置中如果有 <mirrorOf>*</mirrorOf>,表示所有包都要从该仓库中找,此时在 pom 文件中指定的 repositories 会无效。可将配置改为 <mirrorOf>*,!repository_id</mirrorOf>,排除 pom 中的仓库
  • 输出到 MySQL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
// 执行的SQL,一般是insert
"insert into ws values(?,?,?)",
// 预编译SQL,对占位符填充值
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
//每收到一条WaterSensor,如何去填充占位符
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getTs());
preparedStatement.setInt(3, waterSensor.getVc());
}
},
// 执行选项:达到100条数据或达到3s提交批次
JdbcExecutionOptions.builder()
.withMaxRetries(3) // 重试次数
.withBatchSize(100) // 批次的大小
.withBatchIntervalMs(3000) // 批次的时间
.build(),
// 连接选项:url、用户名、密码等
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("123456")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withConnectionCheckTimeoutSeconds(60) // 连接的超时时间
.build()
);

sensorDS.addSink(jdbcSink);

4. 自定义 Sink 输出

  • 与 Source 类似,Flink 提供了通用的 SinkFunction 接口和对应的 RichSinkFunction 抽象类,只要实现它,通过简单地调用 DataStream.addSink() 方法就可以自定义写入任何外部存储
  • 自定义 Sink 想要实现状态一致性并不容易,所以并不常用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class MySink extends RichSinkFunction<String> {

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 创建连接
}

@Override
public void close() throws Exception {
super.close();
// 做一些清理、销毁连接
}

@Override
public void invoke(String value, Context context) throws Exception {
// 写出逻辑,来一条数据,调用一次
}
}