表函数Join代码示例

自定义UDTF函数

package com.imooc.scala.sqljoin

import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

/**
 * 自定义UDTF函数
 * 注意:需要通过注解指定输出数据Row中的字段名称和类型,这里的字段名称随便指定即可。
 * Created by xuwei
 */
@FunctionHint(output = new DataTypeHint("ROW<f0 STRING>"))
class MyColumnToRowFunc extends TableFunction[Row]{

  /**
   * 函数的核心业务逻辑。
   * 进来一条数据,这个函数会被触发执行一次,输出0条或者多条数据
   * @param arrs
   */
  def eval(arrs: Array[String]): Unit ={
    arrs.foreach(arr=>{
      //在这里只输出数组中元素为a-z的,便于后续判断INNER JOIN和LEFT JOIN的区别
      if(arr.matches("[a-z]")){
        collect(Row.of(arr))
      }
    })
  }

}
TableFunctionJoin_InnerJoin
package com.imooc.scala.sqljoin

import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * 表函数Join(Table Function Join)之 Inner Join
 * Created by xuwei
 */
object TableFunctionJoin_InnerJoin {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE user_action_log (
        |  id  BIGINT,
        |  params ARRAY<STRING>
        |)WITH(
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1',
        |  'fields.id.min' = '1',
        |  'fields.id.max' = '10',
        |  'fields.params.element.length' = '1' -- 数组中的元素长度改为1,便于后续在自定义函数中判断
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)


    //创建输出表
    val outTableSql =
      """
        |CREATE TABLE print_sink(
        |  id BIGINT,
        |  param STRING
        |)WITH(
        |  'connector' = 'print'
        |)
        |""".stripMargin
    tEnv.executeSql(outTableSql)


    //注册自定义函数
    val funSql =
      """
        |CREATE FUNCTION my_column_to_row AS 'com.imooc.scala.sqljoin.MyColumnToRowFunc'
        |""".stripMargin
    tEnv.executeSql(funSql)

    //业务逻辑
    val execSql =
      """
        |INSERT INTO print_sink
        |SELECT
        |  id,
        |  tmp.param
        |FROM user_action_log , -- 注意:下面没有显式指定INNER JOIN的时候,表名后面必须有逗号
        |-- 给表起别名为tmp,并且重命名函数返回的字段,如果有多个字段,则使用逗号分割开即可
        |LATERAL TABLE (my_column_to_row(params)) tmp(param)
        |""".stripMargin

    val execSql2 =
      """
        |INSERT INTO print_sink
        |SELECT
        |  id,
        |  tmp.param
        |FROM user_action_log
        |-- 注意:显式指定INNER JOIN的时候,表名后面不能有逗号,并且在语句最后需要指定一个ON TRUE条件,这属于语法要求,类似于WHERE 1=1的效果。
        |INNER JOIN LATERAL TABLE (my_column_to_row(params)) tmp(param)
        | ON TRUE
        |""".stripMargin
    tEnv.executeSql(execSql2)

  }

}
TableFunctionJoin_LeftJoin
package com.imooc.scala.sqljoin

import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * 表函数Join(Table Function Join)之 Left Join
 * Created by xuwei
 */
object TableFunctionJoin_LeftJoin {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //设置全局并行度为1
    tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE user_action_log (
        |  id  BIGINT,
        |  params ARRAY<STRING>
        |)WITH(
        |  'connector' = 'datagen',
        |  'rows-per-second' = '1',
        |  'fields.id.min' = '1',
        |  'fields.id.max' = '10',
        |  'fields.params.element.length' = '1' -- 数组中的元素长度改为1,便于后续在自定义函数中判断
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)


    //创建输出表
    val outTableSql =
      """
        |CREATE TABLE print_sink(
        |  id BIGINT,
        |  param STRING
        |)WITH(
        |  'connector' = 'print'
        |)
        |""".stripMargin
    tEnv.executeSql(outTableSql)


    //注册自定义函数
    val funSql =
      """
        |CREATE FUNCTION my_column_to_row AS 'com.imooc.scala.sqljoin.MyColumnToRowFunc'
        |""".stripMargin
    tEnv.executeSql(funSql)


    val execSql2 =
      """
        |INSERT INTO print_sink
        |SELECT
        |  id,
        |  tmp.param
        |FROM user_action_log
        |-- 注意:显式指定LEFT JOIN的时候,表名后面不能有逗号,并且在语句最后需要指定一个ON TRUE条件,这属于语法要求,类似于WHERE 1=1的效果。
        |LEFT JOIN LATERAL TABLE (my_column_to_row(params)) tmp(param)
        | ON TRUE
        |""".stripMargin
    tEnv.executeSql(execSql2)

  }

}

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注