Flink SQL 知其所以然:万字详述 Flink SQL 四种时间窗口语义!

DML:窗口聚合

大家好我是老羊,由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走。思路如下:

  • 先介绍 Flink SQL 支持的 4 种时间窗口。
  • 分别详细介绍上述的 4 种时间窗口的功能及 SQL 语法。
  • 结合实际案例介绍 4 种时间窗口。

首先来看看 Flink SQL 中支持的 4 种窗口的运算。

  • 滚动窗口(TUMBLE)。
  • 滑动窗口(HOP)。
  • Session 窗口(SESSION)。
  • 渐进式窗口(CUMULATE)。

1、滚动窗口(TUMBLE)

滚动窗口定义:滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中,如下图所示。

tumble window

应用场景:常见的按照一分钟对数据进行聚合,计算一分钟内 PV,UV 数据。

实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额。

那么上面这个案例的 SQL 要咋写呢?

关于滚动窗口,在 1.13 版本之前和 1.13 及之后版本有两种 Flink SQL 实现方式,分别是:

  • Group Window Aggregation(1.13 之前只有此类方案,此方案在 1.13 及之后版本已经标记为废弃,不推荐小伙伴萌使用)。
  • Windowing TVF(1.13 及之后建议使用 Windowing TVF)。

博主这里两种方法都会介绍:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):
--数据源表
CREATETABLEsource_table (
--维度数据
dimSTRING,
--用户id
user_idBIGINT,
--用户
priceBIGINT,
--事件时间戳
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
--watermark设置
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='10',
'fields.dim.length'='1',
'fields.user_id.min'='1',
'fields.user_id.max'='100000',
'fields.price.min'='1',
'fields.price.max'='100000'
)
--数据汇表
CREATETABLEsink_table (
dimSTRING,
pvBIGINT,
sum_priceBIGINT,
max_priceBIGINT,
min_priceBIGINT,
uvBIGINT,
window_startbigint
) WITH (
'connector'='print'
)
--数据处理逻辑
insertintosink_table
select
dim,
count(*) aspv,
sum(price) assum_price,
max(price) asmax_price,
min(price) asmin_price,
--计算uv
count(distinctuser_id) asuv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval'1'minute) ASSTRING)) *1000aswindow_start
fromsource_table
groupby
dim,
tumble(row_time, interval'1'minute)

可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval ‘1’ minute),第一个参数为事件时间的时间戳;第二个参数为滚动窗口大小。

  • Window TVF 方案(1.13 只支持 Streaming 任务):
--数据源表
CREATETABLEsource_table (
--维度数据
dimSTRING,
--用户id
user_idBIGINT,
--用户
priceBIGINT,
--事件时间戳
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
--watermark设置
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='10',
'fields.dim.length'='1',
'fields.user_id.min'='1',
'fields.user_id.max'='100000',
'fields.price.min'='1',
'fields.price.max'='100000'
)
--数据汇表
CREATETABLEsink_table (
dimSTRING,
pvBIGINT,
sum_priceBIGINT,
max_priceBIGINT,
min_priceBIGINT,
uvBIGINT,
window_startbigint
) WITH (
'connector'='print'
)
--数据处理逻辑
insertintosink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_startASSTRING)) *1000aswindow_start,
count(*) aspv,
sum(price) assum_price,
max(price) asmax_price,
min(price) asmin_price,
count(distinctuser_id) asuv
FROMTABLE(TUMBLE(
TABLEsource_table
, DESCRIPTOR(row_time)
, INTERVAL'60'SECOND))
GROUPBYwindow_start,
window_end,
dim

可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ’60’ SECOND)),包含三部分参数。

第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL ’60’ SECOND 声明滚动窗口大小为 1 min。

SQL 语义:

由于离线没有相同的时间窗口聚合概念,这里就直接说实时场景 SQL 语义,假设 Orders 为 kafka,target_table 也为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:

数据源算子(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 窗口聚合算子。

窗口聚合算子(TUMBLE 算子):接收到上游算子发的一条一条的数据,然后将每一条数据按照时间戳划分到对应的窗口中(根据事件时间、处理时间的不同语义进行划分),上述案例为事件时间,事件时间中,滚动窗口算子接收到上游的 Watermark 大于窗口的结束时间时,则说明当前这一分钟的滚动窗口已经结束了,将窗口计算完的结果发往下游算子(一条一条发给下游 数据汇算子)。

数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

注意:

事件时间中滚动窗口的窗口计算触发是由 Watermark 推动的。

2、滑动窗口(HOP)

滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动窗口之间每个窗口是可以重叠。在这种情况下,一条数据就会分配到多个窗口当中。举例,有 10 分钟大小的窗口,滑动步长为 5 分钟。这样,每 5 分钟会划分一次窗口,这个窗口包含的数据是过去 10 分钟内的数据,如下图所示。

hop window

应用场景:比如计算同时在线的数据,要求结果的输出频率是 1 分钟一次,每次计算的数据是过去 5 分钟的数据(有的场景下用户可能在线,但是可能会 2 分钟不活跃,但是这也要算在同时在线数据中,所以取最近 5 分钟的数据就能计算进去了)。

实际案例:简单且常见的分维度分钟级别同时在线用户数,1 分钟输出一次,计算最近 5 分钟的数据。

依然是 Group Window Aggregation、Windowing TVF 两种方案:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):
--数据源表
CREATETABLEsource_table (
--维度数据
dimSTRING,
--用户id
user_idBIGINT,
--用户
priceBIGINT,
--事件时间戳
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
--watermark设置
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='10',
'fields.dim.length'='1',
'fields.user_id.min'='1',
'fields.user_id.max'='100000',
'fields.price.min'='1',
'fields.price.max'='100000'
);
--数据汇表
CREATETABLEsink_table (
dimSTRING,
uvBIGINT,
window_startbigint
) WITH (
'connector'='print'
);
--数据处理逻辑
insertintosink_table
SELECTdim,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval'1'minute, interval'5'minute) ASSTRING)) *1000aswindow_start,
count(distinctuser_id) asuv
FROMsource_table
GROUPBYdim
, hop(row_time, interval'1'minute, interval'5'minute)

可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval ‘1’ minute, interval ‘5’ minute)。其中:

第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。

  • Windowing TVF 方案(1.13 只支持 Streaming 任务):
--数据源表
CREATETABLEsource_table (
--维度数据
dimSTRING,
--用户id
user_idBIGINT,
--用户
priceBIGINT,
--事件时间戳
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
--watermark设置
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='10',
'fields.dim.length'='1',
'fields.user_id.min'='1',
'fields.user_id.max'='100000',
'fields.price.min'='1',
'fields.price.max'='100000'
);
--数据汇表
CREATETABLEsink_table (
dimSTRING,
uvBIGINT,
window_startbigint
) WITH (
'connector'='print'
);
--数据处理逻辑
insertintosink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_startASSTRING)) *1000aswindow_start,
count(distinctuser_id) asbucket_uv
FROMTABLE(HOP(
TABLEsource_table
, DESCRIPTOR(row_time)
, INTERVAL'1'MINUTES, INTERVAL'5'MINUTES))
GROUPBYwindow_start,
window_end,
dim

可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘1’ MINUTES, INTERVAL ‘5’ MINUTES)),包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL ‘1’ MINUTES 声明滚动窗口滑动步长大小为 1 min。第四个参数 INTERVAL ‘5’ MINUTES 声明滚动窗口大小为 5 min。

SQL 语义:

滑动窗口语义和滚动窗口类似,这里不再赘述。

3、Session 窗口(SESSION)

  • Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。如下图对比所示:

session window

实际案例:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开。

目前 1.13 版本中 Flink SQL 不支持 Session 窗口的 Window TVF,所以这里就只介绍 Group Window Aggregation 方案:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):
--数据源表,用户购买行为记录表
CREATETABLEsource_table (
--维度数据
dimSTRING,
--用户id
user_idBIGINT,
--用户
priceBIGINT,
--事件时间戳
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
--watermark设置
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='10',
'fields.dim.length'='1',
'fields.user_id.min'='1',
'fields.user_id.max'='100000',
'fields.price.min'='1',
'fields.price.max'='100000'
);
--数据汇表
CREATETABLEsink_table (
dimSTRING,
pvBIGINT, --购买商品数量
window_startbigint
) WITH (
'connector'='print'
);
--数据处理逻辑
insertintosink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval'5'minute) ASSTRING)) *1000aswindow_start,
count(1) aspv
FROMsource_table
GROUPBYdim
, session(row_time, interval'5'minute)

注意:

上述 SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在Streaming 任务中运行,Batch 任务不支持。

可以看到 Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中,即 session(row_time, interval ‘5’ minute)。其中:

第一个参数为事件时间的时间戳;第二个参数为 Session gap 间隔。

SQL 语义:

Session 窗口语义和滚动窗口类似,这里不再赘述。

4、渐进式窗口(CUMULATE)

渐进式窗口定义(1.13 只支持 Streaming 任务):渐进式窗口在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV总数。渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。如下图所示:

cumulate window

应用场景:周期内累计 PV,UV 指标(如每天累计到当前这一分钟的 PV,UV)。这类指标是一段周期内的累计状态,对分析师来说更具统计分析价值,而且几乎所有的复合指标都是基于此类指标的统计(不然离线为啥都要累计一天的数据,而不要一分钟累计的数据呢)。

实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别。举例如下:

明细输入数据:

预期经过渐进式窗口计算的输出数据:

转化为折线图长这样:

当日累计

可以看到,其特点就在于,每一分钟的输出结果都是当天零点累计到当前的结果。

渐进式窗口目前只有 Windowing TVF 方案支持:

  • Windowing TVF 方案(1.13 只支持 Streaming 任务):
--数据源表
CREATETABLEsource_table (
--用户id
user_idBIGINT,
--用户
moneyBIGINT,
--事件时间戳
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
--watermark设置
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='10',
'fields.user_id.min'='1',
'fields.user_id.max'='100000',
'fields.price.min'='1',
'fields.price.max'='100000'
);
--数据汇表
CREATETABLEsink_table (
window_endbigint,
window_startbigint,
sum_moneyBIGINT,
count_distinct_idbigint
) WITH (
'connector'='print'
);
--数据处理逻辑
insertintosink_table
SELECT
UNIX_TIMESTAMP(CAST(window_endASSTRING)) *1000aswindow_end,
window_start,
sum(money) assum_money,
count(distinctid) ascount_distinct_id
FROMTABLE(CUMULATE(
TABLEsource_table
, DESCRIPTOR(row_time)
, INTERVAL'60'SECOND
, INTERVAL'1'DAY))
GROUPBY
window_start,
window_end

可以看到 Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ’60’ SECOND, INTERVAL ‘1’ DAY)),其中包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL ’60’ SECOND 声明渐进式窗口触发的渐进步长为 1 min。第四个参数 INTERVAL ‘1’ DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。

SQL 语义:

渐进式窗口语义和滚动窗口类似,这里不再赘述。

5、Window TVF 支持 Grouping Sets、Rollup、Cube

具体应用场景:实际的案例场景中,经常会有多个维度进行组合(cube)计算指标的场景。如果把每个维度组合的代码写一遍,然后

union all 起来,这样写起来非常麻烦,而且会导致一个数据源读取多遍。

这时,有离线 Hive SQL 使用经验的小伙伴萌就会想到,如果有了 Grouping Sets,我们就可以直接用 Grouping Sets 将维度组合写在一条 SQL 中,写起来方便并且执行效率也高。当然,Flink 支持这个功能。

但是目前 Grouping Sets 只在 Window TVF 中支持,不支持 Group Window Aggregation。

来一个实际案例感受一下,计算每日零点累计到当前这一分钟的分汇总、age、sex、age+sex 维度的用户数。

--用户访问明细表
CREATETABLEsource_table (
ageSTRING,
sexSTRING,
user_idBIGINT,
row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),
WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND
) WITH (
'connector'='datagen',
'rows-per-second'='1',
'fields.age.length'='1',
'fields.sex.length'='1',
'fields.user_id.min'='1',
'fields.user_id.max'='100000'
);
CREATETABLEsink_table (
ageSTRING,
sexSTRING,
uvBIGINT,
window_endbigint
) WITH (
'connector'='print'
);
insertintosink_table
SELECT
UNIX_TIMESTAMP(CAST(window_endASSTRING)) *1000aswindow_end,
if (ageisnull, 'ALL', age) asage,
if (sexisnull, 'ALL', sex) assex,
count(distinctuser_id) asbucket_uv
FROMTABLE(CUMULATE(
TABLEsource_table
, DESCRIPTOR(row_time)
, INTERVAL'5'SECOND
, INTERVAL'1'DAY))
GROUPBY
window_start,
window_end,
--groupingsets写法
GROUPINGSETS (
()
, (age)
, (sex)
, (age, sex)
)

小伙伴萌这里需要注意下!

Flink SQL 中 Grouping Sets 的语法和 Hive SQL 的语法有一些不同,如果我们使用 Hive SQL 实现上述 SQL 的语义,其实现如下:

insertintosink_table
SELECT
UNIX_TIMESTAMP(CAST(window_endASSTRING)) *1000aswindow_end,
if (ageisnull, 'ALL', age) asage,
if (sexisnull, 'ALL', sex) assex,
count(distinctuser_id) asbucket_uv
FROMsource_table
GROUPBY
age
, sex
--hivesqlgroupingsets写法
GROUPINGSETS (
()
, (age)
, (sex)
, (age, sex)
)

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/240797.html<

(0)
运维的头像运维
上一篇2025-04-24 01:10
下一篇 2025-04-24 01:11

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注