Temporal Join的使用条件



快照Join原理图解

快照Join Flink SQL语法

示例代码 Temporal Join – Inner / Left Join
Temporal Join – Inner Join
package com.imooc.scala.sqljoin
import java.time.ZoneId
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
/**
 * Temporal (snapshot) join demo: INNER JOIN.
 *
 * Registers a Kafka-backed `orders` table, an upsert-kafka `currency_rates`
 * table keyed by `currency`, and a Kafka sink table `orders_rates`, then
 * submits an `INSERT INTO ... SELECT` that joins each order with
 * `currency_rates FOR SYSTEM_TIME AS OF orders.order_time`.
 *
 * The SQL text is kept flush-left inside the triple-quoted literals;
 * `stripMargin` removes the leading `|` of every line.
 *
 * Created by xuwei
 */
object TemporalJoin_InnerJoin {
  def main(args: Array[String]): Unit = {
    // Table environment in streaming mode.
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build()
    )

    // Use the China (Asia/Shanghai) time zone.
    tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
    // Time after which a source is automatically marked idle.
    // Not needed when the data source keeps producing data continuously.
    tableEnv.getConfig.set("table.exec.source.idle-timeout","10s")

    // Orders table (append-only); order_time is derived from ts and carries the watermark.
    val ordersDdl =
      """
|CREATE TABLE orders(
| order_id BIGINT,
| price DECIMAL(32,2),
| currency STRING,
| ts BIGINT,
| order_time AS TO_TIMESTAMP_LTZ(ts,3),
| WATERMARK FOR order_time AS order_time
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'orders',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'properties.group.id' = 'gid-sql-orders',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin

    // Currency-rate table (the versioned table): primary key on currency,
    // watermark on update_time.
    val ratesDdl =
      """
|CREATE TABLE currency_rates(
| currency STRING,
| rate DECIMAL(32,2),
| ts BIGINT,
| update_time AS TO_TIMESTAMP_LTZ(ts,3),
| WATERMARK FOR update_time AS update_time,
| PRIMARY KEY (currency) NOT ENFORCED
|)WITH(
| 'connector' = 'upsert-kafka',
| 'topic' = 'currency_rates',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'properties.group.id' = 'gid-sql-rates',
| 'key.format' = 'json',
| 'value.format' = 'json'
|)
|""".stripMargin

    // Result (sink) table.
    val sinkDdl =
      """
|CREATE TABLE orders_rates(
| order_id BIGINT,
| price DECIMAL(32,2),
| currency STRING,
| rate DECIMAL(32,2),
| order_time TIMESTAMP_LTZ(3)
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'orders_rates',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'format' = 'json',
| 'sink.partitioner' = 'default'
|)
|""".stripMargin

    // Join orders with currency rates (this is the statement that gets submitted).
    val insertSql =
      """
|INSERT INTO orders_rates
|SELECT
| order_id,
| price,
| orders.currency,
| rate,
| order_time
|FROM orders
|INNER JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
|ON orders.currency = currency_rates.currency
|""".stripMargin

    // Same join written with table aliases in the SQL.
    // Kept for reference only; it is not executed below.
    val insertSqlWithAliases =
      """
|INSERT INTO orders_rates
|SELECT
| os.order_id,
| os.price,
| os.currency,
| cr.rate,
| os.order_time
|FROM orders AS os
|INNER JOIN currency_rates FOR SYSTEM_TIME AS OF os.order_time AS cr
|ON os.currency = cr.currency
|""".stripMargin

    // Register the three tables first, then submit the INSERT, in that order.
    Seq(ordersDdl, ratesDdl, sinkDdl, insertSql)
      .foreach(statement => tableEnv.executeSql(statement))
  }
}
Temporal Join – Left Join
package com.imooc.scala.sqljoin
import java.time.ZoneId
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
/**
 * Temporal (snapshot) join demo: LEFT JOIN.
 *
 * Registers the `orders`, `currency_rates` and `orders_rates` tables and
 * submits an `INSERT INTO ... SELECT` that left-joins each order with
 * `currency_rates FOR SYSTEM_TIME AS OF orders.order_time`.
 *
 * The SQL text is kept flush-left inside the triple-quoted literals;
 * `stripMargin` removes the leading `|` of every line.
 *
 * Created by xuwei
 */
object TemporalJoin_LeftJoin {

  /** Creates the streaming-mode table environment and applies the demo's configuration. */
  private def newTableEnv(): TableEnvironment = {
    val streamingSettings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val env = TableEnvironment.create(streamingSettings)
    // Use the China (Asia/Shanghai) time zone.
    env.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
    // Time after which a source is automatically marked idle.
    // Not needed when the data source keeps producing data continuously.
    env.getConfig.set("table.exec.source.idle-timeout","10s")
    env
  }

  def main(args: Array[String]): Unit = {
    val env = newTableEnv()

    // Orders table (append-only).
    val createOrders =
      """
|CREATE TABLE orders(
| order_id BIGINT,
| price DECIMAL(32,2),
| currency STRING,
| ts BIGINT,
| order_time AS TO_TIMESTAMP_LTZ(ts,3),
| WATERMARK FOR order_time AS order_time
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'orders',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'properties.group.id' = 'gid-sql-orders',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin

    // Currency-rate table (the versioned table): primary key on currency.
    val createRates =
      """
|CREATE TABLE currency_rates(
| currency STRING,
| rate DECIMAL(32,2),
| ts BIGINT,
| update_time AS TO_TIMESTAMP_LTZ(ts,3),
| WATERMARK FOR update_time AS update_time,
| PRIMARY KEY (currency) NOT ENFORCED
|)WITH(
| 'connector' = 'upsert-kafka',
| 'topic' = 'currency_rates',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'properties.group.id' = 'gid-sql-rates',
| 'key.format' = 'json',
| 'value.format' = 'json'
|)
|""".stripMargin

    // Result (sink) table.
    val createSink =
      """
|CREATE TABLE orders_rates(
| order_id BIGINT,
| price DECIMAL(32,2),
| currency STRING,
| rate DECIMAL(32,2),
| order_time TIMESTAMP_LTZ(3)
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'orders_rates',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'format' = 'json',
| 'sink.partitioner' = 'default'
|)
|""".stripMargin

    // Join orders with currency rates.
    val leftJoinInsert =
      """
|INSERT INTO orders_rates
|SELECT
| order_id,
| price,
| orders.currency,
| rate,
| order_time
|FROM orders
|LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
|ON orders.currency = currency_rates.currency
|""".stripMargin

    // Table definitions first, then the INSERT; the order is significant.
    for (sql <- List(createOrders, createRates, createSink, leftJoinInsert)) {
      env.executeSql(sql)
    }
  }
}
使用版本表视图实现快照Join代码示例
package com.imooc.scala.sqljoin
import java.time.ZoneId
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
/**
 * Temporal join against a versioned-table view.
 *
 * Here `currency_rates` is a plain Kafka table without a primary key. The
 * view `versioned_rates` is built on top of it with a
 * `ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC)`
 * query filtered to `row_num = 1`, and the orders are left-joined with
 * `versioned_rates FOR SYSTEM_TIME AS OF orders.order_time`.
 *
 * The SQL text is kept flush-left inside the triple-quoted literals;
 * `stripMargin` removes the leading `|` of every line.
 *
 * Created by xuwei
 */
object TemporalJoin_LeftJoin_VersionedView {
  def main(args: Array[String]): Unit = {
    // Table environment in streaming mode.
    val envSettings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(envSettings)

    // Use the China (Asia/Shanghai) time zone.
    tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
    // Time after which a source is automatically marked idle.
    // Not needed when the data source keeps producing data continuously.
    tableEnv.getConfig.set("table.exec.source.idle-timeout","10s")

    // Orders table (append-only).
    val ordersTable =
      """
|CREATE TABLE orders(
| order_id BIGINT,
| price DECIMAL(32,2),
| currency STRING,
| ts BIGINT,
| order_time AS TO_TIMESTAMP_LTZ(ts,3),
| WATERMARK FOR order_time AS order_time
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'orders',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'properties.group.id' = 'gid-sql-orders',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin

    // Currency-rate table (append-only, no primary key).
    // The embedded SQL note says latest-offset is used for demo convenience,
    // so that every start reads only the newest data.
    val ratesTable =
      """
|CREATE TABLE currency_rates(
| currency STRING,
| rate DECIMAL(32,2),
| ts BIGINT,
| update_time AS TO_TIMESTAMP_LTZ(ts,3),
| WATERMARK FOR update_time AS update_time
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'currency_rates',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'properties.group.id' = 'gid-sql-rates',
| -- 为了便于演示,在这使用latest-offset,每次启动都使用最新的数据
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin

    // Versioned-table view (indirectly achieves the effect of a versioned table).
    // The numbered notes embedded in the SQL read:
    //   1: update_time is designated as the time field
    //   2: currency is designated as the primary key
    //   3: ORDER BY must use a TIMESTAMP-typed time field
    val versionedRatesView =
      """
|CREATE VIEW versioned_rates AS
|SELECT currency, rate, update_time -- 1:指定update_time为时间字段
| FROM (
| SELECT *,
| ROW_NUMBER() OVER (PARTITION BY currency -- 2:指定currency为主键
| ORDER BY update_time DESC -- 3:ORDER BY中必须指定TIMESTAMP格式的时间字段
| ) AS row_num
| FROM currency_rates
| )
|WHERE row_num = 1
|""".stripMargin

    // Result (sink) table.
    val sinkTable =
      """
|CREATE TABLE orders_rates(
| order_id BIGINT,
| price DECIMAL(32,2),
| currency STRING,
| rate DECIMAL(32,2),
| order_time TIMESTAMP_LTZ(3)
|)WITH(
| 'connector' = 'kafka',
| 'topic' = 'orders_rates',
| 'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
| 'format' = 'json',
| 'sink.partitioner' = 'default'
|)
|""".stripMargin

    // Join orders with the versioned view of the currency rates.
    val joinInsert =
      """
|INSERT INTO orders_rates
|SELECT
| order_id,
| price,
| orders.currency,
| rate,
| order_time
|FROM orders
|LEFT JOIN versioned_rates FOR SYSTEM_TIME AS OF orders.order_time
|ON orders.currency = versioned_rates.currency
|""".stripMargin

    // Source tables, then the view built on them, then the sink, then the INSERT.
    val statementsInOrder = Vector(ordersTable, ratesTable, versionedRatesView, sinkTable, joinInsert)
    statementsInOrder.foreach(statement => tableEnv.executeSql(statement))
  }
}