-- Input table: synthetic rows produced by Flink's datagen connector.
CREATE TABLE source_table (
    sku_id STRING,
    price  BIGINT,
    -- Computed event-time column taken from the wall clock when the row is
    -- produced. Flink rowtime attributes must be TIMESTAMP(3) or
    -- TIMESTAMP_LTZ(3); the cast pins the type explicitly.
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP_LTZ(3)),
    -- Watermark trails the observed row_time by 5 seconds (bounded out-of-orderness).
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
    'connector'            = 'datagen',  -- generate rows automatically
    'rows-per-second'      = '1',        -- emit one row per second
    'fields.sku_id.length' = '1',        -- generated sku_id strings have length 1
    'fields.price.min'     = '1',        -- smallest generated price
    'fields.price.max'     = '1000000'   -- largest generated price
);
  -- Output table: receives one row per (window_start, window_end, sku_id) group.
CREATE TABLE sink_table (
    window_start TIMESTAMP(3),  -- window start (inclusive)
    window_end   TIMESTAMP(3),  -- window end (exclusive)
    sku_id       STRING,
    count_result BIGINT,
    sum_result   BIGINT,
    avg_result   DOUBLE,
    min_result   BIGINT,
    max_result   BIGINT,
    -- Rows are keyed by window as well as SKU: each cumulative window step has
    -- a distinct window_end, so sku_id alone is not unique across output rows
    -- and would cause an upsert sink to overwrite earlier windows.
    PRIMARY KEY (window_start, window_end, sku_id) NOT ENFORCED
) WITH (
    'connector' = 'print'  -- write rows to stdout
);
  -- Populate sink_table using the CUMULATE (cumulative window) table-valued function:
-- CUMULATE(TABLE <table>, DESCRIPTOR(<time attribute>), <step>, <max window size>).
-- Each window spans up to 1 day and is extended in 5-second steps. Every step
-- yields a window with the same window_start and a later window_end, so the
-- aggregates below cover everything from the window start up to that step.
INSERT INTO sink_table
SELECT
    window_start,
    window_end,
    sku_id,
    COUNT(*)                   AS count_result,
    SUM(price)                 AS sum_result,
    -- AVG over a BIGINT column returns a truncated BIGINT; cast first so the
    -- DOUBLE avg_result keeps its fractional part.
    AVG(CAST(price AS DOUBLE)) AS avg_result,
    MIN(price)                 AS min_result,
    MAX(price)                 AS max_result
FROM TABLE(
    CUMULATE(
        TABLE source_table,
        DESCRIPTOR(row_time),
        INTERVAL '5' SECOND,  -- step
        INTERVAL '1' DAY      -- maximum window size
    )
)
GROUP BY
    sku_id,
    window_start,
    window_end;

-- 作者 admin
--
-- 张宴银，大数据开发工程师
--
-- 发表回复
--
-- 您的邮箱地址不会被公开。 必填项已用 * 标注