架构
大家好,我是老羊,今天我们来学习 Flink SQL 中的· Over 聚合操作。
- Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。
那这里我们拿 Over 聚合 与 窗口聚合 做一个对比,其之间的最大不同之处在于:
窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
Over 聚合:能够保留原始字段
注意:其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?
- 应用场景:计算最近一段滑动窗口的聚合结果数据。
- 际案例:查询每个产品最近一小时订单的金额总和:
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDERBY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)AS one_hour_prod_amount_sum
FROM Orders
Over 聚合的语法总结如下:
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDERBY time_col
range_definition),
...
FROM ...
其中:
- ORDER BY:必须是时间戳列(事件时间、处理时间)
- PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
- range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合,第二种为按照时间区间聚合。如下案例所示:
a. 时间区间聚合:
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。
CREATETABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP asTIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='1',
'fields.order_id.min'='1',
'fields.order_id.max'='2',
'fields.amount.min'='1',
'fields.amount.max'='10',
'fields.product.min'='1',
'fields.product.max'='2'
);
CREATETABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector'='print'
);
INSERTINTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDERBY order_time
-- 标识统计范围是一个 product 的最近 1 小时的数据
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)AS one_hour_prod_amount_sum
FROM source_table
结果如下:
+I[2,2021-12-24T22:08:26.583,7,73]
+I[2,2021-12-24T22:08:27.583,7,80]
+I[2,2021-12-24T22:08:28.583,4,84]
+I[2,2021-12-24T22:08:29.584,7,91]
+I[2,2021-12-24T22:08:30.583,8,99]
+I[1,2021-12-24T22:08:31.583,9,138]
+I[2,2021-12-24T22:08:32.584,6,105]
+I[1,2021-12-24T22:08:33.584,7,145]
b. 行数聚合:
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。
CREATETABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP asTIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='1',
'fields.order_id.min'='1',
'fields.order_id.max'='2',
'fields.amount.min'='1',
'fields.amount.max'='2',
'fields.product.min'='1',
'fields.product.max'='2'
);
CREATETABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector'='print'
);
INSERTINTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDERBY order_time
-- 标识统计范围是一个 product 的最近 5 行数据
ROWS BETWEEN5 PRECEDING AND CURRENT ROW
)AS one_hour_prod_amount_sum
FROM source_table
预跑结果如下:
+I[2,2021-12-24T22:18:19.147,1,9]
+I[1,2021-12-24T22:18:20.147,2,11]
+I[1,2021-12-24T22:18:21.147,2,12]
+I[1,2021-12-24T22:18:22.147,2,12]
+I[1,2021-12-24T22:18:23.148,2,12]
+I[1,2021-12-24T22:18:24.147,1,11]
+I[1,2021-12-24T22:18:25.146,1,10]
+I[1,2021-12-24T22:18:26.147,1,9]
+I[2,2021-12-24T22:18:27.145,2,11]
+I[2,2021-12-24T22:18:28.148,1,10]
+I[2,2021-12-24T22:18:29.145,2,10]
当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS(
PARTITION BY product
ORDERBY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/299634.html<

