
表函数Join代码示例
自定义UDTF函数
package com.imooc.scala.sqljoin
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row
/**
* 自定义UDTF函数
* 注意:需要通过注解指定输出数据Row中的字段名称和类型,这里的字段名称随便指定即可。
* Created by xuwei
*/
@FunctionHint(output = new DataTypeHint("ROW<f0 STRING>"))
class MyColumnToRowFunc extends TableFunction[Row]{
/**
* 函数的核心业务逻辑。
* 进来一条数据,这个函数会被触发执行一次,输出0条或者多条数据
* @param arrs
*/
def eval(arrs: Array[String]): Unit ={
arrs.foreach(arr=>{
//在这里只输出数组中元素为a-z的,便于后续判断INNER JOIN和LEFT JOIN的区别
if(arr.matches("[a-z]")){
collect(Row.of(arr))
}
})
}
}
TableFunctionJoin_InnerJoin
package com.imooc.scala.sqljoin
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
/**
* 表函数Join(Table Function Join)之 Inner Join
* Created by xuwei
*/
object TableFunctionJoin_InnerJoin {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE user_action_log (
| id BIGINT,
| params ARRAY<STRING>
|)WITH(
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.id.min' = '1',
| 'fields.id.max' = '10',
| 'fields.params.element.length' = '1' -- 数组中的元素长度改为1,便于后续在自定义函数中判断
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//创建输出表
val outTableSql =
"""
|CREATE TABLE print_sink(
| id BIGINT,
| param STRING
|)WITH(
| 'connector' = 'print'
|)
|""".stripMargin
tEnv.executeSql(outTableSql)
//注册自定义函数
val funSql =
"""
|CREATE FUNCTION my_column_to_row AS 'com.imooc.scala.sqljoin.MyColumnToRowFunc'
|""".stripMargin
tEnv.executeSql(funSql)
//业务逻辑
val execSql =
"""
|INSERT INTO print_sink
|SELECT
| id,
| tmp.param
|FROM user_action_log , -- 注意:下面没有显式指定INNER JOIN的时候,表名后面必须有逗号
|-- 给表起别名为tmp,并且重命名函数返回的字段,如果有多个字段,则使用逗号分割开即可
|LATERAL TABLE (my_column_to_row(params)) tmp(param)
|""".stripMargin
val execSql2 =
"""
|INSERT INTO print_sink
|SELECT
| id,
| tmp.param
|FROM user_action_log
|-- 注意:显式指定INNER JOIN的时候,表名后面不能有逗号,并且在语句最后需要指定一个ON TRUE条件,这属于语法要求,类似于WHERE 1=1的效果。
|INNER JOIN LATERAL TABLE (my_column_to_row(params)) tmp(param)
| ON TRUE
|""".stripMargin
tEnv.executeSql(execSql2)
}
}
TableFunctionJoin_LeftJoin
package com.imooc.scala.sqljoin
import org.apache.flink.configuration.CoreOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
/**
* 表函数Join(Table Function Join)之 Left Join
* Created by xuwei
*/
object TableFunctionJoin_LeftJoin {
def main(args: Array[String]): Unit = {
//创建执行环境
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = TableEnvironment.create(settings)
//设置全局并行度为1
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(),"1")
//创建输入表
val inTableSql =
"""
|CREATE TABLE user_action_log (
| id BIGINT,
| params ARRAY<STRING>
|)WITH(
| 'connector' = 'datagen',
| 'rows-per-second' = '1',
| 'fields.id.min' = '1',
| 'fields.id.max' = '10',
| 'fields.params.element.length' = '1' -- 数组中的元素长度改为1,便于后续在自定义函数中判断
|)
|""".stripMargin
tEnv.executeSql(inTableSql)
//创建输出表
val outTableSql =
"""
|CREATE TABLE print_sink(
| id BIGINT,
| param STRING
|)WITH(
| 'connector' = 'print'
|)
|""".stripMargin
tEnv.executeSql(outTableSql)
//注册自定义函数
val funSql =
"""
|CREATE FUNCTION my_column_to_row AS 'com.imooc.scala.sqljoin.MyColumnToRowFunc'
|""".stripMargin
tEnv.executeSql(funSql)
val execSql2 =
"""
|INSERT INTO print_sink
|SELECT
| id,
| tmp.param
|FROM user_action_log
|-- 注意:显式指定LEFT JOIN的时候,表名后面不能有逗号,并且在语句最后需要指定一个ON TRUE条件,这属于语法要求,类似于WHERE 1=1的效果。
|LEFT JOIN LATERAL TABLE (my_column_to_row(params)) tmp(param)
| ON TRUE
|""".stripMargin
tEnv.executeSql(execSql2)
}
}