Flink 中的窗口

一、窗口概述

  • 批处理统计中,可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,来一条数据就处理一条。如果要统计最近一段时间内的数据就需要窗口
  • Flink 中的窗口可以理解成一个桶,窗口可以把数据流切割成有限大小的多个桶,每个数据都会分发到对应的桶中。当到达窗口结束时间时,就会对桶中收集到的数据进行计算处理,之后关闭窗口
  • 窗口并不是静态准备好的,而是动态创建:当有落在这个窗口区间范围的数据达到时,才创建对应的窗口

concept

二、窗口分类

1. 按照驱动类型分类

(1) 时间窗口(Time Windows)

  • 时间窗口以时间点来定义窗口的开始和结束,所以截取出的就是某一时间段的数据
  • 到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁

(2) 计数窗口(Count Window)

  • 计数窗口基于元素的个数来截取数据,窗口截取数据的个数,就是窗口的大小
  • 到达固定的个数时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁

time-and-count-window

2. 按照分配数据的规则分类

(1) 滚动窗口(Tumbling Window)

  • 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式,是最简单的窗口形式
  • 滚动窗口可以基于时间定义,也可以基于数据个数定义。需要的参数只有一个,就是窗口大小
  • 窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会被分配到一个窗口,而且只会属于一个窗口
  • 滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它来实现

tumbling-window

(2) 滑动窗口(Sliding Window)

  • 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置
  • 定义滑动窗口的参数有两个:一个是窗口大小,另一个是滑动步长
    • 当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据可能会被同时分配到多个窗口中(窗口大小 / 滑动步长
    • 当滑动步长等于窗口大小时,就成了滚动窗口
  • 因为窗口在结束时间会触发计算输出结果,所以滑动步长就代表了窗口的计算频率
  • 滑动窗口适合计算结果更新频率非常高的场景

sliding-window

(3) 会话窗口(Session Window)

  • 会话窗口是基于“会话”(session)来对数据进行分组的。会话窗口只能基于时间来定义
  • 会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离
    • 如果相邻两个数据到来的时间间隔小于超时时间,就说明还在保持会话,它们就属于同一个窗口
    • 如果大于超时时间,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了
  • 会话窗口的长度不固定,起始和结束时间也不确定,各个分区之间窗口没有任何关联
  • 会话窗口之间一定是不会重叠的,而且会留有至少为超时时间的间隔
  • 在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计

session-window

(4) 全局窗口(Global Window)

  • 全局窗口会把相同 key 的所有数据都分配到同一个窗口中
  • 全局窗口没有结束时,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义触发器
  • 一般在做更加灵活的窗口处理时自定义使用。Flink 中的计数窗口底层就是用全局窗口实现的

global-window

三、窗口 API 概览

window-api

  • 在定义窗口操作之前,首先需要确定,到底是基于按键分区的数据流 KeyedStream 来开窗,还是直接在没有按键分区的 DataStream 上开窗
    • 基于 KeyedStream:进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,窗口操作会基于每个 key 进行单独的处理
    • 基于 DataStream:窗口逻辑只能在一个任务上执行,就相当于并行度变成了 1,且无法手动调整其并行度
  • 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
1
2
3
4
5
6
7
8
stream
.keyBy(<KeySelector>)
.window(<WindowAssigner>)
.aggregate(<WindowFunction>);

stream
.windowAll(<WindowAssigner>)
.aggregate(<WindowFunction>);

四、窗口分配器

  • 定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被分配到哪个窗口,也就是在指定窗口的类型
  • 除去需要自定义的全局窗口外,其他常用的窗口类型 Flink 中都给出了内置的分配器实现
1
2
3
WindowedStream ws = keyedStream.window(<WindowAssigner>);

AllWindowedStream aws = dataStream.windowAll(<WindowAssigner>);

1. 时间窗口

  • 滚动处理时间窗口
1
2
3
4
stream.keyBy(...)
// 长度为5秒的滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
  • 滑动处理时间窗口
1
2
3
4
stream.keyBy(...)
// 长度为10秒,滑动步长为5秒的滑动窗口(5秒触发一次)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
  • 会话处理时间窗口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
stream.keyBy(...)
// 会话超时时间为10秒的窗口
//.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 每条数据来时都会更新超时时间(由最新数据的超时时间决定)
.window(ProcessingTimeSessionWindows.withDynamicGap(
new SessionWindowTimeGapExtractor<InType>() {
@Override
public long extract(InType value) {
// 单位ms
return xxx;
}
}
));
.aggregate(...)
  • 滚动事件时间窗口
1
2
3
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
  • 滑动事件时间窗口
1
2
3
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
  • 会话事件时间窗口
1
2
3
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

2. 计数窗口

  • 滚动计数窗口
1
2
3
4
stream.keyBy(...)
// 长度为10的滚动计数窗口
.countWindow(10)
.aggregate(...)
  • 滑动计数窗口
1
2
3
4
stream.keyBy(...)
// 长度为10,滑动步长为3的滑动计数窗口(每来3个数据触发一次)
.countWindow(10, 3)
.aggregate(...)

3. 全局窗口

  • 全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用
  • 使用全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
1
2
3
4
stream.keyBy(...)
.window(GlobalWindows.create())
.evictor(...)
.trigger(...)

五、窗口函数

  • 定义窗口分配器之后,需要定义窗口函数。窗口函数定义了要对窗口中收集的数据做的计算操作。根据处理的方式可以分为增量聚合函数和全窗口函数

1. 增量聚合函数

  • 增量聚合:每来一个数据就在之前结果上聚合一次,窗口触发的时候输出计算结果
  • Flink 为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于 WindowedStream 调用。主要包括 sum()、max()、maxBy()、min()、minBy(),与 KeyedStream 的简单聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的

(1) 归约函数(ReduceFunction)

1
2
3
4
5
6
7
8
9
10
11
WindowedStream<InType, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

SingleOutputStreamOperator<InType> reduce = windowedStream.reduce(
new ReduceFunction<InType>() {
@Override
public InType reduce(InType value1, InType value2) throws Exception {
// value1为窗口前一次聚合结果
return xxx;
}
}
);

(2) 聚合函数(AggregateFunction)

  • 聚合函数可以定义更加灵活的窗口聚合操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
WindowedStream<InType, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

SingleOutputStreamOperator<String> aggregate = windowedStream.aggregate(
// 输入数据的类型, 累加器的类型(存储的中间计算结果的类型), 输出数据的类型
new AggregateFunction<InType, Integer, String>() {
@Override
public Integer createAccumulator() {
// 创建累加器,为聚合指定初始状态
// 属于本窗口的第一条数据来时调用,且每个聚合任务只会调用一次
return 0;
}
@Override
public Integer add(InType value, Integer accumulator) {
// 将输入的元素与累加器做聚合操作
// 来一条计算一条
return xxx;
}
@Override
public String getResult(Integer accumulator) {
// 从累加器中获取聚合的输出结果
// 窗口触发时调用
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
// 合并两个累加器,并将合并后的状态作为一个累加器返回
// 只有会话窗口才会用到
return null;
}
}
);

2. 全窗口函数

  • 增量聚合函数无法满足的场景:①有些场景下,需要做的计算必须基于全部的数据才有效;②输出的结果可能要包含上下文中的一些信息(比如窗口的起始时间)
  • 全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算

(1) 窗口函数(WindowFunction)

  • 老版本的通用窗口函数接口,WindowFunction 类中可以获取到包含窗口所有数据的可迭代集合,还可以拿到窗口本身的信息
  • 不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用
1
2
3
4
5
6
7
8
9
windowedStream.apply(
// 输入类型, 输出类型, key类型, 窗口类型
new WindowFunction<InType, String, String, TimeWindow>() {
// key, 窗口对象, 窗口数据, 采集器
@Override
public void apply(String s, TimeWindow window, Iterable<InType> input, Collector<String> out) throws Exception {
}
}
);

(2) 处理窗口函数(ProcessWindowFunction)

  • ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”,不仅能够获取窗口信息,还可以访问当前的处理时间、事件时间水位线和状态信息等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
windowedStream.process(
// 输入类型, 输出类型, key类型, 窗口类型
new ProcessWindowFunction<InType, String, String, TimeWindow>() {
// key, 上下文对象, 窗口数据, 采集器
@Override
public void process(String s, Context context, Iterable<InType> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String start = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss");
String end = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss");
long count = elements.spliterator().estimateSize();
}
}
);

3. 增量聚合和全窗口函数的结合使用

  • 调用增量聚合函数时,可以传入全窗口函数
  • 基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果
  • 此时的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入
1
2
3
4
5
6
7
8
windowedStream.aggregate(
new AggregateFunction<InType, AccType, OutType>() {
// ......
},
new ProcessWindowFunction<OutType, OutOutType, KeyType, WindowType>() {
// ......
}
);

六、触发器和移除器

1. 触发器(Trigger)

  • 触发器主要是用来控制窗口什么时候触发计算(计算窗口结果并输出)
1
2
3
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())

2. 移除器(Evictor)

  • 移除器主要用来定义在触发前后移除某些数据的逻辑
1
2
3
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())

七、源码解析

1. 什么时候触发窗口函数

  • ProcessingTimeTrigger.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
// 来一条数据执行一次
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
// 注册处理时间的定时器(到达窗口最大时间戳时触发)
ctx.registerProcessingTimeTimer(window.maxTimestamp()); // `end - 1ms`
return TriggerResult.CONTINUE;
}

public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}

......
}

2. 窗口的起始和结束

  • TimeWindow.java
1
2
3
4
5
6
7
8
9
10
11
12
13
public class TimeWindow extends Window {

// 当前时间, 时间偏移, 窗口大小
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
final long remainder = (timestamp - offset) % windowSize;
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
// start:取窗口长度时间的整数倍(13 - 13 % 10)
return timestamp - remainder;
}
}
}
  • TumblingProcessingTimeWindows.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

// 划分窗口
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
if (staggerOffset == null) {
staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
// end:start + 窗口长度时间(左闭右开)
// 单例集合保证同一个窗口的数据在同一个窗口
return Collections.singletonList(new TimeWindow(start, start + size));
}
}

3. 窗口的生命周期

  • 创建:属于本窗口的第一条数据来的时候创建,放入一个 singletonList 中
  • 销毁:时间进展 >= 窗口的最大时间戳(end - 1ms) + 允许迟到的时间(默认0)