一、窗口概述
- 批处理统计中,可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,来一条数据就处理一条。如果要统计最近一段时间内的数据就需要窗口
- Flink 中的窗口可以理解成一个桶,窗口可以把数据流切割成有限大小的多个桶,每个数据都会分发到对应的桶中。当到达窗口结束时间时,就会对桶中收集到的数据进行计算处理,之后关闭窗口
- 窗口并不是静态准备好的,而是动态创建:当有落在这个窗口区间范围的数据达到时,才创建对应的窗口
二、窗口分类
1. 按照驱动类型分类
(1) 时间窗口(Time Windows)
- 时间窗口以时间点来定义窗口的开始和结束,所以截取出的就是某一时间段的数据
- 到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁
(2) 计数窗口(Count Window)
- 计数窗口基于元素的个数来截取数据,窗口截取数据的个数,就是窗口的大小
- 到达固定的个数时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁
2. 按照分配数据的规则分类
(1) 滚动窗口(Tumbling Window)
- 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式,是最简单的窗口形式
- 滚动窗口可以基于时间定义,也可以基于数据个数定义。需要的参数只有一个,就是窗口大小
- 窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会被分配到一个窗口,而且只会属于一个窗口
- 滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多 BI 分析指标都可以用它来实现
(2) 滑动窗口(Sliding Window)
- 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置
- 定义滑动窗口的参数有两个:一个是窗口大小,另一个是滑动步长
- 当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据可能会被同时分配到多个窗口中(
窗口大小 / 滑动步长
)
- 当滑动步长等于窗口大小时,就成了滚动窗口
- 因为窗口在结束时间会触发计算输出结果,所以滑动步长就代表了窗口的计算频率
- 滑动窗口适合计算结果更新频率非常高的场景
(3) 会话窗口(Session Window)
- 会话窗口是基于“会话”(session)来对数据进行分组的。会话窗口只能基于时间来定义
- 会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离
- 如果相邻两个数据到来的时间间隔小于超时时间,就说明还在保持会话,它们就属于同一个窗口
- 如果大于超时时间,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了
- 会话窗口的长度不固定,起始和结束时间也不确定,各个分区之间窗口没有任何关联
- 会话窗口之间一定是不会重叠的,而且会留有至少为超时时间的间隔
- 在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计
(4) 全局窗口(Global Window)
- 全局窗口会把相同 key 的所有数据都分配到同一个窗口中
- 全局窗口没有结束时,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义触发器
- 一般在做更加灵活的窗口处理时自定义使用。Flink 中的计数窗口底层就是用全局窗口实现的
三、窗口 API 概览
- 在定义窗口操作之前,首先需要确定,到底是基于按键分区的数据流 KeyedStream 来开窗,还是直接在没有按键分区的 DataStream 上开窗
- 基于 KeyedStream:进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,窗口操作会基于每个 key 进行单独的处理
- 基于 DataStream:窗口逻辑只能在一个任务上执行,就相当于并行度变成了 1,且无法手动调整其并行度
- 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
1 2 3 4 5 6 7 8
| stream .keyBy(<KeySelector>) .window(<WindowAssigner>) .aggregate(<WindowFunction>);
stream .windowAll(<WindowAssigner>) .aggregate(<WindowFunction>);
|
四、窗口分配器
- 定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被分配到哪个窗口,也就是在指定窗口的类型
- 除去需要自定义的全局窗口外,其他常用的窗口类型 Flink 中都给出了内置的分配器实现
1 2 3
| WindowedStream ws = keyedStream.window(<WindowAssigner>);
AllWindowedStream aws = dataStream.windowAll(<WindowAssigner>);
|
1. 时间窗口
1 2 3 4
| stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(...)
|
1 2 3 4
| stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| stream.keyBy(...) .window(ProcessingTimeSessionWindows.withDynamicGap( new SessionWindowTimeGapExtractor<InType>() { @Override public long extract(InType value) { return xxx; } } )); .aggregate(...)
|
1 2 3
| stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)
|
1 2 3
| stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...)
|
1 2 3
| stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
|
2. 计数窗口
1 2 3 4
| stream.keyBy(...) .countWindow(10) .aggregate(...)
|
1 2 3 4
| stream.keyBy(...) .countWindow(10, 3) .aggregate(...)
|
3. 全局窗口
- 全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用
- 使用全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
1 2 3 4
| stream.keyBy(...) .window(GlobalWindows.create()) .evictor(...) .trigger(...)
|
五、窗口函数
- 定义窗口分配器之后,需要定义窗口函数。窗口函数定义了要对窗口中收集的数据做的计算操作。根据处理的方式可以分为增量聚合函数和全窗口函数
1. 增量聚合函数
- 增量聚合:每来一个数据就在之前结果上聚合一次,窗口触发的时候输出计算结果
- Flink 为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于 WindowedStream 调用。主要包括 sum()、max()、maxBy()、min()、minBy(),与 KeyedStream 的简单聚合非常相似。它们的底层,其实都是通过 AggregateFunction 来实现的
(1) 归约函数(ReduceFunction)
1 2 3 4 5 6 7 8 9 10 11
| WindowedStream<InType, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<InType> reduce = windowedStream.reduce( new ReduceFunction<InType>() { @Override public InType reduce(InType value1, InType value2) throws Exception { return xxx; } } );
|
(2) 聚合函数(AggregateFunction)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| WindowedStream<InType, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> aggregate = windowedStream.aggregate( new AggregateFunction<InType, Integer, String>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(InType value, Integer accumulator) { return xxx; } @Override public String getResult(Integer accumulator) { return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { return null; } } );
|
2. 全窗口函数
- 增量聚合函数无法满足的场景:①有些场景下,需要做的计算必须基于全部的数据才有效;②输出的结果可能要包含上下文中的一些信息(比如窗口的起始时间)
- 全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算
(1) 窗口函数(WindowFunction)
- 老版本的通用窗口函数接口,WindowFunction 类中可以获取到包含窗口所有数据的可迭代集合,还可以拿到窗口本身的信息
- 不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用
1 2 3 4 5 6 7 8 9
| windowedStream.apply( new WindowFunction<InType, String, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<InType> input, Collector<String> out) throws Exception { } } );
|
(2) 处理窗口函数(ProcessWindowFunction)
- ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”,不仅能够获取窗口信息,还可以访问当前的处理时间、事件时间水位线和状态信息等
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| windowedStream.process( new ProcessWindowFunction<InType, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<InType> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String start = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss"); String end = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss"); long count = elements.spliterator().estimateSize(); } } );
|
3. 增量聚合和全窗口函数的结合使用
- 调用增量聚合函数时,可以传入全窗口函数
- 基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果
- 此时的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入
1 2 3 4 5 6 7 8
| windowedStream.aggregate( new AggregateFunction<InType, AccType, OutType>() { }, new ProcessWindowFunction<OutType, OutOutType, KeyType, WindowType>() { } );
|
六、触发器和移除器
1. 触发器(Trigger)
- 触发器主要是用来控制窗口什么时候触发计算(计算窗口结果并输出)
1 2 3
| stream.keyBy(...) .window(...) .trigger(new MyTrigger())
|
2. 移除器(Evictor)
1 2 3
| stream.keyBy(...) .window(...) .evictor(new MyEvictor())
|
七、源码解析
1. 什么时候触发窗口函数
ProcessingTimeTrigger.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; }
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; }
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } ...... }
|
2. 窗口的起始和结束
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class TimeWindow extends Window { public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { final long remainder = (timestamp - offset) % windowSize; if (remainder < 0) { return timestamp - (remainder + windowSize); } else { return timestamp - remainder; } } }
|
TumblingProcessingTimeWindows.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); if (staggerOffset == null) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow(start, start + size)); } }
|
3. 窗口的生命周期
- 创建:属于本窗口的第一条数据来的时候创建,放入一个 singletonList 中
- 销毁:时间进展 >= 窗口的最大时间戳(end - 1ms) + 允许迟到的时间(默认0)