聊聊 Flink SQL增量查询Hudi表

​官网文档

地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query

参数

  • read.start-commit 增量查询开始时间 对于流读,如果不指定该值,默认取最新的instantTime,也就是流读默认从最新的instantTime开始读(包含最新的)。对于批读,如果不指定该参数,只指定read.end-commit,则实现时间旅行的功能,可查询历史记录
  • read.end-commit 增量查询结束时间 不指定该参数则默认读取到最新的记录,该参数一般只适用于批读,因为流读一般的需求是查询所有的增量数据
  • read.streaming.enabled 是否流读 默认false
  • read.streaming.check-interval  流读的检查时间间隔,单位秒(s),默认值60,也就是一分钟查询范围 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含开始时间又包含结束时间,对于默认值可参考上面的参数说明

版本

建表造数:

  • Hudi 0.9.0
  • Spark 2.4.5

我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)

查询

  • Hudi 0.13.0-SNAPSHOT
  • Flink 1.14.3 (增量查询)
  • Spark 3.1.2 (主要是为了使用Call Procedures命令查看commit信息)

建表造数

-- Spark SQL Hudi 0.9.0
createtable hudi.test_flink_incremental(
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by(dt)
options (
primaryKey ='id',
preCombineField ='ts',
type ='cow'
);

insertinto hudi.test_flink_incrementalvalues(1,'a1',10,1000,'2022-11-25');
insertinto hudi.test_flink_incrementalvalues(2,'a2',20,2000,'2022-11-25');
update hudi.test_flink_incrementalset name='hudi2_update'where id =2;
insertinto hudi.test_flink_incrementalvalues(3,'a3',30,3000,'2022-11-26');
insertinto hudi.test_flink_incrementalvalues(4,'a4',40,4000,'2022-12-26');

用show_commits看一下有哪些commits(这里查询用的是Hudi的master,因为show_commits是在0.11.0版本开始支持的,也可以通过使用hadoop命令查看.hoodie文件夹下的.commit文件)

call show_commits(table=>'hudi.test_flink_incremental');
20221205152736
20221205152723
20221205152712
20221205152702
20221205152650

Flink SQL创建Hudi内存表

CREATETABLE test_flink_incremental (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price double,
ts bigint,
dt VARCHAR(10)
)
PARTITIONED BY(dt)
WITH (
'connector'='hudi',
'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'
);

建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。动态指定参数方法,在查询语句后面加上如下形式的语句

/*+ 
options(
'read.start-commit' = '20221205152723',
'read.end-commit'='20221205152736'
)
*/

批读

Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询

验证是否包含起始时间和默认结束时间

select*from test_flink_incremental 
/*+
options(
'read.start-commit' = '20221205152723' --起始时间对应id=3的记录
)
*/

结果包含起始时间,不指定结束时间默认读到最新的数据

id   name     price        ts                 dt
4 a4 40.04000 dt=2022-12-26
3 a3 30.03000 dt=2022-11-26

验证是否包含结束时间

select*from test_flink_incremental 
/*+
options(
'read.start-commit' = '20221205152712', --起始时间对应id=2的记录
'read.end-commit'='20221205152723' --结束时间对应id=3的记录
)
*/

结果包含结束时间

id           name        price       ts                 dt
3 a3 30.03000 dt=2022-11-26
2 hudi2_update 20.02000 dt=2022-11-25

验证默认开始时间

这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。

select*from test_flink_incremental 
/*+
options(
'read.end-commit'='20221205152712' --结束时间对应id=2的更新记录
)
*/

结果:只查询end-commit对应的记录

id           name        price       ts                 dt
2 hudi2_update 20.02000 dt=2022-11-25

时间旅行(查询历史记录)

验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过FlinkSQL查询Hudi历史记录,预期结果查出id=2,name=a2

select*from test_flink_incremental 
/*+
options(
'read.end-commit'='20221205152702' --结束时间对应id=2的历史记录
)
*/

结果:可以正确查询历史记录

id           name        price       ts                 dt
2 a2 20.02000 dt=2022-11-25

流读

开启流读的参数:

read.streaming.enabled=true

流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了

验证默认开始时间

select*from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4'
)
*/

结果:从最新的instantTime开始增量读取,也就是默认的read.start-commit为最新的instantTime

id   name     price        ts                 dt
4 a4 40.04000 dt=2022-12-26

验证指定开始时间

select*from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/

结果:

id           name        price       ts                 dt
2 hudi2_update 20.02000 dt=2022-11-25
3 a3 30.03000 dt=2022-11-26
4 a4 40.04000 dt=2022-11-26

如果想第一次查询全部的历史数据,可以将start-commit设置的早一点,比如设置到去年:’read.start-commit’ = ‘20211205152712’

select*from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20211205152712'
)
*/
id           name        price       ts                 dt
1 a1 10.01000 dt=2022-11-25
2 hudi2_update 20.02000 dt=2022-11-25
3 a3 30.03000 dt=2022-11-26
4 a4 40.04000 dt=2022-11-26

验证流读的连续性

验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性

Flink SQL读写MySQL需要配置jar包,将flink-connector-jdbc_2.12-1.14.3.jar​放到lib​下即可,下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

先在MySQL中创建一张Sink表

-- MySQL
CREATETABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Flink中创建对应的sink表

createtable test_sink (
id int,
name string,
price double,
ts bigint,
dt string
) with (
'connector'='jdbc',
'url'='jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username'='root',
'password'='root-123',
'table-name'='test_sink',
'sink.buffer-flush.max-rows'='1'
);

然后流式增量读取Hudi表Sink Mysql

insertinto test_sink
select*from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/

这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点

然后先在MySQL中验证一下历史数据的准确性

再利用Spark SQL往source表插入两条数据

-- Spark SQL
insertinto hudi.test_flink_incrementalvalues(5,'a5',50,5000,'2022-12-07');
insertinto hudi.test_flink_incrementalvalues(6,'a6',60,6000,'2022-12-07');

我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据

发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复

最后验证一下更新的增量数据,Spark SQL更新Hudi source表

-- Spark SQL
update hudi.test_flink_incrementalset name='hudi5_update'where id =5;

继续验证结果

结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据

那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:

-- MySQL
CREATETABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Flink SQL
createtable test_sink (
id int PRIMARY KEY NOT ENFORCED,
name string,
price double,
ts bigint,
dt string
) with (
'connector'='jdbc',
'url'='jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username'='root',
'password'='root-123',
'table-name'='test_sink',
'sink.buffer-flush.max-rows'='1'
);

将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果

-- Spark SQL
update hudi.test_flink_incremental set name='hudi6_update' where id = 6;
insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');

可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/252871.html<

(0)
运维的头像运维
上一篇2025-04-29 15:37
下一篇 2025-04-29 15:38

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注