No.9 – 时序数据库随笔 – Apache IoTDB 多条时序数据联合分析

01 问题

本篇我们一起来看看Apache IoTDB如何解决要解决​​No8​​​提到的网友问题,如下:

简单说就是如何处理两条时间线的数值计算?上面例子是一个 “+” 加法。

02 数据准备

要处理多条时序数据分析问题,我们首先先建立两条时间序列,我们做一下数据准备:

  • 启动IoTDB服务实例

我们首先源码编译Master最新代码(),并启动服务实例。如下

iotdbgit:(master) mvncleaninstall-DskipTests
...
...
[INFO] -------------------------------------------
[INFO] BUILDSUCCESS
[INFO] -------------------------------------------
[INFO] Totaltime: 02:41min
[INFO] Finishedat: 2021-04-07T07:13:32+08:00
[INFO] -------------------------------------------
iotdbgit:(master) cddistribution/target/apache-iotdb-0.12.0-SNAPSHOT-server-bin/apache-iotdb-0.12.0-SNAPSHOT-server-bin/
apache-iotdb-0.12.0-SNAPSHOT-server-bingit:(master) sbin/start-server.sh
---------------------
StartingIoTDB
...
...
2021-04-0707:33:50,636 [main] INFOo.a.i.db.service.IoTDB:93-IoTDBhasstarted.
  • 启动Cli
iotdbgit:(master)  cddistribution/target/apache-iotdb-0.12.0-SNAPSHOT-server-bin/apache-iotdb-0.12.0-SNAPSHOT-server-bin/
apache-iotdb-0.12.0-SNAPSHOT-server-bingit:(master) sbin/start-cli.sh-h127.0.0.1-p6667-uroot-pwroot

  • 创建存储组
IoTDB>setstoragegrouptoroot.lemming

  • 创建时间线
IoTDB>createtimeseriesroot.lemming.device1.m1withdatatype=FLOAT,encoding=RLE
IoTDB>createtimeseriesroot.lemming.device1.m2withdatatype=FLOAT,encoding=RLE
...
IoTDB>showtimeseries;

  • 插入时间序列数据
IoTDB>insertintoroot.lemming.device1(timestamp,m1, m2) values(1,3333,4444)
IoTDB>select*fromroot.lemming.device1;

看到这里,不知道大家是否发现,IoTDB虽然在存储层面是按时间序列进行列存储的,但是在API层面已经为大家抽象好来二维表结构,大家可以把多个时间序列当成一张二维度表的不同列,这个抽象大家就免去来入在InfluxDB中利用PIVOT进行行转列的需求,这样就更方便理解和对多时序数据的进行查询分析。好,那么下面我们看看如果实现 m1 + m2 这个需求。

03 代码数据准备

我们一个一个敲命令不如咱直接写个Java代码实例,方便反复测试,测试本篇内容,我们只需要依赖iotdb-jdbc模块,pom如下:

<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>${iotdb.version}</version>
</dependency>
</dependencies>

创建存储组,时间序列和准备数据的测试代码如下:

publicclassNo9JDBCExample {

/**
* 第一次运行 init(),如果一切顺利,正面环境和数据准备完成
*/
publicstaticvoidmain(String[] args) throwsException {
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
try (Connectionconnection=
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statementstatement=connection.createStatement()) {
init(statement);
} catch (IoTDBSQLExceptione) {
e.printStackTrace();
}
}

publicstaticvoidinit(Statementstatement) throwsException {
setStorageGroup(statement);
statement.execute(
"CREATE TIMESERIES root.lemming.device1.m1 with datatype=FLOAT,encoding=RLE");
statement.execute(
"CREATE TIMESERIES root.lemming.device1.m2 with datatype=FLOAT,encoding=RLE");
statement.execute("INSERT INTO root.lemming.device1(timestamp,m1, m2) VALUES (1,3333,4444)");

ResultSetresultSet=statement.executeQuery("SELECT timestamp, m1 as m1, m2 as m2 FROM root.lemming.device1");
outputResult(resultSet);
}

publicstaticvoidsetStorageGroup(Statementstatement) throwsException{
try {
statement.execute("SET STORAGE GROUP TO root.lemming");
}catch (Exceptione){
statement.execute("DELETE STORAGE GROUP root.lemming");
statement.execute("SET STORAGE GROUP TO root.lemming");
}
}

privatestaticvoidoutputResult(ResultSetresultSet) throwsSQLException {
if (resultSet!=null) {
System.out.println("--------------------------");
finalResultSetMetaDatametaData=resultSet.getMetaData();
finalintcolumnCount=metaData.getColumnCount();
for (inti=0; i<columnCount; i++) {
System.out.print(metaData.getColumnLabel(i+1) +" ");
}
System.out.println();
while (resultSet.next()) {
for (inti=1; ; i++) {
System.out.print(resultSet.getString(i));
if (i<columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
System.out.println("--------------------------\n");
}
}
}

如果你运行得到如下结果,正面一切准备工作就绪:

04 UDF解决加法

我们接下来看看Apache IoTDB如何解决加法问题,目前IoTDB提供来自定义UDF的方式解决用户自定义的操作。如果要使用UDF我们需要增加依赖如下:

<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>0.12.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>

完整配置查阅:https://github.com/sunjincheng121/know_how_know_why/blob/master/khkw_iotdb/No9udf/pom.xml

添加依赖之后,我们可以实现如下UDTF(User Defined Timeseries Generating Function), 这里写一下全称,不然大家会以为是User defined Table Function。当然不可否则,UDF的接口我们还可以有改进的空间,我们就目前现状与大家简单说明,如下:

publicinterfaceUDTFextendsUDF {

/**
* This method is mainly used to customize UDTF. In this method, the user can do the following
* things:
*
* <ul>
* <li>Use UDFParameters to get the time series paths and parse key-value pair attributes
* entered by the user.
* <li>Set the strategy to access the original data and set the output data type in
* UDTFConfigurations.
* <li>Create resources, such as establishing external connections, opening files, etc.
* </ul>
*
* <p>This method is called after the UDTF is instantiated and before the beginning of the
* transformation process.
*
* @param parameters used to parse the input parameters entered by the user
* @param configurations used to set the required properties in the UDTF
* @throws Exception the user can throw errors if necessary
*/
@SuppressWarnings("squid:S112")
voidbeforeStart(UDFParametersparameters, UDTFConfigurationsconfigurations) throwsException;

/**
* When the user specifies {@link RowByRowAccessStrategy} to access the original data in {@link
* UDTFConfigurations}, this method will be called to process the transformation. In a single UDF
* query, this method may be called multiple times.
*
* @param row original input data row (aligned by time)
* @param collector used to collect output data points
* @throws Exception the user can throw errors if necessary
* @see RowByRowAccessStrategy
*/
@SuppressWarnings("squid:S112")
defaultvoidtransform(Rowrow, PointCollectorcollector) throwsException {}

/**
* When the user specifies {@link SlidingSizeWindowAccessStrategy} or {@link
* SlidingTimeWindowAccessStrategy} to access the original data in {@link UDTFConfigurations},
* this method will be called to process the transformation. In a single UDF query, this method
* may be called multiple times.
*
* @param rowWindow original input data window (rows inside the window are aligned by time)
* @param collector used to collect output data points
* @throws Exception the user can throw errors if necessary
* @see SlidingSizeWindowAccessStrategy
* @see SlidingTimeWindowAccessStrategy
*/
@SuppressWarnings("squid:S112")
defaultvoidtransform(RowWindowrowWindow, PointCollectorcollector) throwsException {}

/**
* This method will be called once after all {@link UDTF#transform(Row, PointCollector) calls or
* {@link UDTF#transform(RowWindow, PointCollector) calls have been executed. In a single UDF
* query, this method will and will only be called once.
*
* @param collector used to collect output data points
* @throws Exception the user can throw errors if necessary
*/
@SuppressWarnings("squid:S112")
defaultvoidterminate(PointCollectorcollector) throwsException {}
}

其中最核心的是transform方法,对transform的实现逻辑就是我们业务的需求逻辑,面对我们要实现 加法 的逻辑,我们的核心逻辑就是transform实现,如下:

publicclassAddFuncimplementsUDTF {
...
...
@Override
publicvoidtransform(Rowrow, PointCollectorcollector) throwsException {
if (row.isNull(0) ||row.isNull(1)) {
return;
}
collector.putLong(
row.getTime(), (long) (extractDoubleValue(row, 0) +extractDoubleValue(row, 1) +addend));
}

privatedoubleextractDoubleValue(Rowrow, intindex) {
doublevalue;
switch (row.getDataType(index)) {
caseINT32:
value=row.getInt(index);
break;
caseINT64:
value= (double) row.getLong(index);
break;
caseFLOAT:
value=row.getFloat(index);
break;
caseDOUBLE:
value=row.getDouble(index);
break;
default:
thrownewUnSupportedDataTypeException(row.getDataType(index).toString());
}
returnvalue;
}
}

完整代码查阅:https://github.com/sunjincheng121/know_how_know_why/blob/master/khkw_iotdb/No9udf/src/main/java/org/khkw/iotdb/no9/AddFunc.java

当然开发完UDF,你还需要编写查询逻辑,如下:

publicstaticvoidudf(Statementstatement) {
try{
statement.execute("CREATE FUNCTION plus AS \"org.khkw.iotdb.no9.AddFunc\"");
statement.execute("SHOW FUNCTIONS");
ResultSetresultSet=statement.executeQuery("SELECT timestamp, plus(m1, m2) FROM root.lemming.device1");
outputResult(resultSet);
}catch (Exceptione) {
e.printStackTrace();
}
}

如上代码我们注册了名为 plus的UDF,然后plus(m1, m2)的方式实现了加法需求。当然,我们还不能直接运行udf方法,直接运行会报出找不到rg.khkw.iotdb.no9.AddFunc类,如下:

这是因为IoTDB配置udf的JAR加载需要指定JAR所在目录,我们利用 mvn clean package 生成JAR,然后再修改一下iotdb-engine.properties配置文件,如下:

我配置成为自己项目的jar所在路径了,如下:

配置完成之后,重新启动IoTDB服务实例。启动日志你会发现对UDF的加载:

apache-iotdb-0.12.0-SNAPSHOT-server-bingit:(master) sbin/start-server.sh
。。。
。。。
2021-04-0708:17:53,398 [main] INFOo.a.i.d.q.u.s.UDFClassLoaderManager:56-UDFlibroot: /Users/jincheng/work/know_how_know_why/khkw_iotdb/No9udf/target
。。。
2021-04-0712:34:11,622 [main] INFOo.a.i.d.s.t.ThriftService:125-IoTDB: startRPCServerServicesuccessfully, listeningonip0.0.0.0port6667

确保我们自定义的UDF已经被加载之后,我们运行udf,如下:

好,到目前我们利用UDF解决了加法需求,那么UDF是否可以在CLI里面使用呢?

05 CLI使用UDF

当然,我们可以在CLI里面使用UDF,我们启动一个CLI客户端,并用命令 SHOW FUNCTIONS 显示已有的FUNCTIONS,如下:

发现我们自定义的UDF已经在刚才运行Java程序时候注册了,我们可以删除让后在CLI在操作一遍(演示作用),如下:

IoTDB>CREATEFUNCTIONplusAS"org.khkw.iotdb.no9.AddFunc"
Msg: 411: Failedtoregisternon-TEMPORARYUDFPLUS(org.khkw.iotdb.no9.AddFunc), becauseanon-TEMPORARYUDFPLUS(org.khkw.iotdb.no9.AddFunc) withthesamefunctionnameandtheclassnamehasalreadybeenregistered.
IoTDB>DROPFUNCTIONplus
Msg: Thestatementisexecutedsuccessfully.
IoTDB>CREATEFUNCTIONplusAS"org.khkw.iotdb.no9.AddFunc"
Msg: Thestatementisexecutedsuccessfully.
IoTDB>SELECTtimestamp, plus(m1, m2) FROMroot.lemming.device1;
+-----------------------------+------------------------------------------------------+
|Time|plus(root.lemming.device1.m1, root.lemming.device1.m2)|
+-----------------------------+------------------------------------------------------+
|1970-01-01T08:00:00.001+08:00|7777|
+-----------------------------+------------------------------------------------------+
Totallinenumber=1
Itcosts0.552s

OK,到这里不论是Java方式还是CLI方式我们都完成了 多时序数据联合分析的需求。

06 贡献社区

上面我们发现IoTDB利用UDF的方式解决加法问题,虽然是通用做法,但是我们还可以做的更好,比如直接支持 m1 + m2 这种(加/减/乘/除)的便捷运算,所以这是你贡献的好机会,社区JIRA等你take。

https://issues.apache.org/jira/browse/IOTDB-1285

作者介绍

孙金城,社区编辑,Apache Flink PMC 成员,Apache Beam Committer,Apache IoTDB PMC 成员,ALC Beijing 成员,Apache ShenYu 导师,Apache 软件基金会成员。关注技术领域流计算和时序数据存储。

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/228688.html<

(0)
运维的头像运维
上一篇2025-04-18 10:19
下一篇 2025-04-18 10:20

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注