
准备工作
这里假设,你已经在 k8s 上部署好了基于 Citus 扩展的分布式 PostgreSQL 集群。
查看 Citus 集群(kubectl get po -n citus),1 个 Coordinator(协调器) 节点 + 3 个 Worker(工作器) 节点。
NAME READY STATUS RESTARTS AGE
citus-coordinator-02/2 Running 03h55m
citus-worker-02/2 Running 022m
citus-worker-12/2 Running 021m
citus-worker-22/2 Running 021m
进入 coordinator 节点(kubectl -n citus exec -it citus-coordinator-0 — bash),查看活动的 worker 节点(psql ‘host=citus-coordinator user=postgres’ -c “SELECT * FROM citus_get_active_worker_nodes();”)。
node_name | node_port
-----------------------------------------------------+-----------
citus-worker-1.citus-worker.citus.svc.cluster.local|6432
citus-worker-2.citus-worker.citus.svc.cluster.local|6432
citus-worker-0.citus-worker.citus.svc.cluster.local|6432
(3 rows)
一旦拥有 Citus 集群,就可以开始创建分布式表、引用表和使用列存储。
创建分布式表
create_distributed_table 将在本地或工作节点之间透明地切分您的表。
进入命令行工具:psql ‘host=citus-coordinator user=postgres’。
建表
CREATETABLE events (
device_id bigint,
event_id bigserial,
event_time timestamptz default now(),
data jsonb notnull,
PRIMARY KEY (device_id, event_id)
);
-- 将事件表分布在本地或工作节点上的分片上
SELECT create_distributed_table('events','device_id');
执行此操作后,对特定设备 ID 的查询将有效地路由到单个工作节点,而跨设备 ID 的查询将在集群中并行化。
插入一些事件
INSERTINTO events (device_id, data)
SELECT s %100,('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;
-- INSERT 0 1000000
获取设备 1 的最后 3 个事件,路由到单个节点
命令行开启计时:postgres=# \timing。
SELECT*FROM events WHERE device_id =1ORDERBY event_time DESC, event_id DESCLIMIT3;
device_id | event_id | event_time | data
-----------+----------+-------------------------------+-------------------------------------
1|999901|2022-03-2402:30:50.205478+00|{"measurement":0.8822990134507691}
1|999801|2022-03-2402:30:50.205478+00|{"measurement":0.5239176115816448}
1|999701|2022-03-2402:30:50.205478+00|{"measurement":0.9900647926398349}
(3 rows)
Time:4.779 ms
解释跨分片并行化的查询的计划,以下显示了查询其中一个分片的计划以及如何完成跨分片的聚合
执行 sql 语句:
EXPLAIN (VERBOSE ON)SELECTcount(*)FROM events;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
Aggregate (cost=250.00..250.02 rows=1 width=8)
Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint,'0'::bigint)
-> Custom Scan (Citus Adaptive)(cost=0.00..0.00 rows=100000 width=8)
Output: remote_scan.count
Task Count:32
Tasks Shown: One of 32
-> Task
Query:SELECTcount(*)AScountFROM public.events_102008 events WHEREtrue
Node: host=citus-worker-0.citus-worker.citus.svc.cluster.local port=6432 dbname=postgres
-> Aggregate (cost=725.00..725.01 rows=1 width=8)
Output:count(*)
-> Seq Scan on public.events_102008 events (cost=0.00..650.00 rows=30000 width=0)
Output: device_id, event_id, event_time, data
(13 rows)
Time:5.427 ms
使用共置创建分布式表
具有相同分布列的分布式表可以位于同一位置,以实现分布式表之间的高性能分布式连接(join)和外键。默认情况下,分布式表将根据分布列的类型位于同一位置,但您可以使用 create_distributed_table 中的 colocate_with 参数显式定义同一位置。
建表
CREATETABLE devices (
device_id bigint primary key,
device_name text,
device_type_id int
);
CREATE INDEX ON devices (device_type_id);
-- 将设备表与事件表放在一起
SELECT create_distributed_table('devices','device_id', colocate_with :='events');
插入设备元数据
INSERTINTO devices (device_id, device_name, device_type_id)
SELECT s,'device-'||s,55FROM generate_series(0,99) s;
可选:确保应用程序只能插入已知设备的事件
ALTERTABLE events ADD CONSTRAINT device_id_fk
FOREIGN KEY (device_id) REFERENCES devices (device_id);
获得跨分片并行的所有类型 55 设备的平均测量值
SELECT avg((data->>'measurement')::doubleprecision)
FROM events JOIN devices USING (device_id)
WHERE device_type_id =55;
avg
--------------------
0.4997412230952178
(1 row)
Time:122.548 ms
Co-location 还可以帮助您扩展 INSERT..SELECT、存储过程和分布式事务。
INSERT..SELECT:
https://docs.citusdata.com/en/stable/articles/aggregation.html。
存储过程:
https://www.citusdata.com/blog/2020/11/21/making-postgres-stored-procedures-9x-faster-in-citus/。
分布式事务:
https://www.citusdata.com/blog/2017/06/02/scaling-complex-sql-transactions/。
创建引用表
当您需要不包含分布列的快速 join 或外键时,您可以使用 create_reference_table 在集群中的所有节点之间复制表。
建表
CREATETABLE device_types (
device_type_id int primary key,
device_type_name textnotnull unique
);
跨所有节点复制表以在任何列上启用外键和 join
SELECT create_reference_table('device_types');
插入设备类型
INSERTINTO device_types (device_type_id, device_type_name)VALUES(55,'laptop');
可选:确保应用程序只能插入已知类型的设备
ALTERTABLE devices ADD CONSTRAINT device_type_fk
FOREIGN KEY (device_type_id) REFERENCES device_types (device_type_id);
获取类型名称以笔记本电脑开头的设备的最后 3 个事件,跨分片并行
SELECT device_id, event_time, data->>'measurement'AS value, device_name, device_type_name
FROM events JOIN devices USING (device_id)JOIN device_types USING (device_type_id)
WHERE device_type_name LIKE'laptop%'ORDERBY event_time DESCLIMIT3;
device_id | event_time | value | device_name | device_type_name
-----------+-------------------------------+---------------------+-------------+------------------
31|2022-03-2402:30:50.205478+00|0.9994211581289107| device-31| laptop
31|2022-03-2402:30:50.205478+00|0.13771543211483106| device-31| laptop
88|2022-03-2402:30:50.205478+00|0.5585740912470349| device-88| laptop
(3 rows)
Time:96.537 ms
引用表使您能够扩展复杂的数据模型并充分利用关系数据库的功能。
使用列式存储创建表
要在 PostgreSQL 数据库中使用列式存储,您只需将 USING columnar 添加到 CREATE TABLE 语句中,您的数据将使用列式访问方法自动压缩。
建表
CREATETABLE events_columnar (
device_id bigint,
event_id bigserial,
event_time timestamptz default now(),
data jsonb notnull
)
USING columnar;
插入一些数据
INSERTINTO events_columnar (device_id, data)
SELECT d,'{"hello":"columnar"}'FROM generate_series(1,10000000) d;
创建一个基于行的表进行比较
CREATETABLE events_row ASSELECT*FROM events_columnar;
查看表大小
注意 events_row(806 MB) 与 events_columnar(25 MB) 的对比。压缩了几十倍,效果非常的惊人,大大节省了存储空间。
postgres=# \d+
List of relations
Schema | Name | Type | Owner | Persistence | Access method | Size | Description
--------+------------------------------+----------+----------+-------------+---------------+------------+-------------
public | citus_tables | view | postgres | permanent ||0 bytes |
public | device_types |table| postgres | permanent | heap |8192 bytes |
public | devices |table| postgres | permanent | heap |8192 bytes |
public | events |table| postgres | permanent | heap |8192 bytes |
public | events_columnar |table| postgres | permanent | columnar |25 MB |
public | events_columnar_event_id_seq | sequence | postgres | permanent ||8192 bytes |
public | events_event_id_seq | sequence | postgres | permanent ||8192 bytes |
public | events_row |table| postgres | permanent | heap |806 MB |
(8 rows)
您可以单独使用列存储,也可以在分布式表中使用,以结合压缩和分布式查询引擎的优势。
使用列式存储时,您应该只使用 COPY 或 INSERT..SELECT 批量加载数据以实现良好的压缩。柱状表目前不支持更新、删除和外键。但是,您可以使用分区表,其中较新的分区使用基于行的存储,而较旧的分区使用列存储进行压缩。
文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/254029.html<