Flink SQL 知其所以然:Explain、Show、Load、Set 子句

EXPLAIN 子句

大家好,我是老羊,今天我们来学习 Flink SQL 中的的 Explain、Show、Load、Set 共 4 个子句。

  1. 应用场景:EXPLAIN 子句其实就是用于查看当前这个 sql 查询的逻辑计划以及优化的执行计划。
  2. SQL 语法标准:
EXPLAIN PLAN FOR <query_statement_or_insert_statement>
  1. 实际案例:
public class Explain_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(1);
String sql ="CREATE TABLE source_table (\n"
+" user_id BIGINT COMMENT '用户 id',\n"
+" name STRING COMMENT '用户姓名',\n"
+" server_timestamp BIGINT COMMENT '用户访问时间戳',\n"
+" proctime AS PROCTIME()\n"
+") WITH (\n"
+" 'connector' = 'datagen',\n"
+" 'rows-per-second' = '1',\n"
+" 'fields.name.length' = '1',\n"
+" 'fields.user_id.min' = '1',\n"
+" 'fields.user_id.max' = '10',\n"
+" 'fields.server_timestamp.min' = '1',\n"
+" 'fields.server_timestamp.max' = '100000'\n"
+");\n"
+"\n"
+"CREATE TABLE sink_table (\n"
+" user_id BIGINT,\n"
+" name STRING,\n"
+" server_timestamp BIGINT\n"
+") WITH (\n"
+" 'connector' = 'print'\n"
+");\n"
+"\n"
+"EXPLAIN PLAN FOR\n"
+"INSERT INTO sink_table\n"
+"select user_id,\n"
+" name,\n"
+" server_timestamp\n"
+"from (\n"
+" SELECT\n"
+" user_id,\n"
+" name,\n"
+" server_timestamp,\n"
+" row_number() over(partition by user_id order by proctime) as rn\n"
+" FROM source_table\n"
+")\n"
+"where rn = 1";
/**
* 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
* -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
*/
for (String innerSql : sql.split(";")){
TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);
tableResult.print();
}
}
}

上述代码执行结果如下:

1. 抽象语法树
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
+- LogicalFilter(condition=[=($3,1)])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDERBY PROCTIME() NULLS FIRST)])
+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])

2. 优化后的物理计划
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
+- Deduplicate(keep=[FirstRow], key=[user_id],order=[PROCTIME])
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[user_id, name, server_timestamp, PROCTIME()AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

3. 优化后的执行计划
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
+- Deduplicate(keep=[FirstRow], key=[user_id],order=[PROCTIME])
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[user_id, name, server_timestamp, PROCTIME()AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

USE 子句

  1. 应用场景:如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,USE 子句通常被用于切换库,那么在 Flink SQL 体系中,它的作用也是和 MySQL 中 USE 子句的功能基本一致,用于切换 Catalog,DataBase,使用 Module。
  2. SQL 语法标准:
  • 切换 Catalog:
USE CATALOG catalog_name
  • 使用 Module:
USE MODULES module_name1[, module_name2, ...]
  • 切换 Database:
USE db名称
  1. 实际案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
//+-----------------+
//| catalog name |
//+-----------------+
//| default_catalog |
//| cat1 |
//+-----------------+
// change default catalog
tEnv.executeSql("USE CATALOG cat1");
tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
//+---------------+
//| database name |
//+---------------+
//+---------------+
//create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
//+---------------+
//| database name |
//+---------------+
//| db1 |
//+---------------+
// change default database
tEnv.executeSql("USE db1");
// change module resolution orderand enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
//+-------------+-------+
//| module name | used |
//+-------------+-------+
//| hive |true|
//| core |false|
//+-------------+-------+

SHOW 子句

  1. 应用场景:如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,SHOW 子句常常用于查询库、表、函数等,在 Flink SQL 体系中也类似。Flink SQL 支持 SHOW 以下内容。
  2. SQL 语法标准:
SHOW CATALOGS:展示所有 Catalog
SHOW CURRENT CATALOG:展示当前的 Catalog
SHOW DATABASES:展示当前 Catalog 下所有 Database
SHOW CURRENT DATABASE:展示当前的 Database
SHOW TABLES:展示当前 Database 下所有表
SHOW VIEWS:展示所有视图
SHOW FUNCTIONS:展示所有的函数
SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)
  1. 实际案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
//+-----------------+
//| catalog name |
//+-----------------+
//| default_catalog |
//+-----------------+
// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
//+----------------------+
//| current catalog name |
//+----------------------+
//| default_catalog |
//+----------------------+
// show databases
tEnv.executeSql("SHOW DATABASES").print();
//+------------------+
//| database name |
//+------------------+
//| default_database |
//+------------------+
// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
//+-----------------------+
//| current database name |
//+-----------------------+
//| default_database |
//+-----------------------+
//create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
//+------------+
//|table name |
//+------------+
//| my_table |
//+------------+
//create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
//+-----------+
//| view name |
//+-----------+
//| my_view |
//+-----------+
// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
//+---------------+
//| function name |
//+---------------+
//| mod |
//| sha256 |
//| ... |
//+---------------+
//create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
//+---------------+
//| function name |
//+---------------+
//| f1 |
//| ... |
//+---------------+
// show modules
tEnv.executeSql("SHOW MODULES").print();
//+-------------+
//| module name |
//+-------------+
//| core |
//+-------------+
// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
//+-------------+-------+
//| module name | used |
//+-------------+-------+
//| core |true|
//| hive |false|
//+-------------+-------+

LOAD、UNLOAD 子句

  1. 应用场景:我们可以使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。
  2. SQL 语法标准:
-- 加载
LOAD MODULE module_name [WITH ('key1'='val1','key2'='val2', ...)]
-- 卸载
UNLOAD MODULE module_name
  1. 实际案例:
  • LOAD 案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 加载 Flink SQL 体系内置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
//+-------------+
//| module name |
//+-------------+
//| core |
//| hive |
//+-------------+
  • UNLOAD 案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 卸载唯一的一个 CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// 结果啥 Moudle 都没有了

SET、RESET 子句

  1. 应用场景:SET 子句可以用于修改一些 Flink SQL 的环境配置,RESET 子句是可以将所有的环境配置恢复成默认配置,但只能在 SQL CLI 中进行使用,主要是为了让用户更纯粹的使用 SQL 而不必使用其他方式或者切换系统环境。
  2. SQL 语法标准:
SET(key = value)?
RESET (key)?
  1. 实际案例:

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

Flink SQL>SETtable.planner= blink;
[INFO] Session property has been set.
Flink SQL>SET;
table.planner=blink;
Flink SQL> RESET table.planner;
[INFO] Session property has been reset.
Flink SQL> RESET;
[INFO] All session properties have been set to their default values.

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/262909.html<

(0)
运维的头像运维
上一篇2025-05-04 07:56
下一篇 2025-05-04 07:57

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注