Citus is a capable sharding solution for PostgreSQL. It addresses a whole class of scalability issues by distributing data and queries across multiple nodes. We at CYBERTEC have used Citus for some time and can wholeheartedly recommend it (check out our services to find out more).
Since the need for PostgreSQL sharding is constantly growing, we thought we’d share some of our knowledge. Here’s how to shard a table from scratch.
Most people run Citus directly by firing up a couple of containers which already contain a working version of the solution. This is a good way to get started; it helps to shave off some of the overhead associated with installation and configuration.
# run PostgreSQL with Citus on port 5500
docker run -d --name citus -p 5500:5432 -e POSTGRES_PASSWORD=mypassword citusdata/citus

# connect using psql within the Docker container
docker exec -it citus psql -U postgres

# or, connect using local psql
psql -U postgres -d postgres -h localhost -p 5500
This method will provide you with a working solution in minutes.
However, if you want to get your hands dirty and find out what’s going on behind the scenes, you can also configure PostgreSQL manually to run Citus. After installing the binaries, it is necessary to load the Citus library at server startup. The way to do that is to add citus to shared_preload_libraries in postgresql.conf:
shared_preload_libraries = 'citus,pg_stat_statements'  # (change requires restart)
I found it super useful to also add pg_stat_statements to the parameter, in order to monitor performance as professionally as possible. Note that citus has to be the first entry in the list; otherwise, the server won’t start.
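On a hand-built instance, the extension itself still has to be created inside the database once the server has been restarted (the Docker image already ships with this done). Here is a minimal sketch, assuming a manually installed instance and the default postgres database:

-- make sure the library was loaded at server start
SHOW shared_preload_libraries;

-- create the extension in the database; on a hand-built cluster this is
-- typically required on the coordinator as well as on each worker
CREATE EXTENSION citus;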
Once this is done, we can provide a set of database instances to run. In my case, I prepared 5 empty instances — one coordinator on port 5432 and four workers on ports 6001 to 6004 — all using the configuration discussed above. First we have to set the coordinator node, then we can add the worker shards.
postgres=# SELECT citus_set_coordinator_host('localhost', 5432);
 citus_set_coordinator_host
----------------------------

(1 row)

postgres=# SELECT citus_add_node('localhost', 6001);
 citus_add_node
----------------
              2
(1 row)

postgres=# SELECT citus_add_node('localhost', 6002);
 citus_add_node
----------------
              3
(1 row)

postgres=# SELECT citus_add_node('localhost', 6003);
 citus_add_node
----------------
              4
(1 row)

postgres=# SELECT citus_add_node('localhost', 6004);
 citus_add_node
----------------
              5
(1 row)
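To double-check which nodes the coordinator knows about, the Citus metadata can be queried directly on the coordinator. A minimal sketch, assuming the commands above succeeded:

-- list the active worker nodes known to the coordinator
SELECT * FROM citus_get_active_worker_nodes();

-- or look at the full node metadata, including the coordinator itself
SELECT nodeid, nodename, nodeport, noderole
FROM pg_dist_node
ORDER BY nodeid;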
You are not supposed to run operations on the worker nodes directly. Citus is even kind enough to remind us of this fact when we try to do things we are not supposed to do:
postgres=# CREATE EXTENSION pg_stat_statements ;
ERROR:  operation is not allowed on this node
HINT:  Connect to the coordinator and run it again.
This restriction is relevant because it ensures consistency and prevents individual shards from drifting out of sync. Centralising such operations through the coordinator is a good idea and helps to keep your cluster healthy.
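Run on the coordinator instead, the same statement goes through as expected. A minimal sketch; note that recent Citus versions propagate supported DDL such as extension creation to the workers, which may vary by version:

-- on the coordinator (port 5432 in this setup) the same statement succeeds
CREATE EXTENSION pg_stat_statements;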
After successfully configuring Citus, it is time to create a table. The creation of the table itself is simple: all it takes is a plain CREATE TABLE command. However, this does not mean that the new relation is already sharded. We have to tell Citus to actually distribute the table and define the key we want to use to split the data. In my case, the sharding key is a column called shard_key.
postgres=# CREATE TABLE t_shard (
               id           serial,
               shard_key    int,
               n            int,
               placeholder  char(100) DEFAULT 'a'
           );
CREATE TABLE
postgres=# SELECT create_distributed_table('t_shard', 'shard_key');
 create_distributed_table
--------------------------

(1 row)
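By default, Citus splits such a table into 32 shards, which is exactly what we will see in the metadata later on. If you want a different layout, the shard count can be adjusted before distributing a table. A minimal sketch, shown for a hypothetical second table, since t_shard has already been distributed above:

-- Citus creates 32 shards per distributed table by default;
-- change the setting before calling create_distributed_table
SET citus.shard_count = 64;

-- hypothetical second table, distributed with the new setting
CREATE TABLE t_other (id serial, shard_key int, payload text);
SELECT create_distributed_table('t_other', 'shard_key');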
The beauty here is that we can simply send data to the table normally and Citus will take care of the rest for us automatically:
postgres=# \timing
Timing is on.
postgres=# INSERT INTO t_shard (shard_key, n)
           SELECT id % 16, random()*100000
           FROM generate_series(1, 5000000) AS id;
INSERT 0 5000000
Time: 3443.606 ms (00:03.444)
5 million rows have been inserted on a simple Mac Mini (a local desktop machine), and the entire process took hardly more than 3 seconds. Parallelism clearly pays off.
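To see how those rows were spread over the individual shards, the shard-level metadata can be queried on the coordinator. A minimal sketch, assuming a Citus version that ships the citus_shards view:

-- show each shard of t_shard, the node it lives on and its size
SELECT shardid, nodename, nodeport, shard_size
FROM citus_shards
WHERE table_name = 't_shard'::regclass
ORDER BY shardid;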
What is noteworthy about this is the size of the tables:
postgres=# \d+
                                        List of relations
 Schema |      Name      |   Type   | Owner | Persistence | Access method |    Size    | …
--------+----------------+----------+-------+-------------+---------------+------------+ …
 public | citus_schemas  | view     | hs    | permanent   |               | 0 bytes    |
 public | citus_tables   | view     | hs    | permanent   |               | 0 bytes    |
 public | t_shard        | table    | hs    | permanent   | heap          | 0 bytes    |
 public | t_shard_id_seq | sequence | hs    | permanent   |               | 8192 bytes |
(4 rows)
Don’t trust \d+ here. The table appears to have no size on the coordinator side because the actual data lives in the shards on the worker nodes. To inspect the real size of things, we have to take a look at the Citus metadata:
postgres=# \x
Expanded display is on.
postgres=# SELECT * FROM citus_tables ;
-[ RECORD 1 ]-------+------------
table_name          | t_shard
citus_table_type    | distributed
distribution_column | shard_key
colocation_id       | 1
table_size          | 711 MB
shard_count         | 32
table_owner         | hs
access_method       | heap
Now we can see the real size of the table. Why does this matter? Many monitoring tools use the standard PostgreSQL functions to gather size information, so monitoring has to be made aware of the existence of sharded tables. If you are using pgwatch, you can easily add metrics to take that into account.
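Such a custom metric could, for instance, rely on the size helpers that Citus provides. A minimal sketch, assuming the standard Citus helper functions and views are available:

-- total size of a distributed table, summed over all of its shards
SELECT citus_total_relation_size('t_shard');

-- or pull the sizes of all distributed tables from the metadata
SELECT table_name, table_size FROM citus_tables;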
Now that we have created a table and inserted some data, it is important to see how queries are actually handled. We can run a count(*) and check which kind of execution plan we get; it tells us a lot about the performance side of things:
postgres=# explain (analyze, buffers, timing) SELECT count(*) FROM t_shard;
                                                                           QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8) (actual time=520.360..520.361 rows=1 loops=1)
   Buffers: shared hit=93
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=520.347..520.350 rows=32 loops=1)
         Task Count: 32
         Tuple data received from nodes: 256 bytes
         Tasks Shown: One of 32
         ->  Task
               Tuple data received from node: 8 bytes
               Node: host=localhost port=6001 dbname=postgres
               ->  Finalize Aggregate  (cost=22929.03..22929.04 rows=1 width=8) (actual time=453.723..454.161 rows=1 loops=1)
                     Buffers: shared hit=2336 read=14710
                     ->  Gather  (cost=22928.81..22929.02 rows=2 width=8) (actual time=453.632..454.157 rows=3 loops=1)
                           Workers Planned: 2
                           Workers Launched: 2
                           Buffers: shared hit=2336 read=14710
                           ->  Partial Aggregate  (cost=21928.81..21928.82 rows=1 width=8) (actual time=448.045..448.047 rows=1 loops=3)
                                 Buffers: shared hit=2336 read=14710
                                 ->  Parallel Seq Scan on t_shard_102016 t_shard  (cost=0.00..20952.25 rows=390625 width=0) (actual time=0.094..434.632 rows=312500 loops=3)
                                       Buffers: shared hit=2336 read=14710
               Planning Time: 0.127 ms
               Execution Time: 454.193 ms
   Buffers: shared hit=93
 Planning:
   Buffers: shared hit=349
 Planning Time: 5.199 ms
 Execution Time: 520.431 ms
(26 rows)
Citus will distribute the query for us and send it to all shards. Note that EXPLAIN does not show the operations sent to every shard, just one of them (“Tasks Shown: One of 32”). However, all shards are created equal, so that is usually good enough.
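If you do want to see what is sent to every shard, Citus can be told to include all tasks in the plan. A minimal sketch, assuming the citus.explain_all_tasks setting available in current Citus versions:

-- show the plan for every task, not just one of them
SET citus.explain_all_tasks = on;
EXPLAIN (analyze, buffers, timing) SELECT count(*) FROM t_shard;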
If we filter on a specific value of the shard_key column, we'll see that only one shard is addressed:
postgres=# explain (analyze, buffers, timing) SELECT count(*) FROM t_shard WHERE shard_key = 2;
                                                                       QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0) (actual time=33.874..33.875 rows=1 loops=1)
   Task Count: 1
   Tuple data received from nodes: 8 bytes
   Tasks Shown: All
   ->  Task
         Tuple data received from node: 8 bytes
         Node: host=localhost port=6001 dbname=postgres
         ->  Finalize Aggregate  (cost=8635.34..8635.35 rows=1 width=8) (actual time=30.490..31.309 rows=1 loops=1)
               Buffers: shared hit=2208 read=3474
               ->  Gather  (cost=8635.12..8635.33 rows=2 width=8) (actual time=30.407..31.303 rows=3 loops=1)
                     Workers Planned: 2
                     Workers Launched: 2
                     Buffers: shared hit=2208 read=3474
                     ->  Partial Aggregate  (cost=7635.12..7635.13 rows=1 width=8) (actual time=26.023..26.024 rows=1 loops=3)
                           Buffers: shared hit=2208 read=3474
                           ->  Parallel Seq Scan on t_shard_102032 t_shard  (cost=0.00..7309.60 rows=130208 width=0) (actual time=0.420..19.400 rows=104167 loops=3)
                                 Filter: (shard_key = 2)
                                 Buffers: shared hit=2208 read=3474
         Planning Time: 0.516 ms
         Execution Time: 31.384 ms
 Planning Time: 0.146 ms
 Execution Time: 33.903 ms
(22 rows)
The “Node” entry inside the plan shows that the data only comes from the instance running on port 6001 (“Task Count: 1”), which tells us that we don’t have to scan all shards. That is highly beneficial.
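It is also possible to ask Citus directly which shard a given value of the distribution column maps to. A minimal sketch, assuming the helper function shipped with Citus and the citus_shards view:

-- which shard stores the rows with shard_key = 2?
SELECT get_shard_id_for_distribution_column('t_shard', 2);

-- and where is that shard placed?
SELECT shardid, nodename, nodeport
FROM citus_shards
WHERE shardid = get_shard_id_for_distribution_column('t_shard', 2);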
Often, sharding is seen as a universal fix for performance problems. The truth is, it is ONE step towards good performance, but it is no substitute for careful thinking and proper indexing.
Consider the following query:
postgres=# explain (analyze, buffers, timing) SELECT count(*) FROM t_shard WHERE id = 10;
                                                                    QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8) (actual time=541.532..541.534 rows=1 loops=1)
   Buffers: shared hit=3
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=540.910..540.914 rows=32 loops=1)
         Task Count: 32
         Tuple data received from nodes: 256 bytes
         Tasks Shown: One of 32
         ->  Task
               Tuple data received from node: 8 bytes
               Node: host=localhost port=6001 dbname=postgres
               ->  Aggregate  (cost=22928.91..22928.92 rows=1 width=8) (actual time=463.883..464.342 rows=1 loops=1)
                     Buffers: shared hit=2528 read=14518
                     ->  Gather  (cost=1000.00..22928.91 rows=1 width=0) (actual time=463.881..464.339 rows=0 loops=1)
                           Workers Planned: 2
                           Workers Launched: 2
                           Buffers: shared hit=2528 read=14518
                           ->  Parallel Seq Scan on t_shard_102016 t_shard  (cost=0.00..21928.81 rows=1 width=0) (actual time=456.989..456.990 rows=0 loops=3)
                                 Filter: (id = 10)
                                 Rows Removed by Filter: 312500
                                 Buffers: shared hit=2528 read=14518
               Planning Time: 0.267 ms
               Execution Time: 464.378 ms
   Buffers: shared hit=3
 Planning:
   Buffers: shared hit=28
 Planning Time: 4.432 ms
 Execution Time: 542.064 ms
(26 rows)
Citus will fire up the sharding machinery, query all shards in parallel, and produce the result by going through ALL the data. In reality, this is really bad, and throwing more hardware at the problem is going to make things even worse.
As is so often the case, the solution to the problem is to use indexing:
postgres=# CREATE INDEX ON t_shard (id);
CREATE INDEX
As you can see, after indexing, performance improves by orders of magnitude:
postgres=# explain (analyze, buffers, timing) SELECT count(*) FROM t_shard WHERE id = 10;
                                                                          QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8) (actual time=8.795..8.796 rows=1 loops=1)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=8.780..8.783 rows=32 loops=1)
         Task Count: 32
         Tuple data received from nodes: 256 bytes
         Tasks Shown: One of 32
         ->  Task
               Tuple data received from node: 8 bytes
               Node: host=localhost port=6001 dbname=postgres
               ->  Aggregate  (cost=4.44..4.45 rows=1 width=8) (actual time=0.053..0.053 rows=1 loops=1)
                     Buffers: shared read=3
                     ->  Index Only Scan using t_shard_id_idx_102008 on t_shard_102008 t_shard  (cost=0.42..4.44 rows=1 width=0) (actual time=0.049..0.049 rows=0 loops=1)
                           Index Cond: (id = 10)
                           Heap Fetches: 0
                           Buffers: shared read=3
               Planning Time: 0.466 ms
               Execution Time: 0.479 ms
 Planning:
   Buffers: shared hit=117 read=1
 Planning Time: 2.537 ms
 Execution Time: 8.875 ms
(20 rows)
This is orders of magnitude faster than having no index, but of course also orders of magnitude slower than working on a local, non-sharded database. In other words: there is no magic solution to performance, not even sharding.
Check out our other blog posts about sharding, indexing and performance:
In order to receive regular updates on important changes in PostgreSQL, subscribe to our newsletter, or follow us on Facebook or LinkedIn.