推荐学习视频 – 1-9-1 FlinkSQL之窗口Join(Window Join)介绍_哔哩哔哩_bilibili

使用条件

窗口Join原理图解

Window Join 和 Interval Join的区别

Window Join代码示例

package com.imooc.scala.sqljoin

import java.time.ZoneId

import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * 滚动窗口Join
 * Created by xuwei
 */
object TumbleWindowJoin_WindowTVF {

  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //设置全局并行度为1--为了便于验证窗口触发的效果
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    //设置Source自动置为idle的时间--如果数据源可以一直正常产生数据,则不需要配置此参数!!!
    tEnv.getConfig.set("table.exec.source.idle-timeout","10s")

    //订单表
    val UserOrderTableSql =
      """
        |CREATE TABLE user_order(
        |  order_id BIGINT,
        |  order_type STRING,
        |  ts BIGINT,
        |  order_time AS TO_TIMESTAMP_LTZ(ts,3),
        |  -- 指定最大允许乱序的时间是10S
        |  WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'user_order',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'properties.group.id' = 'gid-sql-order',
        |  -- 为了便于演示,在这使用latest-offset,每次启动都使用最新的数据
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(UserOrderTableSql)

    //支付表
    val PaymentFlowTableSql =
      """
        |CREATE TABLE payment_flow(
        |  order_id BIGINT,
        |  pay_money BIGINT,
        |  ts BIGINT,
        |  pay_time AS TO_TIMESTAMP_LTZ(ts,3),
        |  -- 指定最大允许乱序的时间是10S
        |  WATERMARK FOR pay_time AS pay_time - INTERVAL '10' SECOND
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'payment_flow',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'properties.group.id' = 'gid-sql-payment',
        |  -- 为了便于演示,在这使用latest-offset,每次启动都使用最新的数据
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(PaymentFlowTableSql)


    //结果表
    val resTableSql =
      """
        |CREATE TABLE order_payment(
        |  order_id BIGINT,
        |  order_type STRING,
        |  order_time TIMESTAMP_LTZ(3),
        |  pay_money BIGINT,
        |  -- 结果表中只能保留数据流中的一个事件时间字段(order_time or pay_time)
        |  -- 如果有多个,需要把其他的事件时间字段类型改为TIMESTAMP或者TIMESTAMP_LTZ
        |  pay_time TIMESTAMP_LTZ
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'order_payment',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'format' = 'json',
        |  'sink.partitioner' = 'default'
        |)
        |""".stripMargin
    tEnv.executeSql(resTableSql)

    //关联订单表和支付表
    val joinSql =
      """
        |INSERT INTO order_payment
        |SELECT
        |  l.order_id,
        |  l.order_type,
        |  l.order_time,
        |  r.pay_money,
        |  r.pay_time
        |FROM (
        |  SELECT *
        |  FROM TABLE(TUMBLE(TABLE user_order,DESCRIPTOR(order_time),INTERVAL '3' SECOND))
        |) AS l
        |INNER JOIN(
        |  SELECT *
        |  FROM TABLE(TUMBLE(TABLE payment_flow,DESCRIPTOR(pay_time),INTERVAL '3' SECOND))
        |) AS r
        |ON l.order_id = r.order_id
        |AND l.window_start = r.window_start
        |AND l.window_end = r.window_end
        |""".stripMargin
    tEnv.executeSql(joinSql)

  }

}

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注