推荐学习视频 – 1-6-1 FlinkSQL之维表Join(Lookup Join)介绍_哔哩哔哩_bilibili

lookup Join 使用条件

维表Join和双流Join的区别

维表Join原理图解

维表数据更新后,之后的join结果示例如下

表A中uid为1001的数据并不会获取到表B中最新的level_id = 1的level_name值。因为表A中uid为1001的数据已经属于历史数据了。

维表Join使用时的注意事项:

维表中的数据在初次读取后会被缓存在内存中。假设flink程序有6个线程,则每个线程会cache自己的一份缓存维表数据。

维表数据更新之后,没有读取过该维表的线程,可以拿到最新的维表数据。

已经读取过该维表的线程,直接使用其内存中缓存的维表数据,即过期的非最新维表数据。

所以需要对缓存的数据进行定时过期,主动清理flink线程中的维表缓存数据,但是,依旧不能保证维表更新后,第一时间就拿到最新的维表数据。所以会引入部分脏数据,导致数据质量问题。

connector中相关参数为
-- 通过lookup缓存可以减少Flink任务和数据库的请求次数,启用之后每个子任务中会保存一份缓存数据
  'lookup.cache.max-rows' = '100', -- 控制lookup缓存中最多存储的数据条数
  'lookup.cache.ttl' = '3600000', -- 控制lookup缓存中数据的生命周期(毫秒),太大或者太小都不合适
代码示例 LookupJoin Inner / left  Join

LookupJoin Inner Join

package com.imooc.scala.sqljoin

import java.time.ZoneId

import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * 维表Join(Lookup Join)之 Inner Join
 * Created by xuwei
 */
object LookupJoin_InnerJoin {

  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))


    //直播开播记录表
    val VideoDataTableSql =
      """
        |CREATE TABLE video_data(
        |  vid STRING,
        |  uid STRING,
        |  start_time BIGINT,
        |  country STRING,
        |  proc_time AS PROCTIME() -- 处理时间
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'video_data',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'properties.group.id' = 'gid-sql-video',
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(VideoDataTableSql)

    //国家和大区映射关系-维表
    val CountryAreaTableSql =
      """
        |CREATE TABLE country_area(
        |  country STRING,
        |  area STRING
        |)WITH(
        |  'connector' = 'jdbc',
        |  'driver' = 'com.mysql.cj.jdbc.Driver', -- mysql8.x使用这个driver class
        |  'url' = 'jdbc:mysql://localhost:3306/flink_data?serverTimezone=Asia/Shanghai', -- mysql8.x中需要指定时区
        |  'username' = 'root',
        |  'password' = 'admin',
        |  'table-name' = 'country_area',
        |  -- 通过lookup缓存可以减少Flink任务和数据库的请求次数,启用之后每个子任务中会保存一份缓存数据
        |  'lookup.cache.max-rows' = '100', -- 控制lookup缓存中最多存储的数据条数
        |  'lookup.cache.ttl' = '3600000', -- 控制lookup缓存中数据的生命周期(毫秒),太大或者太小都不合适
        |  'lookup.max-retries' = '1' -- 查询数据库失败后重试的次数
        |)
        |""".stripMargin
    tEnv.executeSql(CountryAreaTableSql)


    //结果表
    val resTableSql =
      """
        |CREATE TABLE new_video_data(
        |  vid STRING,
        |  uid STRING,
        |  start_time BIGINT,
        |  area STRING
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'new_video_data',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'format' = 'json',
        |  'sink.partitioner' = 'default'
        |)
        |""".stripMargin
    tEnv.executeSql(resTableSql)

    //关联开播记录表和国家大区关系表
    val joinSql =
      """
        |INSERT INTO new_video_data
        |SELECT
        |  vid,
        |  uid,
        |  start_time,
        |  area
        |FROM video_data
        |INNER JOIN country_area FOR SYSTEM_TIME AS OF video_data.proc_time
        |ON video_data.country = country_area.country
        |""".stripMargin
    tEnv.executeSql(joinSql)

  }

}

LookupJoin Left Join

package com.imooc.scala.sqljoin

import java.time.ZoneId

import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * 维表Join(Lookup Join)之 Left Join
 * Created by xuwei
 */
object LookupJoin_LeftJoin {

  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //指定国内的时区
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))


    //直播开播记录表
    val VideoDataTableSql =
      """
        |CREATE TABLE video_data(
        |  vid STRING,
        |  uid STRING,
        |  start_time BIGINT,
        |  country STRING,
        |  proc_time AS PROCTIME() -- 处理时间
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'video_data',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'properties.group.id' = 'gid-sql-video',
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(VideoDataTableSql)

    //国家和大区映射关系-维表
    val CountryAreaTableSql =
      """
        |CREATE TABLE country_area(
        |  country STRING,
        |  area STRING
        |)WITH(
        |  'connector' = 'jdbc',
        |  'driver' = 'com.mysql.cj.jdbc.Driver', -- mysql8.x使用这个driver class
        |  'url' = 'jdbc:mysql://localhost:3306/flink_data?serverTimezone=Asia/Shanghai', -- mysql8.x中需要指定时区
        |  'username' = 'root',
        |  'password' = 'admin',
        |  'table-name' = 'country_area',
        |  -- 通过lookup缓存可以减少Flink任务和数据库的请求次数,启用之后每个子任务中会保存一份缓存数据
        |  'lookup.cache.max-rows' = '100', -- 控制lookup缓存中最多存储的数据条数
        |  'lookup.cache.ttl' = '3600000', -- 控制lookup缓存中数据的生命周期(毫秒),太大或者太小都不合适
        |  'lookup.max-retries' = '1' -- 查询数据库失败后重试的次数
        |)
        |""".stripMargin
    tEnv.executeSql(CountryAreaTableSql)


    //结果表
    val resTableSql =
      """
        |CREATE TABLE new_video_data(
        |  vid STRING,
        |  uid STRING,
        |  start_time BIGINT,
        |  area STRING
        |)WITH(
        |  'connector' = 'kafka',
        |  'topic' = 'new_video_data',
        |  'properties.bootstrap.servers' = 'bigdata01:9092,bigdata02:9092,bigdata03:9092',
        |  'format' = 'json',
        |  'sink.partitioner' = 'default'
        |)
        |""".stripMargin
    tEnv.executeSql(resTableSql)

    //关联开播记录表和国家大区关系表
    val joinSql =
      """
        |INSERT INTO new_video_data
        |SELECT
        |  vid,
        |  uid,
        |  start_time,
        |  area
        |FROM video_data
        |LEFT JOIN country_area FOR SYSTEM_TIME AS OF video_data.proc_time
        |ON video_data.country = country_area.country
        |""".stripMargin
    tEnv.executeSql(joinSql)

  }

}

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注