学习资料 – 第八章 Flink时间、窗口及操作 枫叶云笔记 (fynote.com)

Flink窗口是处理无界数据流的核心

例如,在实时读取基站日志数据实时流中,我们可以定义每5秒统计某基站主叫总数量,那么Flink整个无界流就被划分成如下一个个窗口对应的有界流。

在Flink窗口处理中,默认使用的时区是UTC-0,也就是Flink窗口时间起始范围是从1970-01-01 00:00:00开始固定往后划分的,假设5秒划分一个窗口,那么每个窗口对应的起始时间点是固定的,如:第一个窗口起始时间为[1970-01-01 00:00:00~1970-01-01 00:00:05),第二个窗口起始时间为[1970-01-01 00:00:05~1970-01-01 00:00:10)... ,并且这些窗口“含头不含尾”,即第一个窗口中不包含1970-01-01 00:00:05时间产生的数据,该时刻产生的数据会归到下一个窗口,下个窗口同样也是“含头不含尾”的方式处理窗口内的数据,窗口的“含头不含尾”处理方式就是我们在watermark小节中提到watermark计算方式减去1毫秒处理方式形成的。

Keyed 和 Non-Keyed Window

Keyed Window

数据流经过keyby算子操作形成KeyedStream后,进行窗口设置就形成了KeyedWindow,所谓KeyedWindow是针对每个Key都会单独设置窗口,相同的key数据会被同一个并行任务处理,窗口操作会针对每个key单独进行窗口划分,最终针对每个key输出结果。

Non-KeyedWindow

数据流如果没有经过keyby算子处理也可以直接应用窗口操作,这就是Non-KeyedWindow,这种情况下所有的数据都会划分到同一个窗口中被一个task任务进行处理,得到全局数据对应窗口统计结果,值得注意的是这种Non-KeyedWindow是非并行计算与Flink程序设置的并行度无关,所以在实际场景中这种Non-KeyWindow使用不多。

Flink 窗口分类

基于时间的窗口
基于数量统计的窗口

Keyed Window和Non-KeyedWindow都支持时间窗口和数量窗口,时间窗口需要通过Window Assigners(窗口分配器)来分配不同的时间窗口类型触发窗口,数量窗口需要通过调用countWindow方法并传入窗口事件数来触发执行。

窗口分配器(Window Assigners)

在Flink中可以通过WindowAssigners(窗口分配器)将数据分配到不同的时间窗口,然后跟上对应的窗口函数(如process,aggregate…)进行具体业务逻辑处理。

WindowAssigners支持四种窗口类型

滚动窗口(Tumbling Windows)
滑动窗口(Sliding Windows)
会话窗口(Session Windows)
全局窗口(Global Windows)

滚动窗口(Tumbling Window)

滚动窗口是根据固定时间大小进行切分,窗口大小固定并且各窗口首尾相接,每个窗口时间范围之间不重叠,例如:指定滚动窗口大小为5秒,那么每5秒都会有一个窗口生成并计算,如下图所示:

代码示例:

#基于Process Time处理时间
keyedDs.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

#基于EventTime事件时间
keyedDs.window(TumblingEventTimeWindows.of(Time.seconds(5)))

of(Time)方法同时有一个重载方法of(Time,offset),该方法 可以同时指定一个offset

例如:TumblingEventTimeWindows.of(Time.seconds(5), Time.hours(-8)),offset参数用于对齐窗口,例如:不设置offset时,长度为1小时的窗口为: 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999...,如果我们想要改变窗口对齐方式,可以设置offset,假设指定了offset为15分钟(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))),那么窗口起始时间为1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999...,该参数重要的一个使用场景是根据UTC-0调整窗口时差,Flink中窗口划分时间使用的是标准时间戳:1970-01-01 00:00:00,即UTC-0,中国属于东八区,那么对应的时间是UTC+8,也就是1970-01-01 08:00:00,如果每1小时划分一个窗口,在中国基于UTC+8时间每天划分的第一个窗口是08~09点,所以为了能使一天的窗口时间从0点开始就需要对齐窗口,可以在中国设置改offset为-8即可。

滑动窗口(Sliding Window)

滑动窗口与滚动窗口类似,滑动窗口长度大小固定,使用滑动窗口时需要指定一个时间参数表示窗口长度(window size),既然是滑动窗口,那么同时还需要指定一个窗口滑动步长(window slide)来控制生成窗口的频率,例如:滑动窗口中指定window size 为10秒并且widow slide 为5秒,就代表每隔5秒生成一个包含最近10秒时间范围内的窗口进行计算,如下图所示:

当窗口滑动时间(Window Slide)小于窗口长度(Window Size)时,就会出现窗口重叠,这种情况下一个事件可能被分发到多个窗口中;当窗口滑动时间(Window Slide)和窗口长度(Window Size)相同时,滑动窗口与滚动窗口一样,可以理解为滚动窗口就是一种特殊的滑动窗口,滑动窗口定义更加灵活;

在使用滑动窗口时,我们一般会指定窗口滑动时间小于等于窗口长度,两者尽量是倍数关系。

滑动窗口代码示例:

#基于Process Time处理时间
keyedDs.Window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

#基于EventTime事件时间
keyedDs.Window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

第一个参数是窗口长度(Window Size),第二个参数是窗口滑动间隔时间(Window Slide)。

会话窗口(Session Window)

会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是Session Gap(会话间隔),是指在规定的时间内如果没有活跃数据接入,则认为窗口结束,然后触发窗口计算结果。

与滑动窗口、滚动窗口不同的是Session Windows不需要有固定windows size和slide time,只需要定义session gap,来规定不活跃数据的时间上限即可,此外,会话窗口不会相互重叠。

Session Windows 窗口类型比较适合非连续性数据处理或周期性产生数据场景

SessionWindow本质上没有固定的起止时间点,在Flink内部,会话窗口会为每条数据创建一个窗口,然后将距离不超过预设间隔时间的窗口合并,合并后成为一个SessionWindow并触发执行。

会话窗口代码示例:

#基于Process Time处理时间
keyedDs.window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
或者
keyedDs.window(ProcessingTimeSessionWindows.withDynamicGap(...))

#基于EventTime事件时间
keyedDs.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
或者
keyedDs.window(EventTimeSessionWindows.withDynamicGap(...))

ProcessingTimeSessionWindows调用withGap方法传入一个时间,该时间表示的就是对应数据没有输入的超时时间,超过该时间后就会自动划分形成会话窗口,ProcessingTimeSessionWindows除了有withGap方法外,还有withDynamicGap方法,通过该方法可以根据不同数据动态决定数据没有输入的超时时间。

全局窗口(Global Window)

在Flink中全局窗口是一种特殊的窗口,针对KeyedStream会将所有相同的key数据汇集在一个全局窗口中,针对Non-keyedStream会将所有数据汇集到一个全局窗口中,该窗口的触发需要我们自定对应的窗口触发器(trigger),在定义的trigger中我们可以指定全局窗口是基于时间触发还是基于数据条数触发,或是基于自己的逻辑规则进行窗口触发。如果不定义trigger那么全局窗口不会被触发执行。

代码中指定使用Global Window 全局窗口方式如下:

#必须设置trigger
... ...
keyedDs.window(GlobalWindows.create())
        //自定义触发器
        .trigger(new MyTrigger())
... ...

计数窗口(Count Window)

计数窗口(CountWindow) 根据固定的事件数量定义窗口大小,跟时间没有关系,当数据达到指定事件数量时窗口触发执行,计数窗口底层的实现就是全局窗口(Global Window)实现。

计数窗口代码示例:

#针对KeyedStream
keyedStream.countWindow(3)
或者
keyedStream.countWindow(5,2)


#针对Non-KeyedStream
DataStream.countWindowAll(3)
或者
DataStream.countWindowAll(5,2)

与其他窗口类似,Flink中支持针对keyedStream和Non-KeyedStream使用CountWindow,直接针对流调用countWindow(size)/countWindowAll(size)方法传入对应的窗口事件个数即可,当达到size事件个数时就会触发对应的窗口,这种叫做“滚动计数窗口”,同时CountWindow也支持“滑动计数窗口”,通过调用countWindow(size,slide)/countWindowAll(size,slide)方法实现,size为窗口大小,slide为滑动步长,意为每隔slide个事件对最近size个事件进行划分窗口统计。

作者 张, 宴银

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注