"PostgreSQL scales" - we have all heard this phrase over and over again. However, the question is: What does this actually mean? Does it mean 1 million rows? Or maybe even 1 billion rows? So, on a rainy weekend, I decided to do a little experiment to figure out if it is possible to squeeze 1 trillion rows (= 1000 billion rows) into my local personal computer.
For those of you who don't believe this to be possible - here is proof:
test=# SELECT count(*) FROM t_data;
     count
---------------
 1024000000000
(1 row)
Using Citus columnar storage, this translates to:
test=# \d t_data
              Table "public.t_data"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 key_id | integer |           |          |
 data   | bigint  |           |          |

test=# SELECT pg_size_pretty(citus_table_size('t_data'));
 pg_size_pretty
----------------
 1563 GB
(1 row)
But before looking at the final result, we need to understand how we got there in the first place.
Obviously, I don't have a real-world data set on hand that contains this much data, so I decided to generate it synthetically. However, before we take a look at how this can be done, we have to discuss the basic setup.
Loading this much data into plain row storage would require an insane amount of disk space. Here is some math:
test=# SELECT pg_size_pretty(1024000000000::int8 * (24 + 12));
 pg_size_pretty
----------------
 34 TB
(1 row)
Assuming a tuple header of roughly 24 bytes per row plus 12 bytes of actual data (an integer and a bigint), we would end up with roughly 34 TB, which is more than the 32 TB maximum size of a single table in PostgreSQL (assuming 8 kB blocks). Compare that to the 1563 GB the columnar version occupies above - a reduction by a factor of more than 20.
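If you want to sanity-check the per-row estimate yourself, a quick experiment with a small heap (row storage) table is enough. The table name t_rowsize and the row count of 1 million are placeholders I made up for this illustration; the exact figure you see will typically be a bit above the naive 36-byte estimate because of alignment padding and the per-row line pointer:

-- hypothetical helper table using ordinary heap (row) storage
CREATE TABLE t_rowsize (key_id int, data bigint);

INSERT INTO t_rowsize
       SELECT id % 100000, id
       FROM generate_series(1, 1000000) AS id;

-- total heap size and the resulting bytes per row
SELECT pg_size_pretty(pg_relation_size('t_rowsize')) AS heap_size,
       pg_relation_size('t_rowsize') / 1000000.0     AS bytes_per_row;

-- clean up the helper table
DROP TABLE t_rowsize;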
This basically leaves us with a couple of choices: shard the data, compress it, or do both.
To make sure that I can also query the data reasonably fast, I decided on sharded Citus columnar storage, consisting of a coordinator and 4 worker nodes (all residing on my local desktop machine):
test=# SELECT nodeid, nodename, nodeport
       FROM pg_dist_node
       ORDER BY 1;
 nodeid | nodename  | nodeport
--------+-----------+----------
      1 | localhost |     5432
      5 | localhost |     6001
      6 | localhost |     6002
      7 | localhost |     6003
      8 | localhost |     6004
(5 rows)
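For reference, wiring up such a cluster from the coordinator boils down to a handful of function calls once the Citus extension has been created on every instance. This is only a rough sketch of my local layout (ports 6001-6004 are simply where I started the worker instances; recent Citus releases use the citus_* function names shown here), not a full setup guide:

-- run on the coordinator (port 5432); assumes CREATE EXTENSION citus
-- has already been executed on the coordinator and on every worker
SELECT citus_set_coordinator_host('localhost', 5432);
SELECT citus_add_node('localhost', 6001);
SELECT citus_add_node('localhost', 6002);
SELECT citus_add_node('localhost', 6003);
SELECT citus_add_node('localhost', 6004);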
Once Citus has been installed and wired up (more on Citus can be found here), we can create a simple table and distribute it using one of the columns:
test=# CREATE TABLE t_data (
           key_id   int,
           data     bigint
       ) USING columnar;
CREATE TABLE

test=# SELECT create_distributed_table('t_data', 'key_id');
SELECT
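One detail worth knowing: the number of shards is governed by the citus.shard_count setting, which defaults to 32 - that is exactly where the "Task Count: 32" in the execution plans below comes from. The following is purely an illustrative sketch (I kept the default); the citus_shards view is available in recent Citus releases:

-- purely illustrative: a different shard count would have to be set
-- before calling create_distributed_table()
SET citus.shard_count = 64;

-- check how the existing shards are spread across the worker nodes
SELECT table_name, shardid, nodename, nodeport,
       pg_size_pretty(shard_size) AS size
FROM   citus_shards
WHERE  table_name = 't_data'::regclass
ORDER BY shardid;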
To start, we can use the generate_series function to load some initial data. In this case, the INSERT statement generates 62.5 million rows in the most basic way:
test=# INSERT INTO t_data
       SELECT id % 100000, id
       FROM generate_series(1, 62500000) AS id;
INSERT
After populating some initial data, we can start to double the number of rows in our Citus table as often as we want:
...
test=# INSERT INTO t_data SELECT * FROM t_data;
... <run more often> ...
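For the record: getting from the initial 62.5 million rows to the final table takes 14 doublings, because 62,500,000 × 2^14 = 1,024,000,000,000. If you prefer not to fire the statement by hand over and over, a PL/pgSQL loop such as the following would do the same thing. Treat it as a sketch only: a DO block runs as a single (very large) transaction, which is one good reason to run the doublings one by one, as shown above:

DO $$
BEGIN
    -- 14 doublings: 62,500,000 * 2^14 = 1,024,000,000,000 rows
    FOR i IN 1 .. 14 LOOP
        INSERT INTO t_data SELECT * FROM t_data;
        RAISE NOTICE 'doubling % done', i;
    END LOOP;
END;
$$;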
Voilà - after some time and enough doubling steps, we can query the content of our distributed PostgreSQL table. In the simplest of all cases, we can just run a count:
test=# SELECT count(*) FROM t_data;
     count
---------------
 1024000000000
(1 row)

Time: 3225188.861 ms (53:45.189)
The query takes roughly 53 minutes to complete. During the SELECT statement, we can see that Citus dispatches the load to a vast number of processes across the shards:
top - 20:16:01 up 54 min,  1 user,  load average: 30.02, 14.47, 6.39
Tasks: 703 total,  34 running, 669 sleeping,   0 stopped,   0 zombie
%Cpu(s): 99.8 us,  0.2 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
MiB Mem : 128650.6 total,  50517.6 free,  20438.3 used,  72092.9 buff/cache
MiB Swap:   8192.0 total,   8192.0 free,      0.0 used. 108212.4 avail Mem

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
  17845 hs        20   0   33.6g 225728 219904 R 100.0   0.2   2:33.04 postgres
  16609 hs        20   0  249712 147672 144640 R 100.0   0.1   4:05.39 postgres
  16610 hs        20   0  249704 149592 145984 R 100.0   0.1   3:49.71 postgres
  17833 hs        20   0  247584 127732 122112 R 100.0   0.1   2:33.29 postgres
  17835 hs        20   0  247588 145580 139520 R 100.0   0.1   2:32.36 postgres
  17843 hs        20   0  247640 131620 125696 R 100.0   0.1   2:32.86 postgres
  17850 hs        20   0   33.6g 203300 197376 R 100.0   0.2   2:33.20 postgres
  17853 hs        20   0  247636 120900 115200 R 100.0   0.1   2:32.29 postgres
  17855 hs        20   0  247636 125584 119808 R 100.0   0.1   2:32.84 postgres
  17857 hs        20   0   33.6g 231460 225280 R 100.0   0.2   2:33.40 postgres
  16608 hs        20   0  249704 148464 145140 R  99.7   0.1   3:47.70 postgres
  17840 hs        20   0  247644 122632 117248 R  99.7   0.1   2:32.76 postgres
  17841 hs        20   0  247644 131788 126208 R  99.7   0.1   2:32.44 postgres
  17848 hs        20   0  247636 126248 120320 R  99.7   0.1   2:32.73 postgres
  17854 hs        20   0  247644 126436 121088 R  99.7   0.1   2:32.59 postgres
  16611 hs        20   0  249712 147620 144072 R  99.3   0.1   3:51.26 postgres
...
What is important to see here is that we are basically CPU bound and not I/O bound, which is a key observation. Usually, CPU is a lot easier and a lot cheaper to scale than I/O. Therefore, seeing fully loaded CPUs is actually a good sign.
Often, we want to know how many rows each group contains in the data set. Here is how this works:
test=# explain SELECT key_id, count(*)
       FROM t_data
       GROUP BY 1
       ORDER BY 2 DESC;
                                  QUERY PLAN
----------------------------------------------------------------------------
 Sort  (cost=8304.82..8554.82 rows=100000 width=12)
   Sort Key: remote_scan.count DESC
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=12)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=localhost port=5432 dbname=test
               ->  HashAggregate  (cost=165451245.99..165451249.16 rows=317 width=12)
                     Group Key: key_id
                     ->  Custom Scan (ColumnarScan) on t_data_106299 t_data  (cost=0.00..3163471.27 rows=32457554944 width=4)
                           Columnar Projected Columns: key_id
(11 rows)

Time: 287.430 ms
Now, what can we see here? The most important thing to learn from this is that Citus nicely dispatches the query to our shards. The plan shows that the shards are scanned (ColumnarScan) and aggregated locally. This works pretty much the same way a normal, single GROUP BY statement would aggregate data. However, there is more: Note that we are sharded by key_id, so each value of key_id is bound to a specific shard. Therefore, all Citus has to do is collect those partial results and sort them as desired by the query, which is now quite a simple task.
If we want to understand more deeply how this works, we can connect to one of the database nodes and check which queries are actually running.
Here is an example:
-[ RECORD 7 ]----+-----------------------------------------------
datid            | 17273
datname          | test
pid              | 22237
leader_pid       | 
usesysid         | 10
usename          | hs
application_name | citus_internal gpid=10000022078
client_addr      | 127.0.0.1
client_hostname  | 
client_port      | 37460
backend_start    | 2025-01-08 21:20:52.062818+01
xact_start       | 2025-01-08 21:20:52.066472+01
query_start      | 2025-01-08 21:20:52.807719+01
state_change     | 2025-01-08 21:20:52.807722+01
wait_event_type  | 
wait_event       | 
state            | active
backend_xid      | 
backend_xmin     | 2491
query_id         | 6772522768391086066
query            | SELECT key_id, count(*) AS count FROM public.t_data_106302 t_data WHERE true GROUP BY key_id
backend_type     | client backend
Citus is really a piece of professional engineering. It sets a proper application_name, and we can easily inspect what is going on inside our cluster by using standard means provided by PostgreSQL.
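Because the application_name is set consistently, filtering for Citus-initiated backends on a worker is a one-liner. Here is a minimal sketch using nothing but pg_stat_activity:

-- run on any worker node: show active queries dispatched by Citus
SELECT pid, application_name, state, query
FROM   pg_stat_activity
WHERE  application_name LIKE 'citus%'
       AND state = 'active';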
I am a big fan of Citus and columnar storage - I strongly believe that Citus and PostgreSQL make a sensational team, and the more I learn about both over the years, the more I adore these technologies. However, even at this stage, it is important to issue a warning: Just because something is good technology does not mean that you can simply throw every type of query against it and expect a sub-second answer.
Here is an example:
test=# explain SELECT key_id, avg(data - lag)
       FROM
       (
           SELECT *, lag(data, 1) OVER (PARTITION BY key_id ORDER BY data)
           FROM t_data
       ) AS x
       GROUP BY 1;
                                  QUERY PLAN
-----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=36)
   Task Count: 32
   Tasks Shown: One of 32
   ->  Task
         Node: host=localhost port=5432 dbname=test
         ->  GroupAggregate  (cost=6782453812.10..7675036577.03 rows=317 width=36)
               Group Key: t_data.key_id
               ->  WindowAgg  (cost=6782453812.10..7431604910.98 rows=32457554944 width=20)
                     ->  Sort  (cost=6782453812.10..6863597699.46 rows=32457554944 width=12)
                           Sort Key: t_data.key_id, t_data.data
                           ->  Custom Scan (ColumnarScan) on t_data_106299 t_data  (cost=0.00..6326942.55 rows=32457554944 width=12)
                                 Columnar Projected Columns: key_id, data
(12 rows)
What is this query actually doing? For each value of key_id, it calculates the average distance between consecutive values (in ascending order). In practice, this means that the entire data set has to be sorted, sent through a window function, and then grouped together. One can easily imagine that sorting 1 trillion rows is not exactly a quick operation - even when throwing hardware at the problem (remember: we are still only on my local PC).
The takeaway here is that if you start to run operations on ever-larger data sets, you also have to scale up your thinking accordingly. You have to understand what is possible and what can be done easily. From a certain scale onwards, some operations are simply trickier and cannot be executed carelessly without facing dire consequences.