学习资料 – 第八章 Flink时间、窗口及操作 枫叶云笔记 (fynote.com)

Flink窗口在KeyedStreams 和 Non-KeyedStreams中使用的基本结构,两者的唯一区别是keyedStreams要调用KeyBy(…)后再调用window(…),而non-Keyed Stream只用直接调用windowAll(…)就可以

在对流调用window/windowAll设置完窗口后,还可以继续调用一系列的方法来对窗口数据进行处理,这些窗口操作的API包含Windows Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(时延设定)、Output Tag(输出标签)以及Windows Funciton(窗口函数)等组成部分,其中Windows Funciton是所有窗口算子必须指定的属性,其余的属性都是根据实际情况选择指定。

keyed Window

stream
  .keyBy(...)                <-  仅 keyed 窗口需要
  .window(...)               <-  必填项:"assigner"
  [.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)
  [.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)
  [.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)
  [.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output)
  .reduce/aggregate/apply()  <-  必填项:"function"
  [.getSideOutput(...)]      <-  可选项:"output tag"

Non-Key Window

stream
  .windowAll(...)            <-  必填项:"assigner"
  [.trigger(...)]            <-  可选项:"trigger" (else default trigger)
  [.evictor(...)]            <-  可选项:"evictor" (else no evictor)
  [.allowedLateness(...)]    <-  可选项:"lateness" (else zero)
  [.sideOutputLateData(...)] <-  可选项:"output tag" (else no side output for late data)
  .reduce/aggregate/apply()  <-  必填项:"function"
  [.getSideOutput(...)]      <-  可选项:"output tag"

Window API 解释:

Ø Windows Assigner:指定窗口的类型,定义如何将数据流分配到一个或多个窗口;

Ø Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;

Ø Evictor:用于数据剔除;

Ø allowedLateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算;

Ø Output Tag:标记输出标签,然后在通过getSideOutput将窗口中的数据根据标签输出;

Ø Windows Funciton:定义窗口上数据处理的逻辑,例如对数据进行sum操作。

窗口触发器(trigger)

Trigger决定了一个窗口何时被触发进而被window function处理,默认每个窗口都有一个默认的Trigger,时间窗口默认触发器为EventTimeTrigger/ProcessTimeTrigger,计数窗口默认的触发器为CountTrigger。

绝大多数情况下,我们使用默认的窗口触发器即可,如果默认的窗口触发不能满足我们需求,可以自定义窗口触发器来完成窗口触发执行。

数据剔除器(evictor)

Flink Window API 中的数据剔除器(Evictor)可以在窗口触发前或者后对窗口内的数据进行删除。

Flink有三个内置的Evictor,默认情况下,所有内置的 evictor 逻辑都在调用窗口函数前执行。

CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除。Flink 不对窗口中元素的顺序做任何保证,也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先到达窗口的。

DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。

TimeEvictor: 接收 interval 参数,以毫秒表示。 它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。

如果以上默认的Evictor剔除器不满足我们业务情况,还可以通过实现Evictor接口自定义窗口数据剔除规则。

窗口聚合函数(Window Functions)

Flink中提供了三种类型的窗口函数:

ReduceFunction
AggregateFunction
ProcessWindowFunction

在Flink中还有一个过时的窗口函数WindowFunction,该窗口函数完全可以由ProcessWindowFunction替代,虽然在当下Flink版本中还可以使用,在未来Flink版本中该函数将会被移除。

以上窗口函数中又可以分为两大类:增量聚合函数和全量聚合函数。

增量聚合函数:

ReduceFunction
AggregateFunction

可以在每条数据到达窗口后基于中间状态结果进行增量计算,所以执行起来更加高效,这种窗口函数只缓存中间结果状态不需要缓存所有数据;

全量聚合函数:

ProcessWindowFunction
WindowFunction

会将窗口内的数据缓存起来组成Iterable集合,当窗口结束时统一全量聚合,由于需要缓存窗口所有数据所以执行效率不如增量聚合函数效率高。

reduce-ReduceFunction

对WindowStream可以通过ReduceFunction增量聚合函数定义如何处理窗口数据的逻辑,ReduceFunction指定两条输入数据如何合并在一起产生一条输出数据,输入数据和输出数据类型必须相同。

示例代码:

DataStream<Tuple2<String, Long>> input = ...;
input.keyBy(<key selector>)
.window(<window assigner>)
 .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
});

aggregate-AggregateFunction

和ReduceFunction相似,AggregateFunction也是基于中间状态计算结果的增量计算函数,但AggregateFunction在窗口计算上更加通用,AggregateFunction接口相对ReduceFunction更加灵活,实现复杂度也相对较高。

示例代码:

private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }
  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }
  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }
  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }}

DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

使用AggregateFunction接口时需要指定三种类型:输入数据类型(IN)、累加器类型(ACC)、输出数据类型(OUT),输入数据类型和输出数据类型对应AggregateFunction输入和输出的数据类型,这里ACC 累加器类型实际上是进行数据增量聚合计算时的中间状态。在AggregateFunction接口中定义了四个需要复写的方法,如下:

createAccumulator():创建累加器并初始化累加器,该累加器即是参与整个增量计算的中间状态。

add(Event,Acc):窗口中每有一条数据会调用一次add方法,在add方法中可以设置进入元素和累加器如何进行聚合的逻辑。

getResult(Acc):从累加器状态中返回最后结果。

merge(Acc,Acc):在窗口合并场景下,将两个窗口的累加器合并为一个累加器结果并返回,例如在SessionWindow中,每条数据对应一个窗口,当两条数据时间差没有超过gap时间时,会自动合并为一个窗口,在这种情况下merge()方法会被调用。

此外,需要注意的是WindowStream可以直接调用min/minBy/max/maxBy/sum等方法实现聚合功能,这些方法的底层都是通过AggregateFunction接口实现。

process-ProcessWindowFunction

ProcessWindowFunction是全量窗口函数,可以获取包含窗口内的所有元素的Iterable、Window、数据Key、Context上下文等信息,由于该函数是全量窗口函数,所以在窗口触发前需要缓存窗口所有数据。

示例代码:

DataStream<Tuple2<String, Long>> input = ...;
input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */
public class MyProcessWindowFunction 
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
}
}

apply-WindowFunction(已过时)

WindowFunction也是全量窗口函数,与ProcessWindowFunction类似,但没有提供Context上下文对象,功能没有ProcessWindowFunction强大,可以完全使用ProcessWindowFunction替代WindowFunction,Flink1.17版本WindowFunction不建议再使用,未来Flink版本可能会被弃用。

可以对WindowStream调用apply方法传入WindowFunction,使用方式如下:

DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
    .apply(new MyWindowFunction());

public class MyWindowFunction 
extends WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
  @Override
  public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    ... ...
}
}

允许延迟(Allowed Lateness)

watermark是一种衡量事件时间进展的机制,对Flink乱序流设置窗口后,窗口触发时机也是以watermark时间为基准,大多数情况下我们会考虑Flink流大部分数据乱序程度来设置watermark,但在某些情况下数据可能延时会非常严重,即使通过Watermark机制也无法等到数据全部进入窗口再进行处理。一个窗口触发后,流中再来属于该窗口的数据时,Flink中默认会将这些迟到的数据做丢弃处理。

有些时候用户希望即使数据延迟到达的情况下,也能够正常按照流程处理并输出结果,此时就需要使用Allowed Lateness(允许延迟)机制来对迟到的数据进行额外的处理。

Flink Allowed Lateness机制可以通过allowedLateness方法来指定一个延迟时间,窗口触发后,在该延迟时间范围内不会被销毁,属于该窗口的延迟数据到达后依然可以触发窗口计算,直到watermark推进到“窗口结束时间+延迟时间”后,再销毁该窗口,后续再有迟到“非常严重”的数据默认丢弃

Allowed Lateness机制使用方式如下:

DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
    .<windowed transformation>(<window function>);

假设现在每隔10分钟设置一个滚动窗口,如果watermark延迟时间设置2分钟,同时为了保证迟到数据正确计算到对应窗口中我们设置allowedLateness允许延迟时间为3分钟(allowedLateness(Time.minutes(3))),若一个正常窗口时间范围为[08:00:00~08:10:00) ,当watermark达到08:10:00时,该窗口会触发执行(此刻Flink最大事件时间为08:12:00),由于设置了AllowedLateness,所以该窗口触发执行后不会被销毁,而是会延长3分钟销毁,当watermark达到08:13:00时(Flink接收到最大的事件时间为08:15:00),该窗口才会别真正销毁,期间属于该窗口内的数据每来一条都会重新触发窗口的执行。

watermark设置的延迟时间(2分钟)是针对Flink流中绝大多数数据乱序情况而设置、解决的是数据流乱序问题(也是一种数据迟到问题)

而allowedLateness机制是在乱序的基础上再等一等“迟到更严重”的数据,保证这些数据能正确计算到对应的窗口内。

侧流输出迟到数据(sideOutputLateData)

即使设置了AllowedLateness机制后,当watermark达到“窗口触发时间+allowedLateness允许延迟时间”后,再有属于该窗口数据达到,这些数据依然会被丢弃,也就是说设置AllowedLateness机制并不能保证Flink可以正确处理“迟到”非常严重的事件。

这种情况下,如果我们不希望丢弃这些“迟到”非常严重的事件而是将这些数据单独保留后续进行处理,这就需要使用Flink中SideOutput(侧输出流)机制。

针对窗口流可以调用sideOutputLateData(lateOutputTag)方法来标记迟到严重的数据,然后使用getSideOutput(lateOutputTag)获取lateOutputTag标签对应的迟到严重的数据,之后转成独立的DataStream数据集进行处理,这个过程需要创建late-data的OutputTag,然后通过该标签从数据流中将迟到数据筛选出来。

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

事件时间下的流关联

Union合并

Union可以对相同类型的数据流进行合并,输出一个新的DataStream数据流,合并之后的数据流包含两个或多个流中所有元素,并且数据类型不变。

当引入事件时间和watermark后,两个流各自会有对应的watermark,两流合并后Flink会以两流中watermark小的为基准确定watermark。

Connect合并

connect算子将两个输入的DataStream数据流作为参数,将两个不同数据类型的DataStream数据流连接在一起,生成一个ConnectedStreams对象作为结果,与union算子不同,union只是简单的将两个类型一样的流合并在一起,而connect算子可以将不同类型的DataStream连接在一起,并且connect只能连接两个流。

与Union一样,当引入事件时间和watermark后,多个流各自会有对应的watermark,多流合并后Flink会以多流中watermark小的为基准确定watermark。

Window Join关联

在 Flink 中,Window Join 是一种流处理操作,用于将两个流中的元素基于窗口分组并进行关联。

Window Join 可以在事件时间或处理时间上进行操作,并提供了不同类型的窗口(如滚动窗口、滑动窗口、会话窗口)来控制关联操作的时间范围。

Window Join 的基本思想是根据指定的关联条件将流中的元素进行分组,并将同一个窗口中的元素进行关联操作,实现类似SQL中两表Join内关联的效果(Select a.id,a.name,a.age,b.score from a join b on a.id = b.id),其使用方式如下:

dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

Window Join 的工作流程如下:

1.从不同的输入流中读取数据,提取两流中关联条件数据。

2.设置适当的窗口类型(如滚动窗口、滑动窗口、会话窗口),并定义窗口的大小和滑动间隔,根据窗口的时间范围对数据进行分组。

3.在每个窗口中将相应窗口关联的元素按照自定义的关联操作(JoinFunction)进行处理生成最终结果

Flink 提供了丰富的 API 和内置函数来实现 Window Join 操作,可以根据需求选择不同的窗口类型、触发器、时间语义等来进行定制化的操作。此外,Window Join也支持处理迟到数据(Late Data)的机制,以应对数据延迟到达或乱序的情况。

在 Window Join 中,如果使用事件时间(Event Time)进行关联,Flink 会选择两个输入流中watermark较小的作为基准来触发窗口操作。

Interval Join关联

Interval Join 这种流处理操作用于在指定的时间区间内关联两个或多个输入流的元素,与 Window Join 不同,Interval Join 基于时间区间而不是固定的窗口来进行关联操作,这使得 Interval Join 更适用于需要根据时间区间进行灵活关联的场景。

编写代码时使用Interval Join方式如下:

keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

Flink中IntervalJoin仅支持EventTime事件时间,且只能基于KeyedStream使用IntervalJoin,两流中只有相同的key且符合时间范围关系的数据会被关联,between方法中第一个参数指定的是lower bound的值,第二个参数指定的是upper bound的值,两个数据流数据进行关联时是包含lower bound 和upper bound边界数据,如果不行包含时间边界数据可以通过upperBoundExclusive(true)和lowerBoundExclusive(true)方法去掉边界,两流关联时就不会关联边界的数据。

Window Cogroup关联

Flink的Window Cogroup是一种强大且灵活的流处理操作,与Window Join类似,可以根据窗口内的关联条件将多个输入流的元素进行分组和关联,并根据自定义的逻辑输出结果,只需要在使用时调用.coGroup()方法来代替.join()方法即可

代码示例:

dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

在Window Cogroup中,也是通过where和equalTo分别指定两流中关联的条件,并设置窗口,最后调用apply方法传入CoGroupFunction接口重写其coGroup方法,在该方法内来处理两流在相同窗口内关联的数据。CoGroupFunction接口如下:

new CoGroupFunction<IN1, IN2, OUT>() {
    @Override
    public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception {
        out.collect(first + "=====" + second);
    }
}

与Window Join不同的是,Window Cogroup中coGroup方法的参数是两流对应可遍历的数据集合,如上所示,first是第一个流中在某个窗口中输入的所有数据,second是第二个流在对应窗口内输入的所有数据,这里的first和second集合不仅仅是相同窗口中根据关联条件匹配的数据,而是两流在对应窗口的所有数据,可以根据自己的需求实现类似SQL中的内连接(inner join)、左外连接(left outer join)、右外连接(right outer join)和全外连接(full outer join)等不同类型的关联操作。

实际上Window Join底层的实现也是基于Window Cogroup来完成的,Window Cogroup相比于Window Join更适用于各种复杂的业务场景。

作者 张, 宴银

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注