3.3. Queries
Now that all the migration operations have been performed successfully, it's time to check how queries are executed in the distributed schema.
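The plans below include actual rows figures, which means they were produced by EXPLAIN ANALYZE, and in Shardman such plans also contain the Network: FDW bytes lines used throughout this section to judge cross-shard traffic. A minimal sketch of how to reproduce such a plan, using only the standard PostgreSQL EXPLAIN syntax and one of the queries from this section as an example:

-- Standard PostgreSQL EXPLAIN; ANALYZE executes the query for real,
-- VERBOSE adds details about remote (foreign) relations.
EXPLAIN (ANALYZE, VERBOSE)
SELECT * FROM bookings.bookings b WHERE b.book_ref = '0824C5';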
3.3.1. q1 Query

The q1 query is pretty simple: it selects the booking with the specified number:
SELECT *
FROM bookings.bookings b
WHERE b.book_ref = '0824C5';
With regular PostgreSQL and with the ticket_no sharding key, this query runs comparably fast. With the book_ref sharding key, the speed depends on the shard where the query is executed: if it is executed on a shard that does not physically hold the data, Shardman sends the query to another shard, which adds a delay due to network communication.
3.3.2. q2 Query

This q2 query selects all the tickets from the specified booking:
SELECT t.*
FROM bookings.bookings b
JOIN bookings.tickets t ON t.book_ref = b.book_ref
WHERE b.book_ref = '0824C5';
With the book_ref sharding key, the query is pushed down to shards and the global table is joined with partitions of a sharded table:
Foreign Scan (actual rows=2 loops=1)
  Relations: (bookings_2_fdw b) INNER JOIN (tickets_2_fdw t)
  Network: FDW bytes sent=433 received=237
Let's look at the query plan for the ticket_no sharding key:
Append (actual rows=2 loops=1)
  Network: FDW bytes sent=1263 received=205
  ->  Nested Loop (actual rows=1 loops=1)
        ->  Seq Scan on tickets_0 t_1 (actual rows=1 loops=1)
              Filter: (book_ref = '0824C5'::bpchar)
              Rows Removed by Filter: 207092
        ->  Index Only Scan using bookings_pkey on bookings b (actual rows=1 loops=1)
              Index Cond: (book_ref = '0824C5'::bpchar)
              Heap Fetches: 0
  ->  Async Foreign Scan (actual rows=1 loops=1)
        Relations: (tickets_1_fdw t_2) INNER JOIN (bookings b)
        Network: FDW bytes sent=421 received=205
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (tickets_2_fdw t_3) INNER JOIN (bookings b)
        Network: FDW bytes sent=421
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (tickets_3_fdw t_4) INNER JOIN (bookings b)
        Network: FDW bytes sent=421
The plan contains Async Foreign Scan nodes, which mean network data exchange between the query source node and the shards: data is received from the shards, and final processing is done on the query source node.
Look at the Network line. A good criterion of whether query execution on shards is optimal is the value of received: the lower it is, the better shards execute distributed queries. Most processing is then done remotely, and the query source node gets a result that is ready for further processing.
The case where the sharding key is book_ref looks much better, as the table with ticket numbers already contains book_ref.
The plan of the query to be executed on an arbitrary node is as follows:
Foreign Scan (actual rows=2 loops=1)
  Relations: (bookings_2_fdw b) INNER JOIN (tickets_2_fdw t)
  Network: FDW bytes sent=433 received=237
The network data exchange is only done with one shard, in which the query is executed. It is shard-3, as the tickets_2 partition of the tickets table is located on the fourth node.
If this query is executed on the shard where the data is physically located, it runs even faster. Let's look at the plan:
Nested Loop (actual rows=2 loops=1)
  ->  Index Only Scan using bookings_2_pkey on bookings_2
  ->  Bitmap Heap Scan on tickets_2
        ->  Bitmap Index Scan on tickets_2_book_ref_idx
Network data exchange is not needed here as the requested data is located within the shard in which the query is executed.
In some cases, the choice of the shard for query execution matters. Being aware of the distribution logic, you can implement it at the application level and send some queries immediately to the shard where the needed data is located based on the sharding key.
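To see where the data for a particular key resides, you can ask the database itself. The sketch below relies only on the standard tableoid system column, not on any Shardman-specific API; the partition name it returns (for example, bookings_2 or its _fdw foreign counterpart) indicates which shard owns the row:

-- A hedged sketch: tableoid::regclass shows which partition the row
-- was read from, and the partition number identifies the owning shard.
SELECT tableoid::regclass AS partition, b.*
FROM bookings.bookings b
WHERE b.book_ref = '0824C5';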
3.3.3. q3 Query

The q3 query finds all the flights for one of the tickets in the booking selected earlier:
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no = '0005435126781';
As discussed in Section 3.3.2, it makes sense to choose a specific shard for query execution: with the ticket_no sharding key, the query executes more optimally on the shard that contains the partition with the data. The planner knows that this shard holds all the data needed to join the tables, so no network communication between shards occurs.
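A simple way to verify this is to run the query with EXPLAIN on that shard; this is a hedged sketch using only standard PostgreSQL syntax, and a fully local execution should show no Foreign Scan nodes and no Network lines in the plan:

-- Hedged check: run on the shard that owns the partition for this ticket;
-- the resulting plan should contain only local scan and join nodes.
EXPLAIN (ANALYZE)
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no = '0005435126781';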
For the book_ref sharding key, note that from the booking number you can compute the ticket number and request it right from the “proper” shard. So the query is as follows:
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no AND t.book_ref = tf.book_ref
WHERE t.ticket_no = '0005435126781'
  AND tf.book_ref = '0824C5';
The query is executed more slowly in the shard that does not contain the partition with the data sought:
Foreign Scan (actual rows=6 loops=1)
  Relations: (tickets_1_fdw t) INNER JOIN (ticket_flights_1_fdw tf)
  Network: FDW bytes sent=434 received=369
Network communication between shards is present in the plan, as it contains the Foreign Scan node.
The importance of including the sharding key in a query can be illustrated with the following query for the book_ref sharding key:
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no = '0005435126781'
  AND tf.book_ref = '0824C5';
Here the sharding key is not included in the join condition on purpose. Let's look at the plan:
Nested Loop (actual rows=6 loops=1)
  Network: FDW bytes sent=1419 received=600
  ->  Foreign Scan on ticket_flights_2_fdw tf (actual rows=6 loops=1)
        Network: FDW bytes sent=381 received=395
  ->  Append (actual rows=1 loops=6)
        Network: FDW bytes sent=1038 received=205
        ->  Seq Scan on tickets_0 t_1 (actual rows=0 loops=6)
              Filter: (ticket_no = '0005435126781'::bpchar)
              Rows Removed by Filter: 207273
        ->  Async Foreign Scan on tickets_1_fdw t_2 (actual rows=0 loops=6)
              Network: FDW bytes sent=346 received=205
        ->  Async Foreign Scan on tickets_2_fdw t_3 (actual rows=1 loops=6)
              Network: FDW bytes sent=346
        ->  Async Foreign Scan on tickets_3_fdw t_4 (actual rows=0 loops=6)
              Network: FDW bytes sent=346
We can notice differences from the previous examples. Here the query was executed on all nodes and no index was used, so to return as few as 6 rows, Shardman had to sequentially scan whole partitions of the tickets table, return the result to the query source node, and only then perform the join with the ticket_flights table. The Async Foreign Scan nodes indicate the sequential scan of the tickets table on the shards.
3.3.4. q4 Query

This query returns all the flights for all the tickets included in a booking. There are several ways to do this: include a subquery with the booking number in a WHERE ... IN clause, explicitly list the ticket numbers in an IN clause, or use a WHERE ... OR condition. Let's check the execution of the query for these variants.
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no IN (
    SELECT t.ticket_no
    FROM bookings.bookings b
    JOIN bookings.tickets t ON t.book_ref = b.book_ref
    WHERE b.book_ref = '0824C5'
);
This is just the query from the non-distributed database that we tried to execute as is. Its execution is equally poor for both sharding keys. The query plan looks like this:
Hash Join (actual rows=12 loops=1)
  Hash Cond: (tf.ticket_no = t.ticket_no)
  ->  Append (actual rows=2360335 loops=1)
        ->  Async Foreign Scan on ticket_flights_0_fdw tf_1 (actual rows=589983 loops=1)
        ->  Async Foreign Scan on ticket_flights_1_fdw tf_2 (actual rows=590175 loops=1)
        ->  Seq Scan on ticket_flights_2 tf_3 (actual rows=590174 loops=1)
        ->  Async Foreign Scan on ticket_flights_3_fdw tf_4 (actual rows=590003 loops=1)
  ->  Hash (actual rows=2 loops=1)
        Buckets: 1024  Batches: 1  Memory Usage: 9kB
        ->  Hash Semi Join (actual rows=2 loops=1)
              Hash Cond: (t.ticket_no = t_5.ticket_no)
              ->  Append (actual rows=829071 loops=1)
                    ->  Async Foreign Scan on tickets_0_fdw t_1 (actual rows=207273 loops=1)
                    ->  Async Foreign Scan on tickets_1_fdw t_2 (actual rows=207058 loops=1)
                    ->  Seq Scan on tickets_2 t_3 (actual rows=207431 loops=1)
                    ->  Async Foreign Scan on tickets_3_fdw t_4 (actual rows=207309 loops=1)
              ->  Hash (actual rows=2 loops=1)
                    Buckets: 1024  Batches: 1  Memory Usage: 9kB
                    ->  Nested Loop (actual rows=2 loops=1)
                          ->  Index Only Scan using tickets_2_pkey on tickets_2 t_5
                          ->  Materialize (actual rows=1 loops=2)
                                ->  Index Only Scan using bookings_2_pkey on bookings_2 b
This plan shows that Shardman coped with the WHERE subquery, but then had to request all the rows of the tickets and ticket_flights tables and process them on the query source node. This is really poor performance. Let's try the other variants.
For the ticket_no sharding key, the query is:
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no IN ('0005435126781', '0005435126782');
and the plan is:
Append (actual rows=12 loops=1)
  Network: FDW bytes sent=1098 received=1656
  ->  Async Foreign Scan (actual rows=6 loops=1)
        Relations: (tickets_0_fdw t_1) INNER JOIN (ticket_flights_0_fdw tf_1)
        Network: FDW bytes sent=549 received=1656
  ->  Async Foreign Scan (actual rows=6 loops=1)
        Relations: (tickets_1_fdw t_2) INNER JOIN (ticket_flights_1_fdw tf_2)
        Network: FDW bytes sent=549
Everything is pretty good here: the query was executed on two shards out of four, and only an Append of the received results had to be done on the query source node.
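The WHERE ... OR variant mentioned at the beginning of this subsection is equivalent in terms of the result; here is a hedged sketch (the predicate is the same pair of ticket numbers, so the planner should still be able to prune partitions, although the exact plan may differ from the IN variant):

-- Hedged sketch of the WHERE ... OR variant; logically equivalent to the
-- IN list above.
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no = '0005435126781'
   OR t.ticket_no = '0005435126782';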
Let's recall that book_ref is contained in both the tickets and ticket_flights tables. So for the book_ref sharding key, the query is:
SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no AND tf.book_ref = t.book_ref
WHERE t.book_ref = '0824C5';
and the plan is:
Foreign Scan (actual rows=12 loops=1)
  Relations: (tickets_2_fdw t) INNER JOIN (ticket_flights_2_fdw tf)
  Network: FDW bytes sent=547 received=1717
This is an excellent result — the query was modified to execute well in the distributed schema.
3.3.5. q5 Query

This is a small analytical query, which returns the names and ticket numbers of the passengers who got registered first.
SELECT t.passenger_name, t.ticket_no
FROM bookings.tickets t
JOIN bookings.boarding_passes bp ON bp.ticket_no = t.ticket_no
GROUP BY t.passenger_name, t.ticket_no
HAVING max(bp.boarding_no) = 1 AND count(*) > 1;
This query is executed pretty slowly for both sharding keys. Below is the plan for book_ref:
HashAggregate (actual rows=424 loops=1)
  Group Key: t.ticket_no
  Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
  Batches: 85  Memory Usage: 4265kB  Disk Usage: 112008kB
  Rows Removed by Filter: 700748
  Network: FDW bytes sent=1215 received=77111136
  ->  Append (actual rows=1894295 loops=1)
        Network: FDW bytes sent=1215 received=77111136
        ->  Async Foreign Scan (actual rows=473327 loops=1)
              Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
              Network: FDW bytes sent=404 received=813128
        ->  Async Foreign Scan (actual rows=472632 loops=1)
              Relations: (tickets_1_fdw t_2) INNER JOIN (boarding_passes_1_fdw bp_2)
              Network: FDW bytes sent=404
        ->  Async Foreign Scan (actual rows=475755 loops=1)
              Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
              Network: FDW bytes sent=407
        ->  Hash Join (actual rows=472581 loops=1)
              Hash Cond: (bp_4.ticket_no = t_4.ticket_no)
              Network: FDW bytes received=28841344
              ->  Seq Scan on boarding_passes_3 bp_4 (actual rows=472581 loops=1)
              ->  Hash (actual rows=207118 loops=1)
                    Buckets: 65536  Batches: 4  Memory Usage: 3654kB
                    Network: FDW bytes received=9176680
                    ->  Seq Scan on tickets_3 t_4 (actual rows=207118 loops=1)
                          Network: FDW bytes received=9176680
Note the pretty large amount of network data transfer between shards. Let's improve the query by adding book_ref as one more condition for joining the tables:
SELECT t.passenger_name, t.ticket_no
FROM bookings.tickets t
JOIN bookings.boarding_passes bp ON bp.ticket_no = t.ticket_no
                                AND bp.book_ref = t.book_ref  -- <= added book_ref
GROUP BY t.passenger_name, t.ticket_no
HAVING max(bp.boarding_no) = 1 AND count(*) > 1;
Let's look at the query plan:
GroupAggregate (actual rows=424 loops=1)
  Group Key: t.passenger_name, t.ticket_no
  Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
  Rows Removed by Filter: 700748
  Network: FDW bytes sent=1424 received=77092816
  ->  Merge Append (actual rows=1894295 loops=1)
        Sort Key: t.passenger_name, t.ticket_no
        Network: FDW bytes sent=1424 received=77092816
        ->  Foreign Scan (actual rows=472757 loops=1)
              Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
              Network: FDW bytes sent=472 received=2884064
        ->  Sort (actual rows=472843 loops=1)
              Sort Key: t_2.passenger_name, t_2.ticket_no
              Sort Method: external merge  Disk: 21152kB
              Network: FDW bytes received=22753536
              ->  Hash Join (actual rows=472843 loops=1)
                    Hash Cond: ((bp_2.ticket_no = t_2.ticket_no) AND (bp_2.book_ref = t_2.book_ref))
                    Network: FDW bytes received=22753536
                    ->  Seq Scan on boarding_passes_1 bp_2 (actual rows=472843 loops=1)
                    ->  Hash (actual rows=207058 loops=1)
                          Buckets: 65536  Batches: 8  Memory Usage: 2264kB
                          Network: FDW bytes received=22753536
                          ->  Seq Scan on tickets_1 t_2 (actual rows=207058 loops=1)
                                Network: FDW bytes received=22753536
        ->  Foreign Scan (actual rows=474715 loops=1)
              Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
              Network: FDW bytes sent=476 received=2884120
        ->  Foreign Scan (actual rows=473980 loops=1)
              Relations: (tickets_3_fdw t_4) INNER JOIN (boarding_passes_3_fdw bp_4)
              Network: FDW bytes sent=476 received=25745384
The situation has improved considerably: the partial results were received on the query source node, and the final merging, filtering, and grouping of the data were then done there.
For the ticket_no sharding key, the source query plan looks like this:
HashAggregate (actual rows=424 loops=1)
  Group Key: t.ticket_no
  Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
  Batches: 85  Memory Usage: 4265kB  Disk Usage: 111824kB
  Rows Removed by Filter: 700748
  Network: FDW bytes sent=1188 received=77103620
  ->  Append (actual rows=1894295 loops=1)
        Network: FDW bytes sent=1188 received=77103620
        ->  Async Foreign Scan (actual rows=473327 loops=1)
              Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
              Network: FDW bytes sent=394
        ->  Hash Join (actual rows=472632 loops=1)
              Hash Cond: (bp_2.ticket_no = t_2.ticket_no)
              Network: FDW bytes received=77103620
              ->  Seq Scan on boarding_passes_1 bp_2 (actual rows=472632 loops=1)
              ->  Hash (actual rows=206712 loops=1)
                    Buckets: 65536  Batches: 4  Memory Usage: 3654kB
                    Network: FDW bytes received=23859576
                    ->  Seq Scan on tickets_1 t_2 (actual rows=206712 loops=1)
                          Network: FDW bytes received=23859576
        ->  Async Foreign Scan (actual rows=475755 loops=1)
              Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
              Network: FDW bytes sent=397
        ->  Async Foreign Scan (actual rows=472581 loops=1)
              Relations: (tickets_3_fdw t_4) INNER JOIN (boarding_passes_3_fdw bp_4)
              Network: FDW bytes sent=397
We can see that table joining is done on shards, while data filtering, grouping and aggregation are done on the query source node. The source query does not need to be modified in this case.
3.3.6. q6 Query

For each ticket booked a week ago from now, this query displays all the included flight segments, together with the connection time.
SELECT tf.ticket_no, f.departure_airport,
       f.arrival_airport, f.scheduled_arrival,
       lead(f.scheduled_departure) OVER w AS next_departure,
       lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
FROM bookings.bookings b
JOIN bookings.tickets t ON t.book_ref = b.book_ref
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
JOIN bookings.flights f ON tf.flight_id = f.flight_id
WHERE b.book_date = bookings.now()::date - INTERVAL '7 day'
WINDOW w AS (PARTITION BY tf.ticket_no ORDER BY f.scheduled_departure);
For this query, the type of the book_date column must be cast from timestamptz to date. When casting types, PostgreSQL casts the column data type to the data type specified in the filtering condition, not vice versa, so Shardman must first get all the data from the other shards, cast the type, and only then apply the filter. The query plan looks like this:
WindowAgg (actual rows=26 loops=1)
  Network: FDW bytes sent=1750 received=113339240
  ->  Sort (actual rows=26 loops=1)
        Sort Key: tf.ticket_no, f.scheduled_departure
        Sort Method: quicksort  Memory: 27kB
        Network: FDW bytes sent=1750 received=113339240
        ->  Append (actual rows=26 loops=1)
              Network: FDW bytes sent=1750 received=113339240
              ->  Hash Join (actual rows=10 loops=1)
                    Hash Cond: (t_1.book_ref = b.book_ref)
                    Network: FDW bytes sent=582 received=37717376
              ->  Hash Join (actual rows=6 loops=1)
                    Hash Cond: (t_2.book_ref = b.book_ref)
                    Network: FDW bytes sent=582 received=37700608
              ->  Hash Join (actual rows=2 loops=1)
                    Hash Cond: (t_3.book_ref = b.book_ref)
                    Network: FDW bytes sent=586 received=37921256
              ->  Nested Loop (actual rows=8 loops=1)
                    ->  Nested Loop (actual rows=8 loops=1)
                          ->  Hash Join (actual rows=2 loops=1)
                                Hash Cond: (t_4.book_ref = b.book_ref)
                                ->  Seq Scan on tickets_3 t_4 (actual rows=207118 loops=1)
                    ->  Index Scan using flights_pkey on flights f (actual rows=1 loops=8)
                          Index Cond: (flight_id = tf_4.flight_id)
Pay attention to the number of bytes received from other cluster shards and to the sequential scan of the tickets table. Let's try to rewrite the query to avoid the type cast.
The idea is pretty simple: the interval is computed at the application level rather than at the database level, and a value of the timestamptz type is passed to the query directly. Besides, creating an additional index can help:
CREATE INDEX IF NOT EXISTS bookings_date_idx ON bookings.bookings(book_date);
For the book_ref sharding key, the query looks like this:
SELECT tf.ticket_no, f.departure_airport,
       f.arrival_airport, f.scheduled_arrival,
       lead(f.scheduled_departure) OVER w AS next_departure,
       lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
FROM bookings.bookings b
JOIN bookings.tickets t ON t.book_ref = b.book_ref
JOIN bookings.ticket_flights tf ON tf.ticket_no = t.ticket_no
                               AND tf.book_ref = t.book_ref  -- <= added book_ref
JOIN bookings.flights f ON tf.flight_id = f.flight_id
WHERE b.book_date = '2016-10-06 14:00:00+00'
WINDOW w AS (PARTITION BY tf.ticket_no ORDER BY f.scheduled_departure);
This query has a different plan:
WindowAgg (actual rows=18 loops=1)
  Network: FDW bytes sent=2268 received=892
  ->  Sort (actual rows=18 loops=1)
        Sort Key: tf.ticket_no, f.scheduled_departure
        Sort Method: quicksort  Memory: 26kB
        Network: FDW bytes sent=2268 received=892
        ->  Append (actual rows=18 loops=1)
              Network: FDW bytes sent=2268 received=892
              ->  Nested Loop (actual rows=4 loops=1)
                    ->  Nested Loop (actual rows=4 loops=1)
                          ->  Nested Loop (actual rows=1 loops=1)
                                ->  Bitmap Heap Scan on bookings_0 b_1
                                      Heap Blocks: exact=1
                                      ->  Bitmap Index Scan on bookings_0_book_date_idx
                                ->  Index Only Scan using tickets_0_pkey on tickets_0 t_1
                                      Index Cond: (book_ref = b_1.book_ref)
                                      Heap Fetches: 0
                          ->  Index Only Scan using ticket_flights_0_pkey on ticket_flights_0 tf_1
                                Heap Fetches: 0
                    ->  Index Scan using flights_pkey on flights f (actual rows=1 loops=4)
                          Index Cond: (flight_id = tf_1.flight_id)
              ->  Async Foreign Scan (actual rows=14 loops=1)
                    Network: FDW bytes sent=754 received=892
              ->  Async Foreign Scan (actual rows=0 loops=1)
                    Network: FDW bytes sent=757  -- received=0!
              ->  Async Foreign Scan (actual rows=0 loops=1)
                    Network: FDW bytes sent=757  -- received=0!
This is much better. First, whole tables are no longer scanned; the plan uses index scans (including Index Only Scan) instead. Second, it is clear how much the network data transfer between nodes is reduced: received dropped from about 113 million bytes to 892 bytes.
3.3.7. q7 Query

Assume we need statistics showing how many passengers there are per booking. To find this out, let's first compute the number of passengers in each booking and then the number of bookings for each number of passengers.
SELECT tt.cnt, count(*)
FROM (
    SELECT count(*) cnt
    FROM bookings.tickets t
    GROUP BY t.book_ref
) tt
GROUP BY tt.cnt
ORDER BY tt.cnt;
This query processes all the data in the tickets and bookings tables, so intensive network data exchange between shards cannot be avoided. Also note that the value of the work_mem parameter must be pretty high to avoid the use of disk when joining tables. Let's therefore change the value of work_mem in the cluster:
shardmanctl set work_mem='256MB';
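As a quick sanity check (a hedged sketch: SHOW is standard PostgreSQL and reports the value seen by the current session, so reconnect after changing the cluster-wide setting):

-- A new session should report the updated value, e.g. 256MB.
SHOW work_mem;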
The query plan for the ticket_no sharding key is as follows:
GroupAggregate (actual rows=5 loops=1)
  Group Key: tt.cnt
  Network: FDW bytes sent=798 received=18338112
  ->  Sort (actual rows=593433 loops=1)
        Sort Key: tt.cnt
        Sort Method: quicksort  Memory: 57030kB
        Network: FDW bytes sent=798 received=18338112
        ->  Subquery Scan on tt (actual rows=593433 loops=1)
              Network: FDW bytes sent=798 received=18338112
              ->  Finalize HashAggregate (actual rows=593433 loops=1)
                    Group Key: t.book_ref
                    Batches: 1  Memory Usage: 81953kB
                    Network: FDW bytes sent=798 received=18338112
                    ->  Append (actual rows=763806 loops=1)
                          Network: FDW bytes sent=798 received=18338112
                          ->  Async Foreign Scan (actual rows=190886 loops=1)
                                Relations: Aggregate on (tickets_0_fdw t)
                                Network: FDW bytes sent=266 received=1558336
                          ->  Async Foreign Scan (actual rows=190501 loops=1)
                                Relations: Aggregate on (tickets_1_fdw t_1)
                                Network: FDW bytes sent=266
                          ->  Async Foreign Scan (actual rows=191589 loops=1)
                                Relations: Aggregate on (tickets_2_fdw t_2)
                                Network: FDW bytes sent=266
                          ->  Partial HashAggregate (actual rows=190830 loops=1)
                                Group Key: t_3.book_ref
                                Batches: 1  Memory Usage: 36881kB
                                Network: FDW bytes received=4981496
                                ->  Seq Scan on tickets_3 t_3 (actual rows=207118 loops=1)
                                      Network: FDW bytes received=4981496
The query plan for the book_ref sharding key is as follows:
Sort (actual rows=5 loops=1)
  Sort Key: (count(*))
  Sort Method: quicksort  Memory: 25kB
  Network: FDW bytes sent=798 received=14239951
  ->  HashAggregate (actual rows=5 loops=1)
        Group Key: (count(*))
        Batches: 1  Memory Usage: 40kB
        Network: FDW bytes sent=798 received=14239951
        ->  Append (actual rows=593433 loops=1)
              Network: FDW bytes sent=798 received=14239951
              ->  GroupAggregate (actual rows=148504 loops=1)
                    Group Key: t.book_ref
                    ->  Index Only Scan using tickets_0_book_ref_idx on tickets_0 t (rows=207273)
                          Heap Fetches: 0
              ->  Async Foreign Scan (actual rows=148256 loops=1)
                    Relations: Aggregate on (tickets_1_fdw t_1)
                    Network: FDW bytes sent=266 received=1917350
              ->  Async Foreign Scan (actual rows=148270 loops=1)
                    Relations: Aggregate on (tickets_2_fdw t_2)
                    Network: FDW bytes sent=266
              ->  Async Foreign Scan (actual rows=148403 loops=1)
                    Relations: Aggregate on (tickets_3_fdw t_3)
                    Network: FDW bytes sent=266
The query plans differ primarily in the order of joining the tables and in how the aggregates are computed.

For the ticket_no sharding key, all the partially aggregated data of the joined tables is received by the query source node (about 17 MB), and all the rest of the processing is performed there.

For the book_ref sharding key, since it is included in the query, most of the aggregate computation is performed on the shards, and only the result (about 13 MB) is returned to the query source node, where it is finalized.
3.3.8. q8 Query

This query answers the question: which combinations of first and last names occur most often in bookings, and what is the ratio of passengers with such names to the total number of passengers? A window function is used to get the result:
SELECT passenger_name,
       round(100.0 * cnt / sum(cnt) OVER (), 2) AS percent
FROM (
    SELECT passenger_name, count(*) cnt
    FROM bookings.tickets
    GROUP BY passenger_name
) t
ORDER BY percent DESC;
For both sharding keys, the query plan looks like this:
Sort (actual rows=27909 loops=1)
  Sort Key: (round(((100.0 * ((count(*)))::numeric) / sum((count(*))) OVER (?)), 2)) DESC
  Sort Method: quicksort  Memory: 3076kB
  Network: FDW bytes sent=816 received=2376448
  ->  WindowAgg (actual rows=27909 loops=1)
        Network: FDW bytes sent=816 received=2376448
        ->  Finalize HashAggregate (actual rows=27909 loops=1)
              Group Key: tickets.passenger_name
              Batches: 1  Memory Usage: 5649kB
              Network: FDW bytes sent=816 received=2376448
              ->  Append (actual rows=74104 loops=1)
                    Network: FDW bytes sent=816 received=2376448
                    ->  Partial HashAggregate (actual rows=18589 loops=1)
                          Group Key: tickets.passenger_name
                          Batches: 1  Memory Usage: 2833kB
                          ->  Seq Scan on tickets_0 tickets (actual rows=207273 loops=1)
                    ->  Async Foreign Scan (actual rows=18435 loops=1)
                          Relations: Aggregate on (tickets_1_fdw tickets_1)
                          Network: FDW bytes sent=272 received=2376448
                    ->  Async Foreign Scan (actual rows=18567 loops=1)
                          Relations: Aggregate on (tickets_2_fdw tickets_2)
                          Network: FDW bytes sent=272
                    ->  Async Foreign Scan (actual rows=18513 loops=1)
                          Relations: Aggregate on (tickets_3_fdw tickets_3)
                          Network: FDW bytes sent=272
The plan shows that the data preprocessing, table joins and partial aggregation are performed on shards, while the final processing is performed on the query source node.
3.3.9. q9 Query

This query answers the question: who traveled from Moscow (SVO) to Novosibirsk (OVB) in seat 1A the day before yesterday, and when was the ticket booked? Here "the day before yesterday" is computed from the bookings.now() function rather than from the current date. The query in the non-distributed schema is as follows:
SELECT t.passenger_name, b.book_date v
FROM bookings b
JOIN tickets t ON t.book_ref = b.book_ref
JOIN boarding_passes bp ON bp.ticket_no = t.ticket_no
JOIN flights f ON f.flight_id = bp.flight_id
WHERE f.departure_airport = 'SVO'
  AND f.arrival_airport = 'OVB'
  AND f.scheduled_departure::date = bookings.now()::date - INTERVAL '2 day'
  AND bp.seat_no = '1A';
As explained for the q6 query, INTERVAL causes a type cast. Let's get rid of it and rewrite the query for the book_ref sharding key as follows:
SELECT t.passenger_name, b.book_date v
FROM bookings b
JOIN tickets t ON t.book_ref = b.book_ref
JOIN boarding_passes bp ON bp.ticket_no = t.ticket_no
                       AND bp.book_ref = b.book_ref  -- <= added book_ref
JOIN flights f ON f.flight_id = bp.flight_id
WHERE f.departure_airport = 'SVO'
  AND f.arrival_airport = 'OVB'
  AND f.scheduled_departure BETWEEN '2016-10-11 14:00:00+00' AND '2016-10-13 14:00:00+00'
  AND bp.seat_no = '1A';
Let's also create a couple of additional indexes:
CREATE INDEX idx_boarding_passes_seats ON boarding_passes((seat_no::text));
CREATE INDEX idx_flights_sched_dep ON flights(departure_airport, arrival_airport, scheduled_departure);
As a result, the query plan appears pretty good:
Append (actual rows=1 loops=1)
  Network: FDW bytes sent=2484 received=102
  ->  Nested Loop (actual rows=1 loops=1)
        Join Filter: (bp_1.ticket_no = t_1.ticket_no)
        Rows Removed by Join Filter: 1
        ->  Nested Loop (actual rows=1 loops=1)
              ->  Hash Join (actual rows=1 loops=1)
                    Hash Cond: (bp_1.flight_id = f.flight_id)
                    ->  Bitmap Heap Scan on boarding_passes_0 bp_1 (actual rows=4919 loops=1)
                          Recheck Cond: ((seat_no)::text = '1A'::text)
                          Heap Blocks: exact=2632
                          ->  Bitmap Index Scan on boarding_passes_0_seat_no_idx (actual rows=4919)
                                Index Cond: ((seat_no)::text = '1A'::text)
                    ->  Hash (actual rows=2 loops=1)
                          Buckets: 1024  Batches: 1  Memory Usage: 9kB
                          ->  Bitmap Heap Scan on flights f (actual rows=2 loops=1)
                                Recheck Cond: ((departure_airport = 'SVO'::bpchar) AND (arrival_airport = 'OVB'::bpchar) AND (scheduled_departure >= '2016-10-11 14:00:00+00'::timestamp with time zone) AND (scheduled_departure < '2016-10-13 14:00:00+00'::timestamp with time zone))
                                Heap Blocks: exact=2
                                ->  Bitmap Index Scan on idx_flights_sched_dep (actual rows=2 loops=1)
                                      Index Cond: ((departure_airport = 'SVO'::bpchar) AND (arrival_airport = 'OVB'::bpchar) AND (scheduled_departure >= '2016-10-11 14:00:00+00'::timestamp with time zone) AND (scheduled_departure <= '2016-10-13 14:00:00+00'::timestamp with time zone))
              ->  Index Scan using bookings_0_pkey on bookings_0 b_1 (actual rows=1 loops=1)
                    Index Cond: (book_ref = bp_1.book_ref)
        ->  Index Scan using tickets_0_book_ref_idx on tickets_0 t_1 (actual rows=2 loops=1)
              Index Cond: (book_ref = b_1.book_ref)
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (((boarding_passes_1_fdw bp_2) INNER JOIN (flights f)) INNER JOIN (tickets_1_fdw t_2)) INNER JOIN (bookings_1_fdw b_2)
        Network: FDW bytes sent=826 received=68
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (((boarding_passes_2_fdw bp_3) INNER JOIN (flights f)) INNER JOIN (tickets_2_fdw t_3)) INNER JOIN (bookings_2_fdw b_3)
        Network: FDW bytes sent=829 received=34
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (((boarding_passes_3_fdw bp_4) INNER JOIN (flights f)) INNER JOIN (tickets_3_fdw t_4)) INNER JOIN (bookings_3_fdw b_4)
        Network: FDW bytes sent=829
It is clear from this plan that all the table joins were performed on the shards, and the foreign scans returned no rows to the query source node, because the data was located on the single shard where the query was executed. If this query were executed on a different shard, the plan would be the same, but the data for the final processing would be received from the shard that holds the data.