学习资料 – 第九章 Flink Table API与SQL (一) 枫叶云笔记 (fynote.com)
通过采用Flink的Table API和SQL编程方式,我们极大地降低了编程难度。在实际企业开发中,强烈建议采用这种方式来进行数据分析处理。
TableEnvironment
TableEnvironment是Table API和SQL查询的核心概念,主要负责注册表、注册外部Catalog、执行SQL查询、注册自定义函数以及DataStream和Table之间的转换。
创建TableEnvironment有两种方式
1) 通过TableEnvionment.create()创建
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() //.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
inStreamingMode指的是Flink流式处理,inBatchMode()指的是Flink批处理。
2) 通过现有的StreamExecutionEnvironment创建
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
创建表
表标识符(identifier)
Flink中创建的表由三部分组成:Catalog名称、数据库名称以及表名称。
Catalog是元数据管理中心(元数据包含数据库、表、表结构等信息),如果catalog或者数据库没有指明,就会使用默认值,Flink中默认catalog为default_catalog,默认database为default_database,例如在Flink中创建一张表tbl,该表全部名称为:default_catalog.default_database.tbl。用户可以在Flink程序中执行一个catalog和数据库作为“当前catalog”和“当前数据库”,这样创建表时只需要指定表名即可。
// 创建TableEnvironment
val tEnv: TableEnvironment = ...
// 指定使用的catalog和数据库
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")
// 创建 Table对象
val table: Table = ...
// 注册名为 'exampleView' 的视图,该视图全称为'custom_catalog'.'custom_database'.'exampleView'
tableEnv.createTemporaryView("exampleView", table)
就算在Flink程序中指定了默认的Catalog和数据库,也可以创建属于其他Catalog或者数据库的表,如下:
// 接以上代码,在Flink中注册名称为'exampleView'的视图,该视图所属数据库为'other_database',所属catalog为'custom_catalog'
tableEnv.createTemporaryView("other_database.exampleView", table)
// 接以上代码,注册名称为'example.View'的视图,该视图所属catalog为'custom_catalog',所属数据库为'custom_database'
tableEnv.createTemporaryView("`example.View`", table)
// 接以上代码,注册名称为'exampleView'的视图,该视图所属catalog为'other_catalog',所属数据库为'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
注意:Flink表名遵循SQL标准,如果Flink中注册的表有“.”、“-”等特殊符号时,需要使用反引号(`)进行转义。
表类型
Flink中创建的表分为永久表(Permanent Table)和临时表(Temporary Table)两种。
永久表需要catalog维护表和元数据(例如Hive Metastore),一旦永久表被创建,它将对任何连接到Catalog的Flink会话可见且持续存在,直至该表被明确删除。
临时表通常保存于内存中并且仅在创建它们的Flink会话(Session)持续期间存在,这些表对于其他Flink会话不可见。
创建表方式
虚拟表创建方式
基于已有的Table对象创建虚拟表的方式如下:
// 创建TableEnvironment对象
TableEnvironment tableEnv = ...;
// 通过操作创建Table对象
Table projTable = tableEnv.from("X").select(...);
// 将projTable Table对象注册为名为"projectedTable"的表
tableEnv.createTemporaryView("projectedTable", projTable);
以上基于已有Table对象创建的表类似于数据库中的View视图,该表并不会存储对应Table对象的结果,当后续Flink操作引用该注册的表时,对应的Table对象会内嵌到对应查询中被执行,当多次引用该注册的表示,那么该Table对象会被内嵌到每个查询中并被执行多次。
常规表创建方式
常规表时直接读取外部数据注册为表。在Flink中创建常规表可以通过connector声明,Connector描述了存储表数据的外部系统,例如常见的Kafka、文件系统都可以通过这种方式来创建Flink表。Connector声明创建表时又可以通过Table API方式声明创建或者通过Flink SQL DDL语句来创建,建议使用Flink SQL DDL语句来创建。
1) Table API方式创建Flink表
// 创建TableDescriptor对象
TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build();
// 通过Table API方式创建表
tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
2) SQL DDL方式创建Flink表
// 通过SQL DDL方式创建表
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
由于通过SQL DDL方式创建Flink表相比于Table API方式创建表要简单的多,所以在实际工作中建议直接使用SQL DDL方式创建Flink表。
查询表
Table API 查询表
//通过Table API 获取Table对象
Table stationLogTbl = tableEnv.from("stationlog_tbl");
//过滤通话成功并且通话时长大于10的数据信息
Table resultTbl = stationLogTbl .filter($("call_type").isEqual("success").and($("duration").isGreater(10))) .groupBy($("sid"))
.select($("sid"),$("duration").sum().as("total_duration"));
SQL查询表
Flink SQL是建立在Apache Calcite的基础上,实现了SQL标准。通过Flink SQL,我们可以使用常规字符串针对Flink批数据或者流数据来指定SQL查询。
Table resultTbl = tableEnv.sqlQuery("" +
"select sid,sum(duration) as total_duration " +
"from stationlog_tbl " +
"where call_type='success' and duration>10 " +
"group by sid");
输出表
Table API输出表
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem") .schema(Schema.newBuilder()
.column("sid", DataTypes.STRING())
.column("call_out", DataTypes.STRING())
.column("call_in", DataTypes.STRING())
.column("call_type", DataTypes.STRING())
.column("call_time", DataTypes.BIGINT())
.column("duration", DataTypes.BIGINT())
.build())
.option("path", "file:///D:\\data\\flink\\output")
//设置检查生成文件的频率,每2秒检查一次,默认1分钟
.option("sink.rolling-policy.check-interval", "2s")
//设置文件滚动策略,每10秒生成一个文件,默认30分钟
.option("sink.rolling-policy.rollover-interval", "10s") .format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", "|")
.build())
.build());
SQL输出表
tableEnv.executeSql("" +
"create table CsvSinkTable (" +
" sid string," +
" call_out string," +
" call_in string," +
" call_type string," +
" call_time bigint," +
" duration bigint" +
") with (" +
" 'connector' = 'filesystem'," +
" 'path' = 'file:///D:/data/flinkdata/result'," +
" 'sink.rolling-policy.check-interval' = '2s'," +
" 'sink.rolling-policy.rollover-interval' = '10s'," +
" 'format' = 'csv'," +
" 'csv.field-delimiter' = '|'" +
")");
//SQL方式将数据写入文件系统
tableEnv.executeSql("" +
"insert into CsvSinkTable " +
"select sid,call_out,call_in,call_type,call_time,duration " +
"from stationlog_tbl ");
DataStream转换成Table
DataStream 转换成Table操作有三种方式,可以通过TableEnvironment对象调用对应方法来实现:
1.TableEnvironment.fromDataStream(...)
2.TableEnvironment.createTemporaryView(...)
3.TableEnvironment.fromChangelogStream(...)
Table转换成DataStream
Table转换成DataStream操作有两种方式,可以通过TableEnvironment对象调用对应方法来实现:
1.TableEnvironment.toDataStream(...)
2.TableEnvironment.toChangelogStream(...)
tblEnv.toDataStream(…)只支持对insert-only(插入流)对应的Table进行转换
tblEnv.toChangelogStream(…)可以对changelog stream(更新日志流)对应的Table进行转换
Table及SQL流式概念
Table API中的状态
在Flink Table和SQL编程中也有状态数据的维护,例如通过Table API统计每个基站通话次数时,Flink就需要维护状态保存通话次数。所以在Flink Table API和SQL编程中也可以像DataStream API编程中一样设置状态后端、checkpoint以及Savepoint,当程序发生故障时基于保存的状态数据进行程序恢复。
状态使用
Flink Table API和SQL 程序是声明式的,在其计算过程中涉及到的状态并不会像DataStream API中容易获取,Flink Planner优化器会自动决定是否需要状态来计算正确结果。如,普通的select … from….where这种查询是没有状态的,对于表的Join、聚合、去重操作需要保存中间结果,这种情况就需要状态。此外,我们还可以通过TableEnvironment来设置table.exec.state.ttl参数,决定状态保存的时长。
状态注意点
在Flink Table API和SQL编程中使用状态需要注意如下几点:
1.关于Table参数的设置需要通过TableEnvironment来设置,例如:状态保存时长,关于Checkpoint的设置可以通过StreamEnvironment来设置。
2.目前Flink版本进行Savepoint恢复程序时需要保证Flink版本以及代码中查询语句不能改变,否则无法从保存的状态中恢复程序。
动态表(Dynamic Table)
动态表通过将流式数据转换为“插入流”(Insert-Only Streams)或“更新日志流”(Changelog Streams)的形式来实现数据的持续更新。
当数据流到来时,动态表会连续查询(Continuous Query),持续处理流式的INSERT、UPDATE和DELETE操作,并根据这些操作来实时更新查询结果。

Table API
在实际开发中Table API没有SQL编程方式简便,建议在开发中使用SQL编程方式.
基础操作
1) From
From与SQL中的From子句类似,可以对一个注册的表进行扫描得到Table对象,适用于批和流处理。
Table orders = tableEnv.from("Orders");
2) FromValues
FromValues和SQL查询中的VALUES 子句类似,可以基于给定的行生成一张表,可以通过row(…)表达式创建复合行,适用于批和流处理。
Table table = tEnv.fromValues(
row(1, "ABC"),
row(2L, "ABCDE")
);
3) Select&As
Select和SQL中的Select子句类似,执行一个Select查询,执行查询后可以通过as方法重命名字段,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.select($("a"), $("c").as("d"));
4) Where&filter
Where和SQL中的Where子句类似,用来过滤数据,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.where($("b").isEqual("red"));
5) Distinct
Distinct和SQL中的Distinct子句类似,对数据进行去重,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.distinct();
6) InsertInto
InsertInto和SQL查询中的Insert into 子句类似,可以将Table数据写出到注册的输出表中。已注册表的Schema必须与查询中的schema相匹配,适用于批和流处理。
Table orders = tableEnv.from("Orders");
orders.insertInto("OutOrders").execute();
7) AddColumns
AddColumns可以对Table添加字段,添加字段名称不能已经存在于Table中,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.addColumns(concat($("c"), "sunny"));
8) AddOrReplaceColumns
AddOrReplaceColumns执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc"));
9) DropColumns
删除列操作,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.dropColumns($("b"), $("c"));
10) RenameColumns
对字段进行重命名操作, 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名,适用于批和流处理。
Table orders = tableEnv.from("Orders");
Table result = orders.renameColumns($("b").as("b2"), $("c").as("c2"));
表连接操作
1) Union
Union和SQL中的union子句类似,可以对两张表进行合并,两张表必须有相同的字段类型,Union会自动删除两张表中重复的记录,只支持批操作,不支持流操作。
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.union(right);
2) UnionAll
UnionAll和SQL 中Union All类似,可以对两张表进行合并,两张表必须有相同的字段类型,与Union不同,该操作不会删除两张表中重复的记录。批流都支持。
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.unionAll(right);
3) Intersect
Intersect和SQL中Intersect子句类似,可以对两张表做连接,返回两张表中都存在的记录,要求两张表具有相同的字段类型。如果一条记录在一张或者两张表中存在多次,则只返回一条记录。返回的结果中不存在重复记录。只支持批操作。
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.intersect(right);
4) IntersectAll
IntersetcAll和SQL中Intersect All子句类似,可以对两张表做关联,返回两张表中都存在的记录,要求两张表具有相同的字段类型。如果一条记录在两张表中出现多次,那么也会返回到结果中,结果表中会有重复记录。只支持批操作。
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.intersectAll(right);
5) Minus
Minus和SQL中EXCEPT子句类似,Minus可以对两张表关联,返回左标中存在且右表中不存在的记录,要求两张表具有相同的字段类型。左表中重复记录只返回一次,结果表中不会存在重复记录。只支持批操作。
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.minus(right);
6) MinusAll
MinusAll与SQL中EXCEPT ALL子句类似,MinusAll可以对两张表关联,返回右表中不存在的记录,如果一条相同数据在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n – m) 次。要求两张表具有相同的字段类型。只支持批操作。
Table left = tableEnv.from("orders1");
Table right = tableEnv.from("orders2");
left.minusAll(right);
7) In
In和SQL中In子句类似,如果表达式的值存在于给定表的子查询中,那么In子句返回true。子查询必须由一列组成,这个列必须与表达式具有相同的数据类型。批流操作都支持。
Table left = tableEnv.from("Orders1")
Table right = tableEnv.from("Orders2");
Table result = left.select($("a"), $("b"), $("c")).where($("a").in(right));
Order By 操作
1) order by
OrderBy和SQL中Order by 子句类似,返回跨所有并行分区的全局有序记录,对于无界表,该操作可以对时间属性进行排序或进行后续的fetch操作,适用于批和流处理。
Table result = tab.orderBy($("a").asc());
2) offset&Fetch
Offset和Fetch类似于SQL中的OFFSET 和FETCH子句,Offset操作根据偏移位置来限定已排序的结果集,Fetch操作将排序的结果集限制为前n行。在无界表中offset操作需要和fetch一起使用,适用于批和流处理。
// 从已排序的结果集中返回前5条记录
Table result1 = in.orderBy($("a").asc()).fetch(5);
// 从已排序的结果集中返回跳过3条记录之后的所有记录
Table result2 = in.orderBy($("a").asc()).offset(3);
// 从已排序的结果集中返回跳过10条记录之后的前5条记录
Table result3 = in.orderBy($("a").asc()).offset(10).fetch(5);
Over Window操作
Table table = input
// 定义开窗函数并将窗口赋值给一个别名 w
.window(Over
.partitionBy($("a"))
.orderBy($("rowtime"))
.preceding(UNBOUNDED_RANGE)
.following(CURRENT_RANGE)
.as("w")
)
//基于窗口w上进行聚合统计
.select(
$("a"),
$("b").sum().over($("w")),
$("c").min().over($("w"))
);
Ø Partition By
可选项,在一个或者多个属性上定义输入的分组,每个分组单独排序,聚合函数分别应用于每个分组。在Flink流式处理中,如果不指定partition by 而设置Over开窗函数,则Flink所有数据分到一个分组中,并由一个并行度处理。
Ø Order By
必选项,定义每个分组内行的排序,从而定义聚合函数应用于行的顺序,可以升序,可以降序。对于Flink流式处理,order by 必须指定事件时间或者处理时间属性列,目前,仅支持单个排序属性。
Ø Preceding
可选项,定义了包含在窗口中并位于当前行之前的行间隔。间隔可以是时间间隔或行计数间隔,默认为时间间隔。
对于有界数据设置Over开窗函数时,preceding可以设置时间间隔或者行计数间隔,设置方式如下:
// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));
// 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as("w"));
// 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));
// 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));
对于无界数据设置Over开窗函数时,preceding通过常量来指定。例如,用UNBOUNDED_RANGE指定时间间隔或用UNBOUNDED_ROW指定行计数间隔,设置方式如下:
// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));
// 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"));
// 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性)
.window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w"));
// 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性)
.window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));
如果在设置Over开窗函数时不指定preceding,Preceding默认值为UNBOUNDED_RANGE。
Ø Following
可选项,定义包含在窗口中并在当前行之后的行间隔。间隔必须与Preceding指定的单位相同,要么都是时间间隔,要么都是行计数间隔。
当指定了Following参数后,其与Preceding指定的参数组成整个开窗函数的窗口范围。特别需要注意的是目前Flink批和流处理中暂不支持FOLLOWING 往后指定时间间隔或者行计数间隔,只能设置两个值:CURRENT_ROW/CURRENT_RANGE,时间间隔窗口的上限为CURRENT_RANGE,行计数间隔窗口的上限为CURRENT_ROW。如果该值不设置,针对时间间隔窗口或者行计数间隔窗口以上两个值分别为默认值。
Ø as
必须项,为Over开窗函数指定别名,别名用于在之后的select子句中方便引用该窗口。值得注意的是目前在同一个select()中调用的所有聚合函数必须基于同一个Over窗口上计算。
聚合操作
1) GroupBy分组聚合
GroupBy和SQL中Group By子句一样,按照某列对数据进行分组,往往group by 后会跟上聚合算子按照分组对数据进行聚合。Group By适用于批和流。
Table orders = tableEnv.from("Orders");
Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d"));
注意:对于流处理,聚合得到的结果会存储在状态中,随着数据的实时流入,状态会越来越大,最好设置状态保存的时效,避免状态无限扩大。
2) GroupBy Window 聚合
GroupBy Window指的是设置window窗口后,必须跟上GroupBy算子按照窗口进行分组聚合。GroupBy Window适用于批和流。
Table orders = tableEnv.from("Orders");
Table result = orders
// 定义窗口
.window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w"))
// 按窗口和键分组
.groupBy($("a"), $("w"))
// 访问窗口属性并聚合
.select(
$("a"),
$("w").start(),
$("w").end(),
$("w").rowtime(),
$("b").sum().as("d")
);
3) Over Window聚合
Table orders = tableEnv.from("Orders");
Table result = orders
// 定义窗口
.window(
Over
.partitionBy($("a"))
.orderBy($("rowtime"))
.preceding(UNBOUNDED_RANGE)
.following(CURRENT_RANGE)
.as("w"))
// 滑动聚合
.select(
$("a"),
$("b").avg().over($("w")),
$("b").max().over($("w")),
$("b").min().over($("w"))
);
4) Distinct 聚合
这里说的Distinct 聚合与SQL中Count(distinct a)聚合子句类似,在Flink Table中Distinct可以应用于GroupBy 聚合、GroupBy Window聚合、Over Window 聚合。可以对批和流处理使用。
Table orders = tableEnv.from("Orders");
// 按属性分组后的的互异(互不相同、去重)聚合
Table groupByDistinctResult = orders
.groupBy($("a"))
.select($("a"), $("b").sum().distinct().as("d"));
// 按属性、时间窗口分组后的互异(互不相同、去重)聚合
Table groupByWindowDistinctResult = orders
.window(Tumble
.over(lit(5).minutes())
.on($("rowtime"))
.as("w")
)
.groupBy($("a"), $("w"))
.select($("a"), $("b").sum().distinct().as("d"));
// over window 上的互异(互不相同、去重)聚合
Table result = orders
.window(Over
.partitionBy($("a"))
.orderBy($("rowtime"))
.preceding(UNBOUNDED_RANGE)
.as("w"))
.select(
$("a"), $("b").avg().distinct().over($("w")),
$("b").max().over($("w")),
$("b").min().over($("w"))
);
Join操作
1) Inner Join
Inner Join类似SQL中的Inner Join,用于关联两张表,两张表字段名不能存在相同,并且必须通过join算子或者使用where/filter算子定义至少一个join等式连接条件。可以对批和流处理使用。
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"));
Table result = left.join(right)
.where($("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
2) Outer Join
Outer Join类似SQL LEFT/RIGHT/FULL outer join 子句,用于关联两张表,两张表字段名不能存在相同,并且必须定义至少一个等式连接条件。可以对批和流处理使用。
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"));
Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
3) Interval Join
Interval Join 类似 DataStream API中Interval Join可以在指定时间区间内关联两个流数据。Flink Table API中的Interval Join也是基于时间区间进行关联,使用时至少需要一个equal join关联条件和一个限制两个流关联时间范围的条件,限制时间范围的条件可以基于两流ProcessTime/EventTime来定义。Interval Join可以对批和流处理使用。
Table left = tableEnv.from("MyTable").select($("a"), $("b"), $("c"), $("ltime"));
Table right = tableEnv.from("MyTable").select($("d"), $("e"), $("f"), $("rtime"));
Table result = left.join(right)
.where(
and(
$("a").isEqual($("d")),
$("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())),
$("ltime").isLess($("rtime").plus(lit(10).minutes()))
))
.select($("a"), $("b"), $("e"), $("ltime"));
时态Join(Temporal Join)
Flink Table API与SQL编程中创建时态表时需要指定主键约束和事件时间,只要定义一张表时包含事件时间和主键约束,那么这张表就是时态表,目前时态表函数仅支持通过Table API方式定义,不支持SQL DDL方式定义。
Table API 中可以基于普通表并指定时间属性和主键来定义时态表函数,后续在Table API中可以通过“.joinLateral(…)”方式使用时态表函数并查询版本数据。Table API中定义时态表函数及调用时态表函数查询数据方式如下:
#基于currency_rates普通表定义时态表函数
TemporalTableFunction rates = tEnv
.from("currency_rates")
#指定“update_time”为时间属性,“currency”为主键
.createTemporalTableFunction("update_time", "currency");
#创建和注册时态表函数,这样可以在Table API中使用rates函数
tEnv.createTemporarySystemFunction("rates", rates);
#通过joinLateral调用表函数关联表,查询版本数据
Table result = orders
.joinLateral(
call("rates", $("o_proctime")),
$("o_currency").isEqual($("r_currency")))
.select($("o_amount").sum().as("amount"));
在Table API中使用时态表时需要注意以下几个点:
1.创建时态表函数时需要指定时间属性和主键。时间属性格式必须为TIMESTAMP/TIMESTAMP_LTZ,指定主键主要是保证该时态表中数据能按照主键进行更新或删除。
2.时态表不能单独查询,需要通过与其他表进行Join关联查询,以上示例中通过joinLateral调用时态表函数,将时态表函数放在右侧,类似右表进行关联,左右表都需要设置watermark,两表关联条件中必须有主键。
3.目前仅支持类似Inner Join,即左右表关联上的数据才会输出,左表和右表关联不上的数据不会输出。
4.右表时态表必须为CDC(Change Data Capture,数据变更捕获)数据,即数据是有增删改查的UNBOUND实时流,不能是BOUND有边界的流。
5.在两表Join时,通过call方法调用时态表函数,时态表函数需要传入一个时间列,该列类型必须是TIMESTAMP/TIMESTAMP_LTZ类型,暂时不支持传入常量数据,一般这里传入的都是左表中的转换成TIMESTAMP/TIMESTAMP_LTZ类型的时间字段。
6.通过时态表函数查询数据时,需要注意时态表中只会存储上一个watermark到当前时刻的所有版本数据,watermark之前的数据不会存储。
windows操作
1) 滚动窗口 – Tumbling Windows
Table API中滚动窗口可以根据固定时间大小或者固定事件数量进行窗口划分,即滚动窗口可以定义在事件时间、处理时间或者行数上。使用方式如下:
// 基于事件时间的滚动窗口,rowtime为事件时间
.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));
// 基于处理时间的滚动窗口,proctime为处理时间
.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));
// 基于行计数的滚动窗口
.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));
2) 滑动窗口 – Sliding Windows
滑动窗口与滚动窗口类似,滑动窗口长度大小固定,使用滑动窗口时需要指定一个时间参数表示窗口长度(window size),既然是滑动窗口,那么同时还需要指定一个窗口滑动步长(window slide)来控制生成窗口的频率。
在Flink Table API中滑动窗口可以定义在事件时间、处理时间或行数上。
// 基于事件时间的滑动窗口
.window(Slide.over(lit(10).minutes())
.every(lit(5).minutes())
.on($("rowtime"))
.as("w"));
// 基于处理时间的滑动窗口
.window(Slide.over(lit(10).minutes())
.every(lit(5).minutes())
.on($("proctime"))
.as("w"));
// 基于行计数的滑动窗口
.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));
3) 会话窗口 - Session Windows
会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是Session Gap(会话间隔),是指在规定的时间内如果没有活跃数据接入,则认为窗口结束,然后触发窗口计算结果。
与滑动窗口、滚动窗口不同的是Session Windows不需要有固定windows size和slide time,只需要定义session gap,来规定不活跃数据的时间上限即可,此外,会话窗口不会相互重叠。
Session Windows 窗口类型比较适合非连续性数据处理或周期性产生数据场景。
在Flink Table API中会话窗口支持事件时间和处理时间。
// 基于事件时间的会话窗口
.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));
// 基于处理时间的会话窗口
.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));