Citus 多节点部署及分布式表的使用

1. 环境准备

| 主机名 | centos11 | centos12 | centos13 |
| --- | --- | --- | --- |
| 角色 | Coordinator | Worker | Worker |
| 软件版本 | PG15、Citus12 | PG15、Citus12 | PG15、Citus12 |

2. Citus 多节点部署

  • 所有节点都执行
$ curl https://install.citusdata.com/community/rpm.sh | sudo bash
$ sudo yum install -y citus120_15
$ sudo service postgresql-15 initdb || sudo /usr/pgsql-15/bin/postgresql-15-setup initdb
$ echo "shared_preload_libraries = 'citus'" | sudo tee -a /var/lib/pgsql/15/data/postgresql.conf

$ vim /var/lib/pgsql/15/data/postgresql.conf
listen_addresses = '*'
$ vim /var/lib/pgsql/15/data/pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
# "local" is for Unix domain socket connections only
local all all peer
# # IPv4 local connections:
host all all 127.0.0.1/32 scram-sha-256
# # IPv6 local connections:
host all all ::1/128 scram-sha-256
# # Allow replication connections from localhost, by a user with the
# # replication privilege.
local replication all peer
host all all 127.0.0.1/32 trust
host all all ::1/128 trust

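# 允许内网(192.168.7.0/24)中的 Citus 节点之间免密互连
host all all 192.168.7.0/24 trust
# 注意:pg_hba.conf 自上而下匹配,首条命中的规则生效;
# 上面已存在 127.0.0.1/32 与 ::1/128 的 scram-sha-256 规则,如需本机回环连接使用 trust,应将下面两行移到这些规则之前
host all all 127.0.0.1/32 trust
host all all ::1/128 trust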
$ systemctl restart postgresql-15
$ systemctl enable postgresql-15

$ sudo -i -u postgres psql -c "CREATE EXTENSION citus;"
  • Coordinator 节点执行
$ sudo -i -u postgres psql -c "SELECT citus_set_coordinator_host('centos11', 5432);"
$ sudo -i -u postgres psql -c "SELECT * from citus_add_node('centos12', 5432);"
$ sudo -i -u postgres psql -c "SELECT * from citus_add_node('centos13', 5432);"

$ sudo -i -u postgres psql -c "SELECT * FROM citus_get_active_worker_nodes();"
node_name | node_port
-----------+-----------
centos13 | 5432
centos12 | 5432

3. 操作 Citus

  • 以下操作同样适用于 Azure Cosmos DB for PostgreSQL(其底层基于 Citus)

(1) 集群操作

$ sudo -i -u postgres psql
# 查看PG版本
postgres=# SHOW server_version;
server_version
----------------
15.8
# 查看所有节点
postgres=# select nodeid, nodename from pg_dist_node where isactive;
nodeid | nodename
--------+----------
1 | centos11
2 | centos12
3 | centos13
# 查看在线的工作节点
postgres=# SELECT * FROM citus_get_active_worker_nodes();
node_name | node_port
-----------+-----------
centos13 | 5432
centos12 | 5432
# 设置分片数(默认32)和副本数(默认1,副本数量不能超过Worker节点数量);ALTER SYSTEM 的修改需执行 SELECT pg_reload_conf(); 重新加载配置后才会生效
postgres=# show citus.shard_count;
citus.shard_count
-------------------
32
postgres=# show citus.shard_replication_factor;
citus.shard_replication_factor
--------------------------------
1
postgres=# alter system set citus.shard_count=8;
postgres=# alter system set citus.shard_replication_factor=2;

(2) 创建分布式表

$ sudo -i -u postgres psql
# 创建表
postgres=# CREATE TABLE test_table(id INT, name VARCHAR(16));
# 分发表(将表分解为分片,这些分片分布在各个节点)
postgres=# SELECT create_distributed_table('test_table', 'id', 'hash', shard_count:=2);
#postgres=# SELECT create_distributed_table('test_table', 'id'); # 默认Hash分布,32分片
#postgres=# SELECT create_distributed_table('test_table', 'id', 'append'); # 设置Append分布,需手动创建分片,轮询
#postgres=# SELECT create_distributed_table('test_table2', 'my_id', colocate_with => 'test_table'); # 亲和表(分布列值相同的行会分布到同一个Worker节点)
#postgres=# SELECT create_reference_table('test_table3'); # 引用表(不进行分片,每个节点存储相同数据,通过2PC保证副本一致性)
# 插入数据
postgres=# INSERT INTO test_table VALUES (1, 'a'),(2, 'b'),(3, 'c'),(4, 'd'),(5, 'e'),(6, 'f'),(7, 'g');
# 查看分布式表的详细信息
postgres=# SELECT * FROM citus_tables;
table_name | citus_table_type | distribution_column | colocation_id | table_size | shard_count | table_owner | access_method
------------+------------------+---------------------+---------------+------------+-------------+-------------+---------------
test_table | distributed | id | 4 | 16 kB | 2 | postgres | heap

(3) 查询分片信息

$ sudo -i -u postgres psql
# 查看分布式表分片的详细信息
postgres=# select * from citus_shards;
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
------------+---------+-------------------+------------------+---------------+----------+----------+------------
test_table | 102076 | test_table_102076 | distributed | 4 | centos12 | 5432 | 8192
test_table | 102077 | test_table_102077 | distributed | 4 | centos13 | 5432 | 8192
# 查看分片信息
postgres=# SELECT * from pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
test_table | 102076 | t | -2147483648 | -1
test_table | 102077 | t | 0 | 2147483647
# 查看副本信息(每个分片默认1个副本)
postgres=# select * from pg_dist_placement;
placementid | shardid | shardstate | shardlength | groupid
-------------+---------+------------+-------------+---------
69 | 102076 | 1 | 0 | 1
70 | 102077 | 1 | 0 | 2
# 查看每个分片所在的位置
postgres=# select * from pg_dist_shard_placement;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+----------+----------+-------------
102076 | 1 | 0 | centos12 | 5432 | 69
102077 | 1 | 0 | centos13 | 5432 | 70
postgres=# select shard.logicalrelid as table, placement.shardid as shard, node.nodename as host
from pg_dist_placement placement, pg_dist_node node, pg_dist_shard shard
where placement.groupid = node.groupid and shard.shardid = placement.shardid
order by shard limit 5;
table | shard | host
------------+--------+----------
test_table | 102076 | centos12
test_table | 102077 | centos13
# 查看每个分片的大小
postgres=# select * from run_command_on_shards('test_table', $cmd$ select pg_size_pretty(pg_table_size('%1$s')); $cmd$)
order by shardid limit 5;
shardid | success | result
---------+---------+------------
102076 | t | 8192 bytes
102077 | t | 8192 bytes
# 查看某个值对应的分片ID
postgres=# select get_shard_id_for_distribution_column('test_table', '1');
get_shard_id_for_distribution_column
--------------------------------------
102076
# 分片再平衡
postgres=# select rebalance_table_shards('test_table');

(4) 查询分布式表

$ sudo -i -u postgres psql
# 查询分布式表
postgres=# select avg(id) from test_table;
avg
--------------------
4.0000000000000000
# 程序会为每个分片创建一个单独的查询,在工作节点上运行这些查询,并将结果组合在一起
postgres=# explain select avg(id) from test_table;
QUERY PLAN
-------------------------------------------------------------------------------------------------------
Aggregate (cost=500.00..500.02 rows=1 width=32) # 协调器节点合并聚合
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=16)
Task Count: 2
Tasks Shown: One of 2 # 共2个任务,此处仅展示其中1个
-> Task
Node: host=centos12 port=5432 dbname=postgres
-> Aggregate (cost=24.85..24.86 rows=1 width=16) # 工作节点聚合
-> Seq Scan on test_table_102076 test_table (cost=0.00..19.90 rows=990 width=4) # 扫描分片表
# 在centos12上查询test_table_102076
postgres=# select * from test_table_102076;
id | name
----+------
1 | a
3 | c
4 | d
5 | e
7 | g
# 在centos13上查询test_table_102077
postgres=# select * from test_table_102077;
id | name
----+------
2 | b
6 | f
# 在centos13上查询test_table_102076
postgres=# select * from test_table_102076;
ERROR: relation "test_table_102076" does not exist

参考