学习资料 – 第八章 Flink时间、窗口及操作 枫叶云笔记 (fynote.com)
包括处理时间和事件时间,通过指定时间属性,我们可以对数据进行排序、窗口划分和触发等操作。
窗口机制用于将无限的数据流划分为有限大小的窗口,以便进行聚合、分析或转换,在Flink中有各种窗口类型,它们根据时间或其他属性划分数据流进而在窗口内执行聚合、计数、求和等操作,以获得窗口内数据的统计结果。
时间语义Time
根据使用时间的不同Flink中将时间划分为三种语义:
事件生成时间(Event Time)
事件摄入时间(Ingestion Time)
事件处理时间(Processing Time)

Event Time – 事件生成时间
Event Time是每个事件在其生成设备上发生的时间,这个时间往往是嵌入在事件记录中,例如一条数据中的时间戳记录了该事件数据的产生时间,该时间与下游Flink处理时系统时间无关。
Event Time 时间语义在实际生产环境中使用较多,该时间语义能保证乱序数据处理的准确性。
Ingestion Time – 事件摄入时间
Ingestion Time是指数据进入到Flink系统的时间,该时间依赖于Flink对应Source Operator所在主机节点的系统时间。
这种时间语义不能处理事件乱序问题,在实际工作场景中使用较少。
Processing Time – 事件处理时间
事件处理时间是指数据被Operator处理时当前所在主机的系统时间。
这种语义虽然处理数据性能高但不能解决数据乱序和延迟问题,从而导致数据统计不是太精准。Processing Time适合时间计算精度要求不高的计算场景。
Flink中最常用的时间语义是事件时间EventTime,在Flink1.12版本之前Flink中默认使用的是Process Time时间语义,在Flink1.12版本后,默认的时间语义为EventTime。
Watermark水位线
如果Flink程序接收到的事件是乱序的,那么此时触发[0~5)窗口显然会丢失后续的4这条数据,导致后续计算结果不准确,为了正确的统计结果还需要继续“等一等”后续的事件,但是这里我们又不能无限期的等待下去,所以这里需要有一个时间标记来决定何时触发窗口,这个时间标记就是watermark。

在Flink中,watermark是一种衡量事件时间进展的机制,watermark是一种特殊的数据记录,watermark本质就是一个时间戳,基于Flink接收到的事件时间(Event Time)计算得到,并且该时间标记会随着数据流往后流动,当Flink算子接收到Watermark(t)事件时,可以认为早于或等于t时刻的事件时间已经完全到达。
watermark原理及特点
有序事件watermark生成
Flink接收到的事件如果是有序的,watermark的值就是当前计算watermark时刻Flink接收到的事件时间,并且watermark的值会随着Flink接收到的事件时间默认每隔200ms计算一次,往后推移。
此时,如果进行窗口计算,由于流数据没有乱序,所以在接收到窗口结束对应时刻的事件时间时也没有必要“等一等”,直接触发窗口计算即可,这样就能将数据正确的划分到正确的窗口内进行计算。

无序事件watermark生成
watermark的值为使用Flink此刻已经接收到的最大事件时间减去“等待”的时间,公式如下:
Watermark=进入Flink的最大的事件时间(maxEventTime)-指定的延迟时间(t)-1ms
waterkmark是一个时间标记,随着时间的推移,只能增大不能减少
当周期性计算watermark的值一旦确定,那么Flink认为该watermark时刻之前的所有数据(即使有乱序)都已全部达到。
下图中计算watermark时指定的延迟时间为4s(后续默认统一为s单位)。当事件15到达Flink后,如果此刻计算watermark的值那么watermark的值为当前Flink程序接收到的最大事件时间15减去等待延迟时间4为11,后续继续有乱序事件9、12到达Flink,但由于watermark的值只能增加不能减少的特点,watermark的值还是11。按照下图的乱序事件到达的情况,当接收到20事件时,Flink计算watermark的值为此刻接收到的事件时间最大值21减去延迟时间4得到17,那么此刻watermark的值为17。

在有序流中指定的延迟时间就相当于是0,在乱序流中需要我们根据实际情况来指定延迟时间,总之,当计算得到watermark时就代表Flink的事件时间推进到了该时刻,我们就可以认为watermark时刻之前的数据全部已经到达Flink,不会再有延迟数据到达。
通过了解以上watermark的生成原理,我们也能得到watermark具备如下特点:
1.watermark是衡量事件时间进展的机制,watermark值代表了当前事件时间的进展,Flink处理乱序事件时,以watermark为标准,只要计算出watermark的值t,那么就认为当期t时刻之前的所有数据全部达到,之后流中不会有小于t时刻的事件时间数据。
2.watermark本质是一个时间,基于事件时间生成并单调递增。
3.watermark是周期性生成并插入到数据流中,默认每隔200ms计算一次watermark,这里的200ms时间是以系统时间为基准。
4.watermark计算公式为:进入Flink的最大的事件时间(mxtEventTime)-指定的延迟时间(t),可以指定延迟时间确保正确处理乱序问题。
watermark传递与对齐机制
在Flink多并行度下,全局watermark是每个并行度中watermark最小的值。
如下图示例,Flink读取源数据的并行度为4,根据每个并行度中处理的事件时间得到每个并行度中的watermark值分别为2,4,3,6,那么此刻Watermark取最小值2,并将该watermark广播给Flink下游,从而下游不必再基于原始事件时间处理就能知道当前的全局watermark。

当每个并行度中的watermark更新时,同样会选择每个并行度最小的watermark当做全局的watermark,依次往后推进watermark的值,如下图所示:

综上来看,Flink在多并行度情况下会选择所有并行度中最小的watermark当做全局Watermark并广播给下游。
在Flink多个Source流进行关联时,每个source流都可以单独设置watermark,多Source进行关联后,Flink全局watermark值选择原理与上面相同,会选择多Source流中最小的watermark值当做Flink全局watermark。
watermark alignment对齐机制
watermark作为处理事件时间的核心很多操作依赖于watermark来触发计算,如:window窗口、CEP操作等,Flink多并行度或Flink多souce情况下,Flink默认会选择并行度或者多Source中watermark最小值作为全局watermark。如果某个并行度或者Source读取数据的速度过快,对应的watermark前进速度也会过快,默认这种选择watermark机制下,超前于较慢一侧的数据就会被存储在Operator的状态中,两个并行/Source之间读取速度差距越大则状态中存储的数据也越大,这对存储空间及状态恢复都会产生很大的挑战。
为了解决以上问题,Flink在1.15版本引入了watermark对齐机制(watermark alignment),如果某个并发或Source读取速度过快,watermark对齐机制就会暂停这个并发/source的读取,等待较慢一侧追赶上来之后再恢复读取,watermark对齐机制原理如下:

Flink Source由两部分组成,一部分是存在于JobManager中的Source Coordinator ,另一部分是存在于各TaskManager上的Source Operator,其中Source Coordinator属于协调者角色,Coordinator会周期性检查各Source Operator所汇报的watermark值是否达到当前允许的watermark最大偏移,如果某个Operator超过了watermark最大偏移,那么就会暂停读取Source数据,直到下个watermark检查周期。
Flink 多Source之间watermark对齐机制在JobManager上引入了CoordinatorStore来让各Source Coordinator之间交换信息,在watermark对齐机制下,各个Source Coordinator 通过CoordinatorStore来协调最大的允许watermark。
在编写Flink代码时,可以通过以下方式指定watermark对齐机制:
WatermarkStrategy
.<Tuple2<Long,String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
以上withWatermarkAlignment即是开启watermark对齐机制,其中需要传入3个参数,第一个参数是指定watermark对齐组,可以对不同的Source源设置相同的对齐组,这样就可以让不同的Source之间进行watermark对齐,只有一个Source的情况下,指定该值后,多个并发之间会进行watermark对齐;第二个参数是指定最大允许的watermark偏移值,如果读取速度过快一侧watermark较读取数据慢一侧watermark达到该值,那么读取速度快的一侧就会暂停读取数据;第三个参数是watermark检查间隔,Flink会每隔该时间检查watermark以决定是否对读取速度过快的并行/Source暂停读取数据,默认值为1秒。