跳至内容
- — 输入表
- CREATE TABLE source_table (
- sku_id STRING,
- price BIGINT,
- row_time AS cast(CURRENT_TIMESTAMP as timestamp_LTZ(3)),
- WATERMARK FOR row_time AS row_time – INTERVAL ‘5’ SECOND
- ) WITH (
- ‘connector’ = ‘datagen’, –数据自动生成
- ‘rows-per-second’ = ‘1’, –每秒一行
- ‘fields.sku_id.length’ = ‘1’, –长度
- ‘fields.price.min’ = ‘1’, –最小
- ‘fields.price.max’ = ‘1000000’ –最大
- );
- — 输出表
- CREATE TABLE sink_table (
- window_start TIMESTAMP(3), –窗口开始
- window_end TIMESTAMP(3), –窗口结束
- sku_id STRING,
- count_result BIGINT,
- sum_result BIGINT,
- avg_result DOUBLE,
- min_result BIGINT,
- max_result BIGINT,
- PRIMARY KEY (`sku_id`) NOT ENFORCED
- ) WITH (
- ‘connector’ = ‘print’ –打印在stdout中
- );
- — 输出表,使用的是滑动窗口函数 CUMULATE(表名,窗口参数,滑动步长,窗口大小)
- insert into sink_table
- select
-
- window_start,
- window_end,
- sku_id,
- count(*) as count_result,
- sum(price) as sum_result,
- avg(price) as avg_result,
- min(price) as min_result,
- max(price) as max_result
- from TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘5’ SECOND, INTERVAL ‘1’ DAY)) group by sku_id, window_start,window_end;