17.3. Migration #

17.3.1. Migrating to Postgres Pro Shardman #

Different major versions of Postgres Pro Shardman, as well as different PostgreSQL-based products built on the same major version, can have binary-incompatible databases, so you cannot simply replace the server binary and continue running.

For upgrade instructions specific to a particular release, see the Release Notes for the corresponding Postgres Pro Shardman version.

17.3.2. Migration of a Postgres Pro Shardman Database Schema #

Let's use the Airlines demo database as an example. A detailed description of the database schema is available at https://postgrespro.ru/education/demodb. This schema is used in Postgres Professional training courses, for example, in QPT. Query Optimization.

The schema authors characterized it like this: We tried to make the database schema as simple as possible, without overloading it with unnecessary details, but not too simple to allow writing interesting and meaningful queries.

The database schema contains several tables with meaningful contents. This section uses the demo database version of 13.10.2016. A link to the database and schema dump (in Russian) is available at https://postgrespro.ru/education/courses/QPT. In addition to the query examples provided below, you can find more examples in the above course and in the Postgres. The First Experience book.

This section shows two examples of schema modification and query adaptation:

  • Naive approach. It is simple, involves minimal transformations of the schema, and aims to clarify how queries work in a distributed schema.

  • Complex approach. It is more involved and is provided for a better understanding of the problems and processes that a developer may confront when migrating to a distributed schema and adapting applications to it.

17.3.2.1. Database Source Schema #

Figure 17.2. Database Source Schema


The authors describe the Airlines database as follows:

The main entity is a booking (bookings).

One booking can include several passengers, with a separate ticket (tickets) issued to each passenger. A ticket has a unique number and includes information about the passenger. As such, the passenger is not a separate entity. Both the passenger's name and identity document number can change over time, so it is impossible to uniquely identify all the tickets of a particular person; for simplicity, we can assume that all passengers are unique.

The ticket includes one or more flight segments (ticket_flights). Several flight segments can be included into a single ticket if there are no non-stop flights between the points of departure and destination (connecting flights), or if it is a round-trip ticket. Although there is no constraint in the schema, it is assumed that all tickets in the booking have the same flight segments.

Each flight (flights) goes from one airport (airports) to another. Flights with the same flight number have the same points of departure and destination, but differ in departure date.

At flight check-in, the passenger is issued a boarding pass (boarding_passes), where the seat number is specified. The passenger can check in for the flight only if this flight is included into the ticket. The flight-seat combination must be unique to avoid issuing two boarding passes for the same seat.

The number of seats (seats) in the aircraft and their distribution between different travel classes depend on the model of the aircraft (aircrafts) performing the flight. It is assumed that every aircraft model has only one cabin configuration. The database schema does not check that seat numbers in boarding passes correspond to existing seats in the aircraft (such verification can be done using table triggers or at the application level).

Let's look at the common entities and table sizes in the above schema. The ticket_flights, boarding_passes and tickets tables are linked by the ticket_no field, and together they hold about 95% of the total database size.
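
You can verify this with a standard catalog query run against the non-distributed demo database, for example:

    SELECT c.oid::regclass AS table_name,
           pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size
      FROM pg_class c
      JOIN pg_namespace n ON n.oid = c.relnamespace
     WHERE n.nspname = 'bookings'
       AND c.relkind = 'r'
     ORDER BY pg_total_relation_size(c.oid) DESC;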

Let's look at the bookings table. Although it seems to have a pretty compact structure, it can reach a considerable size over time.

Migration examples are provided for a Postgres Pro Shardman cluster that contains four shards. Sharded tables are divided into four partitions, so each partition of a sharded table is located on exactly one shard. This is done on purpose, to keep the query plans in the examples compact. In real life, the number of partitions should be chosen based on the expected maximum number of cluster nodes.

When migrating a real-life DB schema, you should think over in advance the number of partitions to partition data in distributed tables. Also bear in mind that the best migration approach is to use SQL transformations that impose minimal limitations on database objects.

17.3.2.2. Postgres Pro Shardman Cluster Configuration #

The Postgres Pro Shardman cluster consists of four nodes — node1, node2, node3 and node4. Each cluster node is a shard.

The examples assume that the tables are divided into four partitions by the sharding key (num_parts = 4) and distributed across cluster nodes. Each table part with the data is located in the corresponding shard:

  • shard-1 is located on the cluster node node1

  • shard-2 is located on the cluster node node2

  • shard-3 is located on the cluster node node3

  • shard-4 is located on the cluster node node4

The cluster is intentionally presented in a simplified configuration. Cluster nodes have no replicas, and the configuration is not fault-tolerant.

17.3.2.3. Selecting the Sharding Key #

17.3.2.3.1. Naive Approach — ticket_no Sharding Key #

With this approach, the choice of the sharding key is pretty evident. It is the ticket number ticket_no. The ticket number is the primary key of the tickets table, and it is a foreign key of the ticket_flights and boarding_passes tables.

The primary key of the ticket_flights and boarding_passes tables is composite. It is a unique index composed of ticket_no and flight_id.

So if ticket_no is chosen to be a sharding key, the data of the three tables is distributed across cluster shards and partitions that contain linked data are located in the same shards.

The rest of the tables — airports, flights, aircrafts and seats — are small and rarely change. This allows making them global tables, that is, dictionary tables.

Figure 17.3. Naive Approach Schema


The main advantage of this approach from the point of view of creating the schema and queries to the DB is that no changes are needed except those that are inherent to working with distributed systems, that is, explicitly declaring tables, sequences etc. as distributed when creating them.

Once the sharding key is selected, we can proceed to creation of the distributed schema.

17.3.2.3.1.1. Creating the Schema Distributed by ticket_no #

First, turn on broadcasting DDL statements to all cluster shards:

    SET shardman.broadcast_ddl TO on;
    

Let's create the bookings schema on all shards:

    CREATE SCHEMA bookings;
    

As tables in the schema are linked with one another by a foreign key, the order of creating them, as well as auxiliary objects, matters.

The demo database contains snapshots of data, similar to a backup copy of a real system captured at some point in time. For example, if a flight has the Departed status, it means that the aircraft had already departed and was airborne at the time of the backup copy. The snapshot time is saved in the bookings.now() function. You can use this function in demo queries for cases where you would use the now() function in a real database. In addition, the return value of this function determines the version of the demo database. The latest version available is of 13.10.2016:

    SELECT bookings.now() as now;
              now
    -----------------------
    2016-10-13 17:00:00+03
    

In relation to this moment, all flights are classified as past and future flights.

Let's create the utility function bookings.now():

    CREATE FUNCTION bookings.now() RETURNS timestamp with time zone
    LANGUAGE sql IMMUTABLE COST 0.00999999978
    AS
    $sql$
    SELECT $qq$2016-10-13 17:00:00$qq$::TIMESTAMP AT TIME ZONE $zz$Europe/Moscow$zz$;
    $sql$;
    

In addition to tables, a global sequence is needed for generating IDs for data insertion in the flights table. In this example, we create the sequence explicitly and link it with a column of this table by assigning the generated values by default.

Let's create the sequence using the following DDL statement:

    CREATE SEQUENCE bookings.flights_flight_id_seq
        INCREMENT BY 1
        NO MINVALUE
        NO MAXVALUE
        CACHE 1 with(global);
    

with(global) creates a single distributed sequence available on all cluster nodes, which assigns values in a certain range for each shard, and the ranges for different shards do not intersect. See CREATE SEQUENCE and SQL Commands for more details of global sequences.

Under the hood, a global sequence is backed by a regular sequence on each shard, to which values are allocated in consecutive blocks (of 65536 numbers by default). When all the numbers in a block are exhausted, the next block is allocated to the local sequence of that shard. As a result, numbers from a global sequence are unique, but they are not strictly monotonic, and there may be "holes" in the generated values.
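
For illustration only (the exact values are not guaranteed and depend on which blocks have already been handed out), two sessions on different shards might see something like this:

    -- on shard-1
    SELECT nextval('bookings.flights_flight_id_seq');  -- e.g. 1
    -- on shard-2
    SELECT nextval('bookings.flights_flight_id_seq');  -- e.g. 65537, the start of the next block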

Global sequences can also be created implicitly by declaring a column of the bigserial, smallserial, or serial type. They can be used with both sharded and global tables.

You should not create local sequences in each shard as their values may be duplicated.

Now, we create the global tables. As explained above, they are small and their data changes rarely, so they are effectively dictionary tables that must contain the same data on all cluster shards. Each global table is required to have a primary key.

Let's create global tables using the following DDL statements:

    CREATE TABLE bookings.aircrafts (
        aircraft_code character(3) NOT NULL primary key,
        model text NOT NULL,
        range integer NOT NULL,
        CONSTRAINT aircrafts_range_check CHECK ((range > 0))
    ) with (global);

    CREATE TABLE bookings.seats (
        aircraft_code character(3) references bookings.aircrafts(aircraft_code),
        seat_no character varying(4) NOT NULL,
        fare_conditions character varying(10) NOT NULL,
        CONSTRAINT seats_fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text]))),
        PRIMARY KEY (aircraft_code, seat_no)
    ) with (global);

    CREATE TABLE bookings.airports (
        airport_code character(3) NOT NULL primary key,
        airport_name text NOT NULL,
        city text NOT NULL,
        longitude double precision NOT NULL,
        latitude double precision NOT NULL,
        timezone text NOT NULL
    )  with (global);

    CREATE TABLE bookings.bookings (
        book_ref character(6) NOT NULL,
        book_date timestamp with time zone NOT NULL,
        total_amount numeric(10,2) NOT NULL,
        PRIMARY KEY (book_ref)
    ) with (global);

    CREATE TABLE bookings.flights (
        flight_id bigint NOT NULL PRIMARY KEY, -- <= a sequence will be assigned
        flight_no character(6) NOT NULL,
        scheduled_departure timestamp with time zone NOT NULL,
        scheduled_arrival timestamp with time zone NOT NULL,
        departure_airport character(3) REFERENCES bookings.airports(airport_code),
        arrival_airport character(3) REFERENCES bookings.airports(airport_code),
        status character varying(20) NOT NULL,
        aircraft_code character(3) references bookings.aircrafts(aircraft_code),
        actual_departure timestamp with time zone,
        actual_arrival timestamp with time zone,
        CONSTRAINT flights_check CHECK ((scheduled_arrival > scheduled_departure)),
        CONSTRAINT flights_check1 CHECK (((actual_arrival IS NULL) OR ((actual_departure IS NOT NULL) AND (actual_arrival IS NOT NULL) AND (actual_arrival > actual_departure)))),
        CONSTRAINT flights_status_check CHECK (((status)::text = ANY (ARRAY[('On Time'::character varying)::text, ('Delayed'::character varying)::text, ('Departed'::character varying)::text, ('Arrived'::character varying)::text, ('Scheduled'::character varying)::text, ('Cancelled'::character varying)::text])))
    ) with (global);

    -- associate the sequence with table column
    ALTER SEQUENCE bookings.flights_flight_id_seq OWNED BY bookings.flights.flight_id;


    -- assign the default value to the column
    ALTER TABLE bookings.flights ALTER COLUMN flight_id SET DEFAULT nextval('bookings.flights_flight_id_seq');

    ALTER TABLE bookings.flights ADD CONSTRAINT flights_flight_no_scheduled_departure_key UNIQUE (flight_no, scheduled_departure);
    

Next, we create sharded tables tickets, ticket_flights and boarding_passes in the bookings schema:

    CREATE TABLE bookings.tickets (
        ticket_no character(13) PRIMARY KEY,
        book_ref character(6) REFERENCES bookings.bookings(book_ref),
        passenger_id character varying(20) NOT NULL,
        passenger_name text NOT NULL,
        contact_data jsonb
    ) with (distributed_by='ticket_no', num_parts=4);

    CREATE TABLE bookings.ticket_flights (
        ticket_no character(13) NOT NULL,
        flight_id bigint references bookings.flights(flight_id),
        fare_conditions character varying(10) NOT NULL,
        amount numeric(10,2) NOT NULL,
        CONSTRAINT ticket_flights_amount_check CHECK ((amount >= (0)::numeric)),
        CONSTRAINT ticket_flights_fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text]))),
        PRIMARY KEY (ticket_no, flight_id)
    ) with (distributed_by='ticket_no', colocate_with='bookings.tickets');

    CREATE TABLE bookings.boarding_passes (
        ticket_no character(13) NOT NULL,
        flight_id bigint NOT NULL,
        boarding_no integer NOT NULL,
        seat_no character varying(4) NOT NULL,
        FOREIGN KEY (ticket_no, flight_id) REFERENCES bookings.ticket_flights(ticket_no, flight_id),
        PRIMARY KEY (ticket_no, flight_id)
    ) with (distributed_by='ticket_no', colocate_with='bookings.tickets');

    -- constraints must contain the sharding key
    ALTER TABLE bookings.boarding_passes ADD CONSTRAINT boarding_passes_flight_id_boarding_no_key UNIQUE (ticket_no, flight_id, boarding_no);

    ALTER TABLE bookings.boarding_passes ADD CONSTRAINT boarding_passes_flight_id_seat_no_key UNIQUE (ticket_no, flight_id, seat_no);
    

When creating sharded tables, you can also specify the num_parts parameter, which defines the number of partitions of a sharded table. In this example, it equals 4 to keep query plans short. The default value is 20. This parameter is important if you are going to add shards to the cluster and scale horizontally in the future.

Based on the assumed future load and data size, num_parts should be sufficient for data rebalancing when new shards are added (num_parts must be greater than or equal to the number of cluster nodes). On the other hand, too many partitions cause a considerable increase of the query planning time. Therefore, an optimal balance should be achieved between the number of partitions and number of cluster nodes.
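
To see how the partitions of a sharded table are laid out on the current node (which ones are stored locally and which are references to other shards), you can use a standard catalog query; partition naming may vary slightly between versions:

    SELECT c.oid::regclass AS partition,
           c.relkind AS kind   -- 'r' = local partition, 'f' = foreign (remote) partition
      FROM pg_inherits i
      JOIN pg_class c ON c.oid = i.inhrelid
     WHERE i.inhparent = 'bookings.tickets'::regclass
     ORDER BY 1;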

The last thing to do is to create a view that is needed to execute some queries:

    CREATE VIEW bookings.flights_v AS
    SELECT f.flight_id,
        f.flight_no,
        f.scheduled_departure,
        timezone(dep.timezone, f.scheduled_departure) AS scheduled_departure_local,
        f.scheduled_arrival,
        timezone(arr.timezone, f.scheduled_arrival) AS scheduled_arrival_local,
        (f.scheduled_arrival - f.scheduled_departure) AS scheduled_duration,
        f.departure_airport,
        dep.airport_name AS departure_airport_name,
        dep.city AS departure_city,
        f.arrival_airport,
        arr.airport_name AS arrival_airport_name,
        arr.city AS arrival_city,
        f.status,
        f.aircraft_code,
        f.actual_departure,
        timezone(dep.timezone, f.actual_departure) AS actual_departure_local,
        f.actual_arrival,
        timezone(arr.timezone, f.actual_arrival) AS actual_arrival_local,
        (f.actual_arrival - f.actual_departure) AS actual_duration
      FROM bookings.flights f,
        bookings.airports dep,
        bookings.airports arr
      WHERE ((f.departure_airport = dep.airport_code) AND (f.arrival_airport = arr.airport_code));
    

Now creation of the distributed schema is complete. Let's turn off broadcasting of DDL statements:

    SET shardman.broadcast_ddl TO off;
    

17.3.2.3.2. Complex Approach — book_ref Sharding Key #

A more complex approach to the sharding key choice involves the source schema modification, inclusion of new parameters in queries and other important changes.

What if an airline has been in the market for over 10 years and the bookings table has grown so large that keeping it global is no longer feasible? At the same time, it cannot be distributed by ticket_no as in the naive approach, because it contains no field shared with the other large tables that the data could be distributed by.

When modifying the source schema, another field can be appropriate for use as a sharding key.

Looking at the bookings table, we can see that the values of the book_ref field are unique, and this field is the primary key. book_ref is also present in the tickets table as a foreign key. So this field looks suitable as the sharding key. However, book_ref is missing from the ticket_flights and boarding_passes tables.

If we add book_ref to the ticket_flights and boarding_passes tables, distributing of all the tables bookings, tickets, ticket_flights and boarding_passes with the book_ref sharding key becomes possible.

To do this, book_ref must be added to the ticket_flights and boarding_passes tables in the source schema and populated with the corresponding values taken from the tickets table.

Figure 17.4. Source Schema Modification


17.3.2.3.2.1. Modifying the Source Schema #

To properly transfer data from the source schema to the distributed one, the schema should be modified as follows:

  1. Add the book_ref field to the ticket_flights and boarding_passes tables:

        ALTER TABLE ticket_flights
          ADD COLUMN book_ref char(6);
    
        ALTER TABLE boarding_passes
          ADD COLUMN book_ref char(6);
        

  2. In these tables, fill the added book_ref field with data:

        WITH batch AS (SELECT book_ref,
                              ticket_no
                        FROM tickets)
        UPDATE ticket_flights
          SET book_ref = batch.book_ref
          FROM batch
        WHERE ticket_flights.ticket_no = batch.ticket_no
          AND ticket_flights.book_ref IS NULL;
    
    
        WITH batch AS (SELECT book_ref,
                              ticket_no
                        FROM tickets)
        UPDATE boarding_passes
          SET book_ref = batch.book_ref
          FROM batch
        WHERE boarding_passes.ticket_no = batch.ticket_no
          AND boarding_passes.book_ref IS NULL;
        

    Avoid using this example in a loaded production system, as these statements lock all rows of the updated tables. In production systems, data should be transferred incrementally, in batches (see the sketch below).
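
    The following is a rough sketch of such an incremental backfill for the ticket_flights table (the batch size of 10000 is arbitrary): repeat the statement until it updates 0 rows, then do the same for boarding_passes.

        WITH batch AS (SELECT t.ticket_no,
                              t.book_ref
                         FROM tickets t
                         JOIN ticket_flights tf
                           ON tf.ticket_no = t.ticket_no
                        WHERE tf.book_ref IS NULL
                        LIMIT 10000)
        UPDATE ticket_flights
           SET book_ref = batch.book_ref
          FROM batch
         WHERE ticket_flights.ticket_no = batch.ticket_no;
        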

Now the database schema is ready for data transferring.

17.3.2.3.2.2. Creating a Schema Distributed by book_ref #

Here the Postgres Pro Shardman shardman.broadcast_all_sql function is used to broadcast DDL statements on all cluster nodes. Let's create the bookings schema on all shards:

    SELECT shardman.broadcast_all_sql('CREATE SCHEMA bookings');
    

As tables in the schema are linked with foreign keys, the order of creating tables matters.

First we create a utility function bookings.now():

    SELECT shardman.broadcast_all_sql(
        $sql$
        CREATE FUNCTION bookings.now() RETURNS timestamp with time zone
        LANGUAGE sql IMMUTABLE COST 0.00999999978
        AS 
        $q$
        SELECT $qq$2016-10-13 17:00:00$qq$::TIMESTAMP 
                        AT TIME ZONE $zz$Europe/Moscow$zz$;
        $q$;
        $sql$
    );
    

Tables, users, and sequences are created with regular SQL; the shardman.broadcast_all_sql function is not needed for that.

In this example, the global sequence is not created explicitly: for a column of the bigserial type, Postgres Pro Shardman creates a global sequence automatically.

Now let's create global tables using the following DDL statements:

    CREATE TABLE bookings.aircrafts (
        aircraft_code character(3) NOT NULL PRIMARY KEY,
        model text NOT NULL,
        range integer NOT NULL,
        CONSTRAINT aircrafts_range_check CHECK ((range > 0))
    ) WITH (global);

    CREATE TABLE bookings.seats (
        aircraft_code character(3) REFERENCES bookings.aircrafts(aircraft_code),
        seat_no character varying(4) NOT NULL,
        fare_conditions character varying(10) NOT NULL,
        CONSTRAINT seats_fare_conditions_check CHECK ((
              (fare_conditions)::text = ANY (ARRAY[
                ('Economy'::character varying)::text,
                ('Comfort'::character varying)::text,
                ('Business'::character varying)::text])
              )),
        PRIMARY KEY (aircraft_code, seat_no)
    ) WITH (global);

    CREATE TABLE bookings.airports (
        airport_code character(3) NOT NULL PRIMARY KEY,
        airport_name text NOT NULL,
        city text NOT NULL,
        longitude double precision NOT NULL,
        latitude double precision NOT NULL,
        timezone text NOT NULL
    )  WITH (global);

    CREATE TABLE bookings.flights (
    -- the global sequence will be created automatically
    -- the default value will be assigned
        flight_id bigserial NOT NULL PRIMARY KEY,
        flight_no character(6) NOT NULL,
        scheduled_departure timestamp with time zone NOT NULL,
        scheduled_arrival timestamp with time zone NOT NULL,
        departure_airport character(3) REFERENCES bookings.airports(airport_code),
        arrival_airport character(3) REFERENCES bookings.airports(airport_code),
        status character varying(20) NOT NULL,
        aircraft_code character(3) REFERENCES bookings.aircrafts(aircraft_code),
        actual_departure timestamp with time zone,
        actual_arrival timestamp with time zone,
        CONSTRAINT flights_check CHECK ((scheduled_arrival > scheduled_departure)),
        CONSTRAINT flights_check1 CHECK ((
                      (actual_arrival IS NULL) 
                  OR ((actual_departure IS NOT NULL) 
                  AND (actual_arrival IS NOT NULL) 
                  AND (actual_arrival > actual_departure)))),
        CONSTRAINT flights_status_check CHECK (
              ((status)::text = ANY (
          ARRAY[('On Time'::character varying)::text,
                ('Delayed'::character varying)::text,
                ('Departed'::character varying)::text,
                ('Arrived'::character varying)::text,
                ('Scheduled'::character varying)::text,
                ('Cancelled'::character varying)::text])))
    ) WITH (global);

    ALTER TABLE bookings.flights 
      ADD CONSTRAINT flights_flight_no_scheduled_departure_key 
      UNIQUE (flight_no, scheduled_departure);
    

Now let's create sharded tables bookings, tickets, ticket_flights and boarding_passes in the bookings schema, as in the previous example:

    -- no modifications to these tables are done except distributing them
    CREATE TABLE bookings.bookings (
      book_ref character(6) NOT NULL PRIMARY KEY,
      book_date timestamp with time zone NOT NULL,
      total_amount numeric(10,2) NOT NULL
    ) WITH (distributed_by='book_ref', num_parts=4);


    CREATE TABLE bookings.tickets (
      ticket_no character(13) NOT NULL,
      book_ref character(6) REFERENCES bookings.bookings(book_ref),
      passenger_id character varying(20) NOT NULL,
      passenger_name text NOT NULL,
      contact_data jsonb,
      PRIMARY KEY (book_ref, ticket_no)
    ) WITH (distributed_by='book_ref', colocate_with='bookings.bookings');

    -- adding the book_ref foreign key to these tables
    CREATE TABLE bookings.ticket_flights (
      ticket_no character(13) NOT NULL,
      flight_id bigint NOT NULL,
      fare_conditions character varying(10) NOT NULL,
      amount numeric(10,2) NOT NULL,
      book_ref character(6) NOT NULL, -- <= added book_ref
      CONSTRAINT ticket_flights_amount_check 
            CHECK ((amount >= (0)::numeric)),
      CONSTRAINT ticket_flights_fare_conditions_check 
            CHECK (((fare_conditions)::text = ANY (
            ARRAY[('Economy'::character varying)::text,
                  ('Comfort'::character varying)::text,
                  ('Business'::character varying)::text]))),
      FOREIGN KEY (book_ref, ticket_no) 
            REFERENCES bookings.tickets(book_ref, ticket_no),
      PRIMARY KEY (book_ref, ticket_no, flight_id) -- <= changed the primary key
    ) with (distributed_by='book_ref', colocate_with='bookings.bookings');


    CREATE TABLE bookings.boarding_passes (
        ticket_no character(13) NOT NULL,
        flight_id bigint NOT NULL,
        boarding_no integer NOT NULL,
        seat_no character varying(4) NOT NULL,
        book_ref character(6) NOT NULL, -- <= added book_ref
        FOREIGN KEY (book_ref, ticket_no, flight_id)
        REFERENCES bookings.ticket_flights(book_ref, ticket_no, flight_id),
        PRIMARY KEY (book_ref, ticket_no, flight_id)
        ) WITH (distributed_by='book_ref', colocate_with='bookings.bookings');
    -- constraints must contain the sharding key
    ALTER TABLE bookings.boarding_passes 
      ADD CONSTRAINT boarding_passes_flight_id_boarding_no_key 
      UNIQUE (book_ref, ticket_no, flight_id, boarding_no);

    ALTER TABLE bookings.boarding_passes 
      ADD CONSTRAINT boarding_passes_flight_id_seat_no_key 
      UNIQUE (book_ref, ticket_no, flight_id, seat_no);
    

Let's create the bookings.flights view:

    SELECT shardman.broadcast_all_sql($$
    CREATE VIEW bookings.flights_v AS
    SELECT f.flight_id,
          f.flight_no,
          f.scheduled_departure,
          timezone(dep.timezone, f.scheduled_departure) AS scheduled_departure_local,
          f.scheduled_arrival,
          timezone(arr.timezone, f.scheduled_arrival)   AS scheduled_arrival_local,
          (f.scheduled_arrival - f.scheduled_departure) AS scheduled_duration,
          f.departure_airport,
          dep.airport_name                              AS departure_airport_name,
          dep.city                                      AS departure_city,
          f.arrival_airport,
          arr.airport_name                              AS arrival_airport_name,
          arr.city                                      AS arrival_city,
          f.status,
          f.aircraft_code,
          f.actual_departure,
          timezone(dep.timezone, f.actual_departure)    AS actual_departure_local,
          f.actual_arrival,
          timezone(arr.timezone, f.actual_arrival)      AS actual_arrival_local,
          (f.actual_arrival - f.actual_departure)       AS actual_duration
    FROM bookings.flights f,
        bookings.airports dep,
        bookings.airports arr
    WHERE ((f.departure_airport = dep.airport_code) AND (f.arrival_airport = arr.airport_code));
    $$);
    

The schema creation is now complete. Let's proceed to data migration.

17.3.3. Data Migration #

When migrating data, the order of fields in the source and target schema is important. The order and types of fields in the non-distributed and distributed databases must be the same.
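
A simple way to compare the field order and types is to run the same catalog query against both the source and the target database and compare the results:

    SELECT table_name, ordinal_position, column_name, data_type
      FROM information_schema.columns
     WHERE table_schema = 'bookings'
     ORDER BY table_name, ordinal_position;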

The migration utility does exactly what the user requests; the user does not need to interfere with the data migration process, except perhaps to direct the data straight to the shard where it must be stored.

Postgres Pro Shardman provides convenient migration tools. Once the distributed schema is created and the sharding key is chosen, the data migration rules need to be defined. The data source can be either exported CSV data files or a single DBMS server.

It is not always convenient to use CSV files as they can reach a pretty large size and require additional resources for storage and transfer.

Migrating data directly from DB to DB without an intermediate storage phase is much more convenient.

The order of loading data during migration must be taken into account. Tables can be linked with a foreign key, so the data in tables that other tables will reference must be loaded first. To follow such an order, in the migration file, you should establish the priority that defines tables whose data must be loaded first. The higher the value of the priority parameter, the higher the priority. For example, if the priorities 1, 2 and 3 are defined, tables with the priority 3 will be loaded first, then those with the priority 2, and last with the priority 1.

The shardmanctl load command lets you define the order in which tables are migrated; this order is specified in the YAML configuration file.

17.3.3.1. Naive Approach #

The following is an example of the migrate.yml file:

    version: "1.0"
    migrate:
    connstr: "dbname=demo host=single-pg-instance port=5432 user=postgres password=******"
    jobs: 8
    batch: 2000
    options:
    schemas:
      - name: bookings
        # the all parameter set to false turns off automatic creation of tables
        # the tables were already created at the schema migration phase
        all: false
        tables:
          - name: airports
            # defining a global table
            type: global
            # as tables are linked, data migration priority must be defined
            # setting highest priority to tables whose data
            # must be copied first
            priority: 3
          - name: aircrafts
            type: global
            priority: 3
          - name: seats
            type: global
            priority: 3
          - name: bookings
            type: global
            priority: 3
          - name: flights
            type: global
            priority: 3
          - name: tickets
            type: sharded
            # defining a sharded table
            # specifying the sharding key
            distributedby: ticket_no
            partitions: 4
            priority: 2
          - name: ticket_flights
            type: sharded
            distributedby: ticket_no
            # defining a sharded and colocated table
            # specifying the name of the table that ticket_flights table will be colocated with
            colocatewith: tickets
            partitions: 4
            priority: 2
          - name: boarding_passes
            type: sharded
            distributedby: ticket_no
            colocatewith: tickets
            partitions: 4
            priority: 1
    

This file defines the data source: the single-pg-instance node, its connection port, the user name and password, and the name of the source database. It also sets some parameters of the migration utility (there can be quite a few of them, as explained in the section called “Loading Data with a Schema from PostgreSQL”): the number of threads (8), the batch size, that is, the number of rows grouped into batches for processing during migration, and the table processing priorities. Tables with a higher priority value are loaded before tables with a lower one: the data for the global tables is migrated first, then the data for the sharded tables tickets and ticket_flights, and migration of the boarding_passes table completes the process. The following command performs the migration:

    shardmanctl load --schema migrate.yml
    

If the utility completes with the message data loading completed successfully, it means that the migration was a success.
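
As an additional sanity check, you can compare row counts in the distributed database against the source database, for example:

    SELECT count(*) FROM bookings.bookings;
    SELECT count(*) FROM bookings.tickets;
    SELECT count(*) FROM bookings.ticket_flights;
    SELECT count(*) FROM bookings.boarding_passes;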

17.3.3.2. Complex Approach #

With this approach, the launch and operation of the shardmanctl utility in the load mode is the same as with the naive approach. However, the file that defines the order of loading tables will slightly differ as the sharding key has changed:

    ---
    version: "1.0"
    migrate:
    connstr: "dbname=demo host=single-pg-instance port=5432 user=postgres password=postgres"
    jobs: 8
    batch: 2000
    options:
    schemas:
      - name: bookings
        all: false
        tables:
          - name: airports
            type: global
            priority: 5
          - name: aircrafts
            type: global
            priority: 5
          - name: seats
            type: global
            priority: 5
          - name: flights
            type: global
            priority: 5
          - name: bookings
            type: sharded
            priority: 4
            partitions: 4
            distributedby: book_ref
          - name: tickets
            type: sharded
            distributedby: book_ref
            colocatewith: bookings
            partitions: 4
            priority: 3
          - name: ticket_flights
            type: sharded
            distributedby: book_ref
            colocatewith: bookings
            partitions: 4
            priority: 2
          - name: boarding_passes
            type: sharded
            distributedby: book_ref
            colocatewith: bookings
            partitions: 4
            priority: 1
    

17.3.4. Checking Postgres Pro Shardman Migration #

Once all the migration operations have completed successfully, it's time to check how queries are executed in the distributed schema.

17.3.4.1. q1 Query #

The q1 query is pretty simple: it selects the booking with the specified number:

    SELECT *
      FROM bookings.bookings b
    WHERE b.book_ref = '0824C5';
    

In regular PostgreSQL and with the ticket_no sharding key, this query runs comparably fast. With the book_ref sharding key, the speed depends on the shard where the query is executed. If it is executed on a shard that does not physically hold the requested data, Postgres Pro Shardman forwards the query to another shard, which adds a delay due to network communication.
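
To see whether q1 is served locally or forwarded, you can inspect its plan; the exact output depends on the shard you connect to, but with the book_ref sharding key a Foreign Scan node indicates that the requested row lives on another shard:

    EXPLAIN (ANALYZE)
    SELECT *
      FROM bookings.bookings b
     WHERE b.book_ref = '0824C5';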

17.3.4.2. q2 Query #

The q2 query selects all the tickets from the specified booking:

    SELECT t.*
    FROM bookings.bookings b
    JOIN bookings.tickets t
      ON t.book_ref = b.book_ref
    WHERE b.book_ref = '0824C5';
    

With the book_ref sharding key, the whole query is pushed down to the shard that holds the matching partitions, where the colocated partitions of the bookings and tickets tables are joined:

    Foreign Scan (actual rows=2 loops=1)
      Relations: (bookings_2_fdw b) INNER JOIN (tickets_2_fdw t)
      Network: FDW bytes sent=433 received=237
    

Let's look at the query plan for the ticket_no sharding key:

    Append (actual rows=2 loops=1)
      Network: FDW bytes sent=1263 received=205
      ->  Nested Loop (actual rows=1 loops=1)
            ->  Seq Scan on tickets_0 t_1 (actual rows=1 loops=1)
                  Filter: (book_ref = '0824C5'::bpchar)
                  Rows Removed by Filter: 207092
            ->  Index Only Scan using bookings_pkey on bookings b (actual rows=1 loops=1)
                  Index Cond: (book_ref = '0824C5'::bpchar)
                  Heap Fetches: 0
      ->  Async Foreign Scan (actual rows=1 loops=1)
            Relations: (tickets_1_fdw t_2) INNER JOIN (bookings b)
            Network: FDW bytes sent=421 received=205
      ->  Async Foreign Scan (actual rows=0 loops=1)
            Relations: (tickets_2_fdw t_3) INNER JOIN (bookings b)
            Network: FDW bytes sent=421
      ->  Async Foreign Scan (actual rows=0 loops=1)
            Relations: (tickets_3_fdw t_4) INNER JOIN (bookings b)
            Network: FDW bytes sent=421
    

The plan contains Async Foreign Scan nodes, which mean network data exchange between the query source node and shards, that is, data is received from shards and final processing is done on the query source node.

Look at the Network line. A good criterion of whether query execution on shards is optimal is the value of received: the lower it is, the more of the processing is done remotely, and the query source node receives a result that is almost ready for further processing.

The case where the sharding key is book_ref looks much better as the table with ticket numbers already contains book_ref.

The plan of the query to be executed on an arbitrary node is as follows:

    Foreign Scan (actual rows=2 loops=1)
      Relations: (bookings_2_fdw b) INNER JOIN (tickets_2_fdw t)
      Network: FDW bytes sent=433 received=237
    

Network data exchange is done with only one shard, the one on which the requested data resides: shard-3, which holds the tickets_2 partition of the tickets table.

If this query is executed directly on the shard where the data is physically located, it runs even faster.

Let's look at the plan:

    Nested Loop (actual rows=2 loops=1)
        ->  Index Only Scan using bookings_2_pkey on bookings_2
        ->  Bitmap Heap Scan on tickets_2
              ->  Bitmap Index Scan on tickets_2_book_ref_idx
    

Network data exchange is not needed here as the requested data is located within the shard in which the query is executed.

In some cases, the choice of the shard for query execution matters. Being aware of the distribution logic, you can implement it at the application level and send some queries immediately to the shard where the needed data is located based on the sharding key.
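
One simple way to see which partition, and hence which shard, holds a given key is to check the tableoid system column: a name like bookings_2 means a local partition, while a name like bookings_2_fdw means the row was fetched from a remote shard. For example:

    SELECT tableoid::regclass AS partition, book_ref
      FROM bookings.bookings
     WHERE book_ref = '0824C5';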

17.3.4.3. q3 Query #

The q3 query finds all the flights for one of the tickets in the booking selected earlier:

    SELECT tf.*, t.*
    FROM bookings.tickets t
    JOIN bookings.ticket_flights tf
      ON tf.ticket_no = t.ticket_no
    WHERE t.ticket_no = '0005435126781';
    

As discussed in Section 17.3.4.2, the choice of the shard for query execution matters. With the ticket_no sharding key, the query executes optimally on the shard that contains the partition with the data: the planner knows that this shard holds all the data needed to join the tables, so no network communication between shards occurs.

For the book_ref sharding key, note that if the application also knows the booking number that the ticket belongs to, it can pass it in the query, and the request can be sent directly to the proper shard.

So the query is as follows:

    SELECT tf.*, t.*
    FROM bookings.tickets t
    JOIN bookings.ticket_flights tf
      ON tf.ticket_no = t.ticket_no
      AND t.book_ref = tf.book_ref
    WHERE t.ticket_no = '0005435126781'
    AND tf.book_ref = '0824C5';
    

The query is executed more slowly in the shard that does not contain the partition with the data sought:

    Foreign Scan (actual rows=6 loops=1)
      Relations: (tickets_1_fdw t) INNER JOIN (ticket_flights_1_fdw tf)
      Network: FDW bytes sent=434 received=369
    

Network communication between shards is present in the plan, as it contains the Foreign Scan node.

The importance of including the sharding key in a query can be illustrated with the following query for the book_ref sharding key:

    SELECT tf.*, t.*
    FROM bookings.tickets t
    JOIN bookings.ticket_flights tf
      ON tf.ticket_no = t.ticket_no
    WHERE t.ticket_no = '0005435126781'
    AND tf.book_ref = '0824C5';
    

Here the sharding key is deliberately not included in the join condition. Let's look at the plan:

    Nested Loop (actual rows=6 loops=1)
      Network: FDW bytes sent=1419 received=600
      ->  Foreign Scan on ticket_flights_2_fdw tf (actual rows=6 loops=1)
            Network: FDW bytes sent=381 received=395
      ->  Append (actual rows=1 loops=6)
            Network: FDW bytes sent=1038 received=205
            ->  Seq Scan on tickets_0 t_1 (actual rows=0 loops=6)
                  Filter: (ticket_no = '0005435126781'::bpchar)
                  Rows Removed by Filter: 207273
            ->  Async Foreign Scan on tickets_1_fdw t_2 (actual rows=0 loops=6)
                  Network: FDW bytes sent=346 received=205
            ->  Async Foreign Scan on tickets_2_fdw t_3 (actual rows=1 loops=6)
                  Network: FDW bytes sent=346
            ->  Async Foreign Scan on tickets_3_fdw t_4 (actual rows=0 loops=6)
                  Network: FDW bytes sent=346
    

We can notice differences from the previous examples. Here the query was executed on all nodes, and no index was used, so to return as few as 6 rows, Postgres Pro Shardman had to sequentially scan whole partitions of the tickets table, return the results to the query source node, and only then perform the join with the ticket_flights table. The Async Foreign Scan nodes indicate sequential scans of the tickets table partitions on the shards.

17.3.4.4. q4 Query #

This query returns all the flights for all the tickets included in a booking. There are several ways to write it: use a subquery with the booking number inside an IN clause, explicitly list the ticket numbers, or use a WHERE ... OR condition. Let's check how the query executes in each of these variants.

    SELECT tf.*, t.*
    FROM bookings.tickets t
    JOIN bookings.ticket_flights tf
      ON tf.ticket_no = t.ticket_no
    WHERE t.ticket_no IN (
      SELECT t.ticket_no
        FROM bookings.bookings b
        JOIN bookings.tickets  t
          ON t.book_ref = b.book_ref
        WHERE b.book_ref = '0824C5'
    );
    

This is the query exactly as it was written for the non-distributed database. It performs equally poorly for both sharding keys.

The query plan is like this:

    Hash Join (actual rows=12 loops=1)
      Hash Cond: (tf.ticket_no = t.ticket_no)
      ->  Append (actual rows=2360335 loops=1)
            ->  Async Foreign Scan on ticket_flights_0_fdw tf_1 (actual rows=589983 loops=1)
            ->  Async Foreign Scan on ticket_flights_1_fdw tf_2 (actual rows=590175 loops=1)
            ->  Seq Scan on ticket_flights_2 tf_3 (actual rows=590174 loops=1)
            ->  Async Foreign Scan on ticket_flights_3_fdw tf_4 (actual rows=590003 loops=1)
      ->  Hash (actual rows=2 loops=1)
            Buckets: 1024  Batches: 1  Memory Usage: 9kB
            ->  Hash Semi Join (actual rows=2 loops=1)
                  Hash Cond: (t.ticket_no = t_5.ticket_no)
                  ->  Append (actual rows=829071 loops=1)
                        ->  Async Foreign Scan on tickets_0_fdw t_1 (actual rows=207273 loops=1)
                        ->  Async Foreign Scan on tickets_1_fdw t_2 (actual rows=207058 loops=1)
                        ->  Seq Scan on tickets_2 t_3 (actual rows=207431 loops=1)
                        ->  Async Foreign Scan on tickets_3_fdw t_4 (actual rows=207309 loops=1)
                  ->  Hash (actual rows=2 loops=1)
                        Buckets: 1024  Batches: 1  Memory Usage: 9kB
                        ->  Nested Loop (actual rows=2 loops=1)
                              ->  Index Only Scan using tickets_2_pkey on tickets_2 t_5
                              ->  Materialize (actual rows=1 loops=2)
                                    ->  Index Only Scan using bookings_2_pkey on bookings_2 b
    

This plan shows that Postgres Pro Shardman handled the WHERE subquery, but then had to request all the rows of the tickets and ticket_flights tables and process them on the query source node. This is really poor performance. Let's try the other variants.

For the ticket_no sharding key, the query is:

        SELECT tf.*, t.*
        FROM bookings.tickets t
        JOIN bookings.ticket_flights tf
          ON tf.ticket_no = t.ticket_no
        WHERE t.ticket_no IN ('0005435126781','0005435126782');
        

and the plan is:

        Append (actual rows=12 loops=1)
          Network: FDW bytes sent=1098 received=1656
          ->  Async Foreign Scan (actual rows=6 loops=1)
                Relations: (tickets_0_fdw t_1) INNER JOIN (ticket_flights_0_fdw tf_1)
                Network: FDW bytes sent=549 received=1656
          ->  Async Foreign Scan (actual rows=6 loops=1)
                Relations: (tickets_1_fdw t_2) INNER JOIN (ticket_flights_1_fdw tf_2)
                Network: FDW bytes sent=549
        

Everything is pretty good here: the query was executed on only two shards of the four, and all that remained was to Append the received results.

Let's recall that book_ref is contained in both tickets and ticket_flights tables. So for the book_ref sharding key, the query is:

    SELECT tf.*, t.*
    FROM bookings.tickets t
    JOIN bookings.ticket_flights tf
    ON tf.ticket_no = t.ticket_no
    AND tf.book_ref = t.book_ref
    WHERE t.book_ref = '0824C5';
    

and the plan is:

    Foreign Scan (actual rows=12 loops=1)
      Relations: (tickets_2_fdw t) INNER JOIN (ticket_flights_2_fdw tf)
      Network: FDW bytes sent=547 received=1717
    

This is an excellent result — the query was modified to execute well in the distributed schema.

17.3.4.5. q5 Query #

This is a small analytical query that returns the names and ticket numbers of the passengers who checked in first (received boarding pass number 1) for more than one flight.

    SELECT t.passenger_name, t.ticket_no
    FROM bookings.tickets t
    JOIN bookings.boarding_passes bp
      ON bp.ticket_no = t.ticket_no
    GROUP BY t.passenger_name, t.ticket_no
    HAVING max(bp.boarding_no) = 1
    AND count(*) > 1;
    

This query is executed pretty slowly for both sharding keys. Below is the plan for book_ref:

    HashAggregate (actual rows=424 loops=1)
      Group Key: t.ticket_no
      Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
      Batches: 85  Memory Usage: 4265kB  Disk Usage: 112008kB
      Rows Removed by Filter: 700748
      Network: FDW bytes sent=1215 received=77111136
      ->  Append (actual rows=1894295 loops=1)
            Network: FDW bytes sent=1215 received=77111136
            ->  Async Foreign Scan (actual rows=473327 loops=1)
                  Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
                  Network: FDW bytes sent=404 received=813128
            ->  Async Foreign Scan (actual rows=472632 loops=1)
                  Relations: (tickets_1_fdw t_2) INNER JOIN (boarding_passes_1_fdw bp_2)
                  Network: FDW bytes sent=404
            ->  Async Foreign Scan (actual rows=475755 loops=1)
                  Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
                  Network: FDW bytes sent=407
            ->  Hash Join (actual rows=472581 loops=1)
                  Hash Cond: (bp_4.ticket_no = t_4.ticket_no)
                  Network: FDW bytes received=28841344
                  ->  Seq Scan on boarding_passes_3 bp_4 (actual rows=472581 loops=1)
                  ->  Hash (actual rows=207118 loops=1)
                        Buckets: 65536  Batches: 4  Memory Usage: 3654kB
                        Network: FDW bytes received=9176680
                        ->  Seq Scan on tickets_3 t_4 (actual rows=207118 loops=1)
                              Network: FDW bytes received=9176680
    

Note a pretty large amount of network data transfer between shards. Let's improve the query by adding book_ref as one more condition for joining tables:

    SELECT t.passenger_name, t.ticket_no
    FROM bookings.tickets t
    JOIN bookings.boarding_passes bp
      ON bp.ticket_no = t.ticket_no
      AND bp.book_ref=t.book_ref -- <= added book_ref
    GROUP BY t.passenger_name, t.ticket_no
    HAVING max(bp.boarding_no) = 1
    AND count(*) > 1;
    

Let's look at the query plan:

    GroupAggregate (actual rows=424 loops=1)
      Group Key: t.passenger_name, t.ticket_no
      Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
      Rows Removed by Filter: 700748
      Network: FDW bytes sent=1424 received=77092816
      ->  Merge Append (actual rows=1894295 loops=1)
            Sort Key: t.passenger_name, t.ticket_no
            Network: FDW bytes sent=1424 received=77092816
            ->  Foreign Scan (actual rows=472757 loops=1)
                  Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
                  Network: FDW bytes sent=472 received=2884064
            ->  Sort (actual rows=472843 loops=1)
                  Sort Key: t_2.passenger_name, t_2.ticket_no
                  Sort Method: external merge  Disk: 21152kB
                  Network: FDW bytes received=22753536
                  ->  Hash Join (actual rows=472843 loops=1)
                        Hash Cond: ((bp_2.ticket_no = t_2.ticket_no) AND (bp_2.book_ref = t_2.book_ref))
                        Network: FDW bytes received=22753536
                        ->  Seq Scan on boarding_passes_1 bp_2 (actual rows=472843 loops=1)
                        ->  Hash (actual rows=207058 loops=1)
                              Buckets: 65536  Batches: 8  Memory Usage: 2264kB
                              Network: FDW bytes received=22753536
                              ->  Seq Scan on tickets_1 t_2 (actual rows=207058 loops=1)
                                    Network: FDW bytes received=22753536
            ->  Foreign Scan (actual rows=474715 loops=1)
                  Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
                  Network: FDW bytes sent=476 received=2884120
            ->  Foreign Scan (actual rows=473980 loops=1)
                  Relations: (tickets_3_fdw t_4) INNER JOIN (boarding_passes_3_fdw bp_4)
                  Network: FDW bytes sent=476 received=25745384
    

The situation has considerably improved: the joins were mostly pushed down to the shards, the results were received on the query source node, and the final filtering, grouping and aggregation were done there.

For the ticket_no sharding key, the source query plan looks like this:

    HashAggregate (actual rows=424 loops=1)
      Group Key: t.ticket_no
      Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
      Batches: 85  Memory Usage: 4265kB  Disk Usage: 111824kB
      Rows Removed by Filter: 700748
      Network: FDW bytes sent=1188 received=77103620
      ->  Append (actual rows=1894295 loops=1)
            Network: FDW bytes sent=1188 received=77103620
            ->  Async Foreign Scan (actual rows=473327 loops=1)
                  Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
                  Network: FDW bytes sent=394
            ->  Hash Join (actual rows=472632 loops=1)
                  Hash Cond: (bp_2.ticket_no = t_2.ticket_no)
                  Network: FDW bytes received=77103620
                  ->  Seq Scan on boarding_passes_1 bp_2 (actual rows=472632 loops=1)
                  ->  Hash (actual rows=206712 loops=1)
                        Buckets: 65536  Batches: 4  Memory Usage: 3654kB
                        Network: FDW bytes received=23859576
                        ->  Seq Scan on tickets_1 t_2 (actual rows=206712 loops=1)
                              Network: FDW bytes received=23859576
            ->  Async Foreign Scan (actual rows=475755 loops=1)
                  Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
                  Network: FDW bytes sent=397
            ->  Async Foreign Scan (actual rows=472581 loops=1)
                  Relations: (tickets_3_fdw t_4) INNER JOIN (boarding_passes_3_fdw bp_4)
                  Network: FDW bytes sent=397
    

We can see that table joining is done on shards, while data filtering, grouping and aggregation are done on the query source node. The source query does not need to be modified in this case.

17.3.4.6. q6 Query #

For each ticket in bookings made a week before bookings.now(), this query displays all the included flight segments, together with the connection time between them.

    SELECT tf.ticket_no,f.departure_airport,
          f.arrival_airport,f.scheduled_arrival,
          lead(f.scheduled_departure) OVER w AS next_departure,
          lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
    FROM bookings.bookings b
    JOIN bookings.tickets t
    ON t.book_ref = b.book_ref
    JOIN bookings.ticket_flights tf
    ON tf.ticket_no = t.ticket_no
    JOIN bookings.flights f
    ON tf.flight_id = f.flight_id
    WHERE b.book_date = bookings.now()::date - INTERVAL '7 day'
    WINDOW w AS (
    PARTITION BY tf.ticket_no
    ORDER BY f.scheduled_departure);
    

For this query, the book_date column must be cast from timestamptz to date. When casting types, PostgreSQL casts the column data type to the data type given in the filtering condition, not vice versa. Therefore, Postgres Pro Shardman must first get all the data from the other shards, cast the type, and only then apply the filter. The query plan looks like this:

    WindowAgg (actual rows=26 loops=1)
      Network: FDW bytes sent=1750 received=113339240
      ->  Sort (actual rows=26 loops=1)
            Sort Key: tf.ticket_no, f.scheduled_departure
            Sort Method: quicksort  Memory: 27kB
            Network: FDW bytes sent=1750 received=113339240
            ->  Append (actual rows=26 loops=1)
                  Network: FDW bytes sent=1750 received=113339240
                  ->  Hash Join (actual rows=10 loops=1)
                        Hash Cond: (t_1.book_ref = b.book_ref)
                        Network: FDW bytes sent=582 received=37717376
                  ->  Hash Join (actual rows=6 loops=1)
                        Hash Cond: (t_2.book_ref = b.book_ref)
                        Network: FDW bytes sent=582 received=37700608
                  ->  Hash Join (actual rows=2 loops=1)
                        Hash Cond: (t_3.book_ref = b.book_ref)
                        Network: FDW bytes sent=586 received=37921256
                  ->  Nested Loop (actual rows=8 loops=1)
                        ->  Nested Loop (actual rows=8 loops=1)
                              ->  Hash Join (actual rows=2 loops=1)
                                    Hash Cond: (t_4.book_ref = b.book_ref)
                                    ->  Seq Scan on tickets_3 t_4 (actual rows=207118 loops=1)
                        ->  Index Scan using flights_pkey on flights f (actual rows=1 loops=8)
                              Index Cond: (flight_id = tf_4.flight_id)
    

Pay attention to the number of bytes received from other cluster shards and to the sequential scan of the tickets table. Let's try to rewrite the query to avoid the type cast.

The idea is pretty simple: the interval is computed at the application level rather than at the database level, and a ready-made value of the timestamptz type is passed to the query. Besides, creating an additional index can help:

    CREATE INDEX if not exists bookings_date_idx ON bookings.bookings(book_date);
    

For the book_ref sharding key, the query looks like this:

    SELECT tf.ticket_no,f.departure_airport,
          f.arrival_airport,f.scheduled_arrival,
          lead(f.scheduled_departure) OVER w AS next_departure,
          lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
    FROM bookings.bookings b
    JOIN bookings.tickets t
    ON t.book_ref = b.book_ref
    JOIN bookings.ticket_flights tf
    ON tf.ticket_no = t.ticket_no
    AND tf.book_ref = t.book_ref -- <= added book_ref
    JOIN bookings.flights f
    ON tf.flight_id = f.flight_id
    WHERE b.book_date = '2016-10-06 14:00:00+00'
    WINDOW w AS (
    PARTITION BY tf.ticket_no
    ORDER BY f.scheduled_departure);
    

This query has a different plan:

    WindowAgg (actual rows=18 loops=1)
      Network: FDW bytes sent=2268 received=892
      ->  Sort (actual rows=18 loops=1)
            Sort Key: tf.ticket_no, f.scheduled_departure
            Sort Method: quicksort  Memory: 26kB
            Network: FDW bytes sent=2268 received=892
            ->  Append (actual rows=18 loops=1)
                  Network: FDW bytes sent=2268 received=892
                  ->  Nested Loop (actual rows=4 loops=1)
                        ->  Nested Loop (actual rows=4 loops=1)
                              ->  Nested Loop (actual rows=1 loops=1)
                                    ->  Bitmap Heap Scan on bookings_0 b_1
                                          Heap Blocks: exact=1
                                          ->  Bitmap Index Scan on bookings_0_book_date_idx
                                    ->  Index Only Scan using tickets_0_pkey on tickets_0 t_1
                                          Index Cond: (book_ref = b_1.book_ref)
                                          Heap Fetches: 0
                              ->  Index Only Scan using ticket_flights_0_pkey on ticket_flights_0 tf_1
                                    Heap Fetches: 0
                        ->  Index Scan using flights_pkey on flights f (actual rows=1 loops=4)
                              Index Cond: (flight_id = tf_1.flight_id)
                  ->  Async Foreign Scan (actual rows=14 loops=1)
                        Network: FDW bytes sent=754 received=892
                  ->  Async Foreign Scan (actual rows=0 loops=1)
                        Network: FDW bytes sent=757 -- received=0!
                  ->  Async Foreign Scan (actual rows=0 loops=1)
                        Network: FDW bytes sent=757 -- received=0!
    

This is much better. First, the tickets table is no longer scanned sequentially: an Index Only Scan is used instead. Second, the amount of data transferred over the network between the nodes is clearly much smaller.
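
If embedding the computed literal directly into the SQL text is inconvenient, the ready timestamptz value can instead be supplied as a bind parameter. Below is a minimal sketch using a prepared statement (the statement name q6_by_date is arbitrary and not part of the original example; depending on how parameters are pushed down to the shards, the resulting plan may differ slightly from the literal case):

    PREPARE q6_by_date (timestamptz) AS
    SELECT tf.ticket_no,f.departure_airport,
          f.arrival_airport,f.scheduled_arrival,
          lead(f.scheduled_departure) OVER w AS next_departure,
          lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
    FROM bookings.bookings b
    JOIN bookings.tickets t
    ON t.book_ref = b.book_ref
    JOIN bookings.ticket_flights tf
    ON tf.ticket_no = t.ticket_no
    AND tf.book_ref = t.book_ref
    JOIN bookings.flights f
    ON tf.flight_id = f.flight_id
    WHERE b.book_date = $1
    WINDOW w AS (
    PARTITION BY tf.ticket_no
    ORDER BY f.scheduled_departure);

    EXECUTE q6_by_date ('2016-10-06 14:00:00+00');
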

17.3.4.7. q7 Query #

Assume that we need statistics showing how many passengers there are per booking. To find this out, let's first compute the number of passengers in each booking and then count the bookings with each number of passengers.

    SELECT tt.cnt, count(*)
    FROM (
      SELECT count(*) cnt
      FROM bookings.tickets t
      GROUP BY t.book_ref
      ) tt
    GROUP BY tt.cnt
    ORDER BY tt.cnt;
    

This query processes all the data in the tickets table, so intensive network data exchange between shards cannot be avoided. Also note that the value of the work_mem parameter must be high enough to avoid spilling to disk when aggregating and sorting the data. So let's change the value of work_mem in the cluster:

    shardmanctl set work_mem='256MB';
    

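Once the setting has been applied across the cluster, you can check the effective value in any session, for example (propagation may take a moment, depending on how the cluster applies configuration changes):

    SHOW work_mem;
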
The query plan for the ticket_no sharding key is as follows:

    GroupAggregate (actual rows=5 loops=1)
      Group Key: tt.cnt
      Network: FDW bytes sent=798 received=18338112
      ->  Sort (actual rows=593433 loops=1)
            Sort Key: tt.cnt
            Sort Method: quicksort  Memory: 57030kB
            Network: FDW bytes sent=798 received=18338112
            ->  Subquery Scan on tt (actual rows=593433 loops=1)
                  Network: FDW bytes sent=798 received=18338112
                  ->  Finalize HashAggregate (actual rows=593433 loops=1)
                        Group Key: t.book_ref
                        Batches: 1  Memory Usage: 81953kB
                        Network: FDW bytes sent=798 received=18338112
                        ->  Append (actual rows=763806 loops=1)
                              Network: FDW bytes sent=798 received=18338112
                              ->  Async Foreign Scan (actual rows=190886 loops=1)
                                    Relations: Aggregate on (tickets_0_fdw t)
                                    Network: FDW bytes sent=266 received=1558336
                              ->  Async Foreign Scan (actual rows=190501 loops=1)
                                    Relations: Aggregate on (tickets_1_fdw t_1)
                                    Network: FDW bytes sent=266
                              ->  Async Foreign Scan (actual rows=191589 loops=1)
                                    Relations: Aggregate on (tickets_2_fdw t_2)
                                    Network: FDW bytes sent=266
                              ->  Partial HashAggregate (actual rows=190830 loops=1)
                                    Group Key: t_3.book_ref
                                    Batches: 1  Memory Usage: 36881kB
                                    Network: FDW bytes received=4981496
                                    ->  Seq Scan on tickets_3 t_3 (actual rows=207118 loops=1)
                                          Network: FDW bytes received=4981496
    

The query plan for the book_ref sharding key is as follows:

    Sort (actual rows=5 loops=1)
      Sort Key: (count(*))
      Sort Method: quicksort  Memory: 25kB
      Network: FDW bytes sent=798 received=14239951
      ->  HashAggregate (actual rows=5 loops=1)
            Group Key: (count(*))
            Batches: 1  Memory Usage: 40kB
            Network: FDW bytes sent=798 received=14239951
            ->  Append (actual rows=593433 loops=1)
                  Network: FDW bytes sent=798 received=14239951
                  ->  GroupAggregate (actual rows=148504 loops=1)
                        Group Key: t.book_ref
                        ->  Index Only Scan using tickets_0_book_ref_idx on tickets_0 t (rows=207273)
                              Heap Fetches: 0
                  ->  Async Foreign Scan (actual rows=148256 loops=1)
                        Relations: Aggregate on (tickets_1_fdw t_1)
                        Network: FDW bytes sent=266 received=1917350
                  ->  Async Foreign Scan (actual rows=148270 loops=1)
                        Relations: Aggregate on (tickets_2_fdw t_2)
                        Network: FDW bytes sent=266
                  ->  Async Foreign Scan (actual rows=148403 loops=1)
                        Relations: Aggregate on (tickets_3_fdw t_3)
                        Network: FDW bytes sent=266
    

The two query plans differ mainly in how the aggregates are computed and in where the data is processed.

For the ticket_no sharding key, the partially aggregated data of the tickets table is transferred to the query source node (about 17 MB), and all the remaining processing is performed there.

For the book_ref sharding key, since the grouping is done by the sharding key, most of the aggregation is performed on the shard nodes, and only the result (about 13 MB) is returned to the query source node, where it is finalized.

17.3.4.8. q8 Query #

This query answers the question: what are the most frequent combinations of first and last names in bookings, and what is the ratio of the passengers with such names to the total number of passengers? A window function is used to get the result:

    SELECT passenger_name,
          round( 100.0 * cnt / sum(cnt) OVER (), 2)
      AS percent
    FROM (
    SELECT passenger_name,
            count(*) cnt
    FROM bookings.tickets
    GROUP BY passenger_name
    ) t
    ORDER BY percent DESC;
    

For both sharding keys, the query plan looks like this:

    Sort (actual rows=27909 loops=1)
      Sort Key: (round(((100.0 * ((count(*)))::numeric) / sum((count(*))) OVER (?)), 2)) DESC
      Sort Method: quicksort  Memory: 3076kB
      Network: FDW bytes sent=816 received=2376448
      ->  WindowAgg (actual rows=27909 loops=1)
            Network: FDW bytes sent=816 received=2376448
            ->  Finalize HashAggregate (actual rows=27909 loops=1)
                  Group Key: tickets.passenger_name
                  Batches: 1  Memory Usage: 5649kB
                  Network: FDW bytes sent=816 received=2376448
                  ->  Append (actual rows=74104 loops=1)
                        Network: FDW bytes sent=816 received=2376448
                        ->  Partial HashAggregate (actual rows=18589 loops=1)
                              Group Key: tickets.passenger_name
                              Batches: 1  Memory Usage: 2833kB
                              ->  Seq Scan on tickets_0 tickets (actual rows=207273 loops=1)
                        ->  Async Foreign Scan (actual rows=18435 loops=1)
                              Relations: Aggregate on (tickets_1_fdw tickets_1)
                              Network: FDW bytes sent=272 received=2376448
                        ->  Async Foreign Scan (actual rows=18567 loops=1)
                              Relations: Aggregate on (tickets_2_fdw tickets_2)
                              Network: FDW bytes sent=272
                        ->  Async Foreign Scan (actual rows=18513 loops=1)
                              Relations: Aggregate on (tickets_3_fdw tickets_3)
                              Network: FDW bytes sent=272
    

The plan shows that data preprocessing and partial aggregation are performed on the shards, while the final processing is performed on the query source node.

17.3.4.9. q9 Query #

This query answers the question: who traveled from Moscow (SVO) to Novosibirsk (OVB) in seat 1A the day before yesterday, and when was the ticket booked? The day before yesterday is computed from the bookings.now() function rather than from the current date. The query in the non-distributed schema is as follows:

    SELECT
      t.passenger_name,
      b.book_date v
    FROM bookings b
    JOIN tickets t ON
      t.book_ref = b.book_ref
    JOIN boarding_passes bp
      ON bp.ticket_no = t.ticket_no
    JOIN flights f ON
      f.flight_id = bp.flight_id
    WHERE f.departure_airport = 'SVO'
    AND f.arrival_airport = 'OVB'
    AND f.scheduled_departure::date = bookings.now()::date - INTERVAL '2 day'
    AND bp.seat_no = '1A';
    

As explained for the q6 Query, the INTERVAL expression causes a type cast. Let's get rid of it and rewrite the query for the book_ref sharding key as follows:

    SELECT
      t.passenger_name,
      b.book_date v
    FROM bookings b
    JOIN tickets t ON
      t.book_ref = b.book_ref
    JOIN boarding_passes bp
      ON bp.ticket_no = t.ticket_no
      AND bp.book_ref = b.book_ref -- <= added book_ref
    JOIN flights f ON
      f.flight_id = bp.flight_id
    WHERE f.departure_airport = 'SVO'
    AND f.arrival_airport = 'OVB'
    AND f.scheduled_departure
      BETWEEN '2016-10-11 14:00:00+00' AND '2016-10-13 14:00:00+00'
    AND bp.seat_no = '1A';
    

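The BETWEEN boundaries above are assumed to correspond to bookings.now() minus two days and to bookings.now() itself. Such ready timestamptz values can be computed up front, at the application level or with a separate query; a minimal sketch:

    -- compute the boundaries once; pass the results to the main query
    -- as constants or bind parameters
    SELECT bookings.now() - INTERVAL '2 day' AS departure_from,
          bookings.now() AS departure_to;
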
Let's also create a couple of additional indexes:

    CREATE INDEX idx_boarding_passes_seats
        ON boarding_passes((seat_no::text));
    CREATE INDEX idx_flights_sched_dep
        ON flights(departure_airport,arrival_airport,scheduled_departure);
    

As a result, the query plan appears pretty good:

    Append (actual rows=1 loops=1)
      Network: FDW bytes sent=2484 received=102
      ->  Nested Loop (actual rows=1 loops=1)
            Join Filter: (bp_1.ticket_no = t_1.ticket_no)
            Rows Removed by Join Filter: 1
            ->  Nested Loop (actual rows=1 loops=1)
                  ->  Hash Join (actual rows=1 loops=1)
                        Hash Cond: (bp_1.flight_id = f.flight_id)
                        ->  Bitmap Heap Scan on boarding_passes_0 bp_1 (actual rows=4919 loops=1)
                              Recheck Cond: ((seat_no)::text = '1A'::text)
                              Heap Blocks: exact=2632
                              ->  Bitmap Index Scan on boarding_passes_0_seat_no_idx (actual rows=4919)
                                    Index Cond: ((seat_no)::text = '1A'::text)
                        ->  Hash (actual rows=2 loops=1)
                              Buckets: 1024  Batches: 1  Memory Usage: 9kB
                              ->  Bitmap Heap Scan on flights f (actual rows=2 loops=1)
                                    Recheck Cond:
                        ((departure_airport = 'SVO'::bpchar) AND (arrival_airport = 'OVB'::bpchar) AND
                        (scheduled_departure >= '2016-10-11 14:00:00+00'::timestamp with time zone) AND
                        (scheduled_departure < '2016-10-13 14:00:00+00'::timestamp with time zone))
                                    Heap Blocks: exact=2
                                    ->  Bitmap Index Scan on idx_flights_sched_dep (actual rows=2 loops=1)
                                          Index Cond:
                                      ((departure_airport = 'SVO'::bpchar) AND
                                      (arrival_airport = 'OVB'::bpchar) AND
                          (scheduled_departure >= '2016-10-11 14:00:00+00'::timestamp with time zone) AND
                          (scheduled_departure <= '2016-10-13 14:00:00+00'::timestamp with time zone))
                  ->  Index Scan using bookings_0_pkey on bookings_0 b_1 (actual rows=1 loops=1)
                        Index Cond: (book_ref = bp_1.book_ref)
            ->  Index Scan using tickets_0_book_ref_idx on tickets_0 t_1 (actual rows=2 loops=1)
                  Index Cond: (book_ref = b_1.book_ref)
      ->  Async Foreign Scan (actual rows=0 loops=1)
            Relations: (((boarding_passes_1_fdw bp_2) INNER JOIN (flights f)) INNER JOIN (tickets_1_fdw t_2)) INNER JOIN (bookings_1_fdw b_2)
            Network: FDW bytes sent=826 received=68
      ->  Async Foreign Scan (actual rows=0 loops=1)
            Relations: (((boarding_passes_2_fdw bp_3) INNER JOIN (flights f)) INNER JOIN (tickets_2_fdw t_3)) INNER JOIN (bookings_2_fdw b_3)
            Network: FDW bytes sent=829 received=34
      ->  Async Foreign Scan (actual rows=0 loops=1)
            Relations: (((boarding_passes_3_fdw bp_4) INNER JOIN (flights f)) INNER JOIN (tickets_3_fdw t_4)) INNER JOIN (bookings_3_fdw b_4)
            Network: FDW bytes sent=829
    

It is clear from this plan that all the table joins were performed on the shards, and the foreign scans returned no rows to the query source node because the matching data was located on the very shard where the query was executed.

If this query were executed on a different shard, the plan would be the same, but the data for finalization would be received from the shard that holds the data.



[15] In computer science, the term naïve approach means much the same as brute-force approach: the first straightforward idea that comes to mind, which often takes no account of complexity, corner cases, and some of the requirements. On the one hand, it is a crude and direct method that only aims to get a working solution. On the other hand, such solutions are easy to understand and implement, although system resources may be used inefficiently.

[16] Since values can be assigned from different ranges, the generated value can leap. For example, the value 5 may be assigned on the first shard, 140003 on the second one, 70003 on the third one, and so on.