3.1. Migration of a Database Schema

Let's use the Airlines demo database as an example for development. A detailed description of the database schema is available at https://postgrespro.ru/education/demodb. This schema is used as a demo in Postgres Professional training courses, for example, in the course "QPT. Query Optimization".

The schema authors describe it as follows: "We tried to make the database schema as simple as possible, without overloading it with unnecessary details, but not so simple that interesting and meaningful queries could not be written."

The database schema contains several tables with meaningful contents. The examples here use the demo database version of 13.10.2016. A link for downloading the database and schema dump is available (in Russian) at https://postgrespro.ru/education/courses/QPT. In addition to the query examples provided below, you can find more examples in the above course and in the book "Postgres. The First Experience".

This section shows two examples of schema modification and query adaptation:

  • Naive approach. It is simple, requires minimal transformations of the schema, and aims to clarify how queries work in a distributed schema.

  • Complex approach. It is more involved and is provided to give a better understanding of the problems and processes that a developer may face when migrating to a distributed schema and adapting applications to it.

3.1.1. Database Source Schema

Figure 3.1. Database Source Schema


The authors describe the Airlines database as follows:

The main entity is a booking (bookings).

One booking can include several passengers, with a separate ticket (tickets) issued to each passenger. A ticket has a unique number and includes information about the passenger. As such, the passenger is not a separate entity. Both the passenger's name and identity document number can change over time, so it is impossible to uniquely identify all the tickets of a particular person; for simplicity, we can assume that all passengers are unique.

The ticket includes one or more flight segments (ticket_flights). Several flight segments can be included in a single ticket if there are no non-stop flights between the points of departure and destination (connecting flights), or if it is a round-trip ticket. Although there is no constraint in the schema, it is assumed that all tickets in the booking have the same flight segments.

Each flight (flights) goes from one airport (airports) to another. Flights with the same flight number have the same points of departure and destination, but differ in departure date.

At flight check-in, the passenger is issued a boarding pass (boarding_passes), where the seat number is specified. The passenger can check in for the flight only if this flight is included in the ticket. The flight-seat combination must be unique to avoid issuing two boarding passes for the same seat.

The number of seats (seats) in the aircraft and their distribution between different travel classes depend on the model of the aircraft (aircrafts) performing the flight. It is assumed that every aircraft model has only one cabin configuration. The database schema does not check that seat numbers in boarding passes have corresponding seats in the aircraft (such verification can be done using table triggers or at the application level).
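As a rough illustration of the trigger-based check mentioned above, the following sketch targets the original, non-distributed demo database. The function name check_boarding_pass_seat and the trigger name boarding_passes_seat_check are hypothetical and are not part of the demo schema. Whether such a trigger is appropriate on sharded tables is not covered here.

```sql
-- Hypothetical helper: reject a boarding pass whose seat is not defined
-- for the aircraft model that performs the flight.
CREATE FUNCTION bookings.check_boarding_pass_seat() RETURNS trigger
LANGUAGE plpgsql AS
$$
BEGIN
    IF NOT EXISTS (
        SELECT 1
          FROM bookings.flights f
          JOIN bookings.seats s ON s.aircraft_code = f.aircraft_code
         WHERE f.flight_id = NEW.flight_id
           AND s.seat_no = NEW.seat_no
    ) THEN
        RAISE EXCEPTION 'seat % does not exist on the aircraft of flight %',
                        NEW.seat_no, NEW.flight_id;
    END IF;
    RETURN NEW;
END
$$;

-- Run the check for every new or changed boarding pass.
CREATE TRIGGER boarding_passes_seat_check
    BEFORE INSERT OR UPDATE OF flight_id, seat_no
    ON bookings.boarding_passes
    FOR EACH ROW
    EXECUTE FUNCTION bookings.check_boarding_pass_seat();
```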

Let's look at the common entities and table sizes in the above schema. The ticket_flights, boarding_passes and tickets tables are linked by the ticket_no field. Together, these tables account for 95% of the total database size.
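One way to check such proportions on the source database is to query the standard PostgreSQL size functions. This is a minimal sketch that assumes the demo tables live in the bookings schema; it reports each table's share of the schema's tables, including indexes, rather than of the whole database.

```sql
-- Size of each ordinary table in the bookings schema (with indexes and TOAST)
-- and its share of the total size of these tables.
SELECT c.relname AS table_name,
       pg_size_pretty(pg_total_relation_size(c.oid)) AS total_size,
       round(100.0 * pg_total_relation_size(c.oid)
             / sum(pg_total_relation_size(c.oid)) OVER (), 1) AS pct_of_schema
  FROM pg_class c
  JOIN pg_namespace n ON n.oid = c.relnamespace
 WHERE n.nspname = 'bookings'
   AND c.relkind = 'r'
 ORDER BY pg_total_relation_size(c.oid) DESC;
```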

Let's look at the bookings table. Although it seems to have a pretty compact structure, it can reach a considerable size over time.

Migration examples are provided for a Shardman cluster that contains four shards. Sharded tables are divided into four parts, so that each shard holds exactly one part of each sharded table. This is done on purpose, to keep query plans short and readable. In a real deployment, the number of partitions should be chosen based on the maximum number of nodes the cluster is expected to have.

When migrating a real-life database schema, decide in advance how many partitions the data in distributed tables will be split into. Also bear in mind that the best migration approach is to use SQL transformations that impose minimal limitations on database objects.

3.1.2. Shardman Cluster Configuration

The Shardman cluster consists of four nodes — node1, node2, node3 and node4. Each cluster node is a shard.

The examples assume that the tables are divided into four partitions by the sharding key (num_parts = 4) and distributed across cluster nodes. Each table part with the data is located in the corresponding shard:

  • shard-1 is located on the cluster node node1

  • shard-2 is located on the cluster node node2

  • shard-3 is located on the cluster node node3

  • shard-4 is located on the cluster node node4

The cluster is intentionally presented in a simplified configuration. Cluster nodes have no replicas, and the configuration is not fault-tolerant.

3.1.3. Selecting the Sharding Key

3.1.3.1. Naive[1] Approach: ticket_no Sharding Key

With this approach, the choice of the sharding key is fairly obvious: it is the ticket number, ticket_no. The ticket number is the primary key of the tickets table and a foreign key in the ticket_flights and boarding_passes tables.

The primary key of the ticket_flights and boarding_passes tables is composite. It is a unique index composed of ticket_no and flight_id.

So if ticket_no is chosen as the sharding key, the data of the three tables is distributed across the cluster shards, and partitions that contain linked data are located in the same shard.

The rest of the tables (airports, flights, aircrafts and seats) are small and rarely change, so they can be made global tables, also called dictionary tables. The bookings table is also created as a global table in this approach.

Figure 3.2. Naive Approach Schema


From the point of view of creating the schema and writing queries, the main advantage of this approach is that no changes are needed beyond those inherent to working with distributed systems, that is, explicitly declaring tables, sequences, and so on as distributed when creating them.

Once the sharding key is selected, we can proceed to creating the distributed schema.

3.1.3.1.1. Creating the Schema Distributed by ticket_no

First, turn on broadcasting DDL statements to all cluster shards:

SET shardman.broadcast_ddl TO on;

Let's create the bookings schema on all shards:

CREATE SCHEMA bookings;

As tables in the schema are linked with one another by foreign keys, the order in which the tables and auxiliary objects are created matters.

The demo database contains snapshots of data, similar to a backup copy of a real system captured at some point in time. For example, if a flight has the Departed status, it means that the aircraft had already departed and was airborne at the time of the backup copy. The snapshot time is saved in the bookings.now() function. You can use this function in demo queries for cases where you would use the now() function in a real database. In addition, the return value of this function determines the version of the demo database. The latest version available is of 13.10.2016:

SELECT bookings.now() as now;
          now
-----------------------
2016-10-13 17:00:00+03

In relation to this moment, all flights are classified as past and future flights.

Let's create the utility function bookings.now():

    CREATE FUNCTION bookings.now() RETURNS timestamp with time zone
    LANGUAGE sql IMMUTABLE COST 0.00999999978
    AS
    $sql$
    SELECT $qq$2016-10-13 17:00:00$qq$::TIMESTAMP
           AT TIME ZONE $zz$Europe/Moscow$zz$;
    $sql$;
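As a quick sanity check after creating the function, a query like the following can confirm that it returns the snapshot moment. This is only a sketch; the column aliases are arbitrary, and the displayed value of snapshot_time depends on the session's TimeZone setting.

```sql
-- Compare the function result with the documented snapshot instant
-- (17:00 Moscow time on 2016-10-13, that is, UTC+3).
SELECT bookings.now() AS snapshot_time,
       bookings.now() = timestamptz '2016-10-13 17:00:00+03' AS matches_snapshot;
```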

In addition to tables, a global sequence is needed for generating IDs for data insertion in the flights table. In this example, we create the sequence explicitly and link it with a column of this table by assigning the generated values by default.

Let's create the sequence using the following DDL statement:

CREATE SEQUENCE bookings.flights_flight_id_seq
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1 with(global);

with(global) creates a single distributed sequence available on all cluster nodes, which assigns values in a certain range for each shard, and the ranges for different shards do not intersect. See Section 7.6 and Section 6.5 for more details of global sequences.

Under the hood, a global sequence is backed by a regular sequence on each shard, and values are allocated to these local sequences in sequential blocks (of 65536 numbers by default). When all the numbers in a block are used up, the next block is allocated to the shard's local sequence. That is, numbers from a global sequence are unique, but they are not strictly monotonic, and there may be gaps in the values returned by the sequence[2].

Global sequences can also back columns of the bigserial, smallserial, or serial type. Sequences are applicable both for sharded and global tables.

You should not create local sequences in each shard as their values may be duplicated.
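To observe the behavior described above, you could draw a few values from the global sequence while connected to different nodes. This is a minimal sketch using the standard nextval() function. Note that each call consumes a value, and the exact numbers you get depend on which blocks have been allocated to the node, so no expected output is shown.

```sql
-- Draw three values from the global sequence on the current node.
-- Repeating this on another node should return values from a different
-- block: unique across the cluster, but not contiguous with these.
SELECT nextval('bookings.flights_flight_id_seq') AS flight_id
  FROM generate_series(1, 3);
```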

Now we create the global tables. As explained above, they are small and their data rarely changes, so they are effectively dictionary tables that must contain the same data in all cluster shards. Each global table must have a primary key.

Let's create global tables using the following DDL statements:

CREATE TABLE bookings.aircrafts (
    aircraft_code character(3) NOT NULL primary key,
    model text NOT NULL,
    range integer NOT NULL,
    CONSTRAINT aircrafts_range_check CHECK ((range > 0))
) with (global);

CREATE TABLE bookings.seats (
    aircraft_code character(3) references bookings.aircrafts(aircraft_code),
    seat_no character varying(4) NOT NULL,
    fare_conditions character varying(10) NOT NULL,
    CONSTRAINT seats_fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text]))),
    PRIMARY KEY (aircraft_code, seat_no)
) with (global);

CREATE TABLE bookings.airports (
    airport_code character(3) NOT NULL primary key,
    airport_name text NOT NULL,
    city text NOT NULL,
    longitude double precision NOT NULL,
    latitude double precision NOT NULL,
    timezone text NOT NULL
)  with (global);

CREATE TABLE bookings.bookings (
    book_ref character(6) NOT NULL,
    book_date timestamp with time zone NOT NULL,
    total_amount numeric(10,2) NOT NULL,
    PRIMARY KEY (book_ref)
) with (global);

CREATE TABLE bookings.flights (
    flight_id bigint NOT NULL PRIMARY KEY,-- <= a sequence will be assigned
    flight_no character(6) NOT NULL,
    scheduled_departure timestamp with time zone NOT NULL,
    scheduled_arrival timestamp with time zone NOT NULL,
    departure_airport character(3) REFERENCES bookings.airports(airport_code),
    arrival_airport character(3) REFERENCES bookings.airports(airport_code),
    status character varying(20) NOT NULL,
    aircraft_code character(3) references bookings.aircrafts(aircraft_code),
    actual_departure timestamp with time zone,
    actual_arrival timestamp with time zone,
    CONSTRAINT flights_check CHECK ((scheduled_arrival > scheduled_departure)),
    CONSTRAINT flights_check1 CHECK (((actual_arrival IS NULL) OR ((actual_departure IS NOT NULL) AND (actual_arrival IS NOT NULL) AND (actual_arrival > actual_departure)))),
    CONSTRAINT flights_status_check CHECK (((status)::text = ANY (ARRAY[('On Time'::character varying)::text, ('Delayed'::character varying)::text, ('Departed'::character varying)::text, ('Arrived'::character varying)::text, ('Scheduled'::character varying)::text, ('Cancelled'::character varying)::text])))
) with (global);

-- associate the sequence with table column
ALTER SEQUENCE bookings.flights_flight_id_seq OWNED BY bookings.flights.flight_id;


-- assign the default value to the column
ALTER TABLE bookings.flights ALTER COLUMN flight_id SET DEFAULT nextval('bookings.flights_flight_id_seq');

ALTER TABLE bookings.flights ADD CONSTRAINT flights_flight_no_scheduled_departure_key UNIQUE (flight_no, scheduled_departure);

Next, we create sharded tables tickets, ticket_flights and boarding_passes in the bookings schema:

CREATE TABLE bookings.tickets (
    ticket_no character(13) PRIMARY KEY,
    book_ref character(6) REFERENCES bookings.bookings(book_ref),
    passenger_id character varying(20) NOT NULL,
    passenger_name text NOT NULL,
    contact_data jsonb
) with (distributed_by='ticket_no', num_parts=4);

CREATE TABLE bookings.ticket_flights (
    ticket_no character(13) NOT NULL,
    flight_id bigint references bookings.flights(flight_id),
    fare_conditions character varying(10) NOT NULL,
    amount numeric(10,2) NOT NULL,
    CONSTRAINT ticket_flights_amount_check CHECK ((amount >= (0)::numeric)),
    CONSTRAINT ticket_flights_fare_conditions_check CHECK (((fare_conditions)::text = ANY (ARRAY[('Economy'::character varying)::text, ('Comfort'::character varying)::text, ('Business'::character varying)::text]))),
    PRIMARY KEY (ticket_no, flight_id)
) with (distributed_by='ticket_no', colocate_with='bookings.tickets');

CREATE TABLE bookings.boarding_passes (
    ticket_no character(13) NOT NULL,
    flight_id bigint NOT NULL,
    boarding_no integer NOT NULL,
    seat_no character varying(4) NOT NULL,
    FOREIGN KEY (ticket_no, flight_id) REFERENCES bookings.ticket_flights(ticket_no, flight_id),
    PRIMARY KEY (ticket_no, flight_id)
) with (distributed_by='ticket_no', colocate_with='bookings.tickets');

-- constraints must contain sharding key
ALTER TABLE bookings.boarding_passes ADD CONSTRAINT boarding_passes_flight_id_boarding_no_key UNIQUE (ticket_no, flight_id, boarding_no);

ALTER TABLE bookings.boarding_passes ADD CONSTRAINT boarding_passes_flight_id_seat_no_key UNIQUE (ticket_no, flight_id, seat_no);
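Because all three tables are distributed by ticket_no and colocated, a query that joins them on ticket_no only needs rows that reside in the same shard. The following is an illustrative sketch of such a query for use once data is loaded; the ticket number is a hypothetical value.

```sql
-- All flight segments and boarding passes of one ticket.
-- Every join condition includes the sharding key ticket_no.
SELECT t.ticket_no,
       t.passenger_name,
       tf.flight_id,
       tf.fare_conditions,
       bp.seat_no
  FROM bookings.tickets t
  JOIN bookings.ticket_flights tf
    ON tf.ticket_no = t.ticket_no
  LEFT JOIN bookings.boarding_passes bp
    ON bp.ticket_no = tf.ticket_no
   AND bp.flight_id = tf.flight_id
 WHERE t.ticket_no = '0005432000284';  -- hypothetical ticket number
```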

When creating sharded tables, you can also specify the num_parts parameter, which defines the number of partitions of a sharded table. In this example, it equals 4 to keep the query plan output short. The default value is 20. This parameter becomes important if you plan to add shards to the cluster later and scale horizontally.

Based on the assumed future load and data size, num_parts should be sufficient for data rebalancing when new shards are added (num_parts must be greater than or equal to the number of cluster nodes). On the other hand, too many partitions cause a considerable increase of the query planning time. Therefore, an optimal balance should be achieved between the number of partitions and number of cluster nodes.
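To see how many partitions a sharded table actually has, you can inspect the standard PostgreSQL catalogs. This sketch assumes that a sharded table is represented on each node as a partitioned table whose partitions are either local tables or foreign tables pointing to other shards; partition names are not shown here because they depend on the Shardman version.

```sql
-- List the partitions of bookings.tickets as seen from the current node.
SELECT i.inhrelid::regclass AS partition_name,
       c.relkind            AS kind   -- 'r' = ordinary table, 'f' = foreign table
  FROM pg_inherits i
  JOIN pg_class c ON c.oid = i.inhrelid
 WHERE i.inhparent = 'bookings.tickets'::regclass
 ORDER BY 1;
```

With num_parts=4, the query is expected to return four rows.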

The last thing to do is to create a view that is needed to execute some queries:

CREATE VIEW bookings.flights_v AS
 SELECT f.flight_id,
    f.flight_no,
    f.scheduled_departure,
    timezone(dep.timezone, f.scheduled_departure) AS scheduled_departure_local,
    f.scheduled_arrival,
    timezone(arr.timezone, f.scheduled_arrival) AS scheduled_arrival_local,
    (f.scheduled_arrival - f.scheduled_departure) AS scheduled_duration,
    f.departure_airport,
    dep.airport_name AS departure_airport_name,
    dep.city AS departure_city,
    f.arrival_airport,
    arr.airport_name AS arrival_airport_name,
    arr.city AS arrival_city,
    f.status,
    f.aircraft_code,
    f.actual_departure,
    timezone(dep.timezone, f.actual_departure) AS actual_departure_local,
    f.actual_arrival,
    timezone(arr.timezone, f.actual_arrival) AS actual_arrival_local,
    (f.actual_arrival - f.actual_departure) AS actual_duration
   FROM bookings.flights f,
    bookings.airports dep,
    bookings.airports arr
  WHERE ((f.departure_airport = dep.airport_code) AND (f.arrival_airport = arr.airport_code));
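As an illustration of how the view might be used together with bookings.now(), the following sketch lists the flights scheduled to depart within one day after the snapshot moment. The one-day window is an arbitrary choice for the example.

```sql
-- Flights departing within 24 hours after the snapshot moment,
-- with departure time shown in the local time of the departure airport.
SELECT flight_no,
       departure_city,
       arrival_city,
       scheduled_departure_local,
       status
  FROM bookings.flights_v
 WHERE scheduled_departure > bookings.now()
   AND scheduled_departure < bookings.now() + interval '1 day'
 ORDER BY scheduled_departure;
```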

The distributed schema is now created. Let's turn off broadcasting of DDL statements:

SET shardman.broadcast_ddl TO off;

3.1.3.2. Complex Approach: book_ref Sharding Key

A more complex approach to the sharding key choice involves the source schema modification, inclusion of new parameters in queries and other important changes.

What if an airline has been on the market for over 10 years and the bookings table has grown so large that it can no longer be kept global? Distributing it as is would not work either, because it does not contain the ticket_no field by which the other tables are distributed in the naive approach.

If the source schema is modified, another field can serve as the sharding key.

Looking at the bookings table, we can see that values of the book_ref field are unique and that this field is the primary key. book_ref is also a foreign key in the tickets table, so it seems suitable as the sharding key. However, book_ref is missing from the ticket_flights and boarding_passes tables.

If we add book_ref to the ticket_flights and boarding_passes tables, it becomes possible to distribute all four tables (bookings, tickets, ticket_flights and boarding_passes) by the book_ref sharding key.

book_ref should be added to ticket_flights and boarding_passes in the source schema and filled with the booking reference of the corresponding ticket (the book_ref value from the tickets table).

Figure 3.3. Source Schema Modification


3.1.3.2.1. Modifying the Source Schema

To properly transfer data from the source schema to the distributed one, the schema should be modified as follows:

  1. Add the book_ref field to the ticket_flights and boarding_passes tables:

    ALTER TABLE ticket_flights
       ADD COLUMN book_ref char(6);
    
    ALTER TABLE boarding_passes
       ADD COLUMN book_ref char(6);
    

  2. In these tables, fill the added book_ref field with data:

    WITH batch AS (SELECT book_ref,
                          ticket_no
                     FROM tickets)
    UPDATE ticket_flights
       SET book_ref = batch.book_ref
      FROM batch
     WHERE ticket_flights.ticket_no = batch.ticket_no
       AND ticket_flights.book_ref IS NULL;
    
    
    WITH batch AS (SELECT book_ref,
                          ticket_no
                     FROM tickets)
    UPDATE boarding_passes
       SET book_ref = batch.book_ref
      FROM batch
     WHERE boarding_passes.ticket_no = batch.ticket_no
       AND boarding_passes.book_ref IS NULL;
    

    Avoid using this example in a loaded production system, as it updates every row of each table in a single transaction and holds locks on all those rows until it completes. In production systems, data should be transferred incrementally, in batches.
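One possible way to fill the column incrementally is sketched below for ticket_flights; the same pattern would apply to boarding_passes. It assumes PostgreSQL 11 or later (transaction control inside DO blocks) and must be run outside an explicit transaction block. The batch size of 10000 is an arbitrary, hypothetical value to be tuned for the actual workload.

```sql
DO $$
DECLARE
    batch_size   constant integer := 10000;  -- hypothetical batch size
    rows_updated bigint;
BEGIN
    LOOP
        -- Pick a limited set of rows that still lack book_ref
        -- and fill them from the tickets table.
        WITH batch AS (
            SELECT tf.ticket_no, tf.flight_id, t.book_ref
              FROM ticket_flights tf
              JOIN tickets t ON t.ticket_no = tf.ticket_no
             WHERE tf.book_ref IS NULL
             LIMIT batch_size
        )
        UPDATE ticket_flights tf
           SET book_ref = batch.book_ref
          FROM batch
         WHERE tf.ticket_no = batch.ticket_no
           AND tf.flight_id = batch.flight_id;

        GET DIAGNOSTICS rows_updated = ROW_COUNT;
        EXIT WHEN rows_updated = 0;

        COMMIT;  -- release row locks before the next batch
    END LOOP;
END
$$;
```

Committing after each batch keeps the number of rows locked at any moment bounded by the batch size, at the cost of repeatedly searching for rows where book_ref is still NULL.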

Now the database schema is ready for data transfer.

3.1.3.2.2. Creating a Schema Distributed by book_ref

Here the Shardman shardman.broadcast_all_sql() function is used to broadcast DDL statements to all cluster nodes. Let's create the bookings schema on all shards:

SELECT shardman.broadcast_all_sql('CREATE SCHEMA bookings');

As tables in the schema are linked by foreign keys, the order of creating tables matters.

First we create a utility function bookings.now():

SELECT shardman.broadcast_all_sql(
    $sql$
    CREATE FUNCTION bookings.now() RETURNS timestamp with time zone
    LANGUAGE sql IMMUTABLE COST 0.00999999978
    AS 
    $q$
    SELECT $qq$2016-10-13 17:00:00$qq$::TIMESTAMP 
                    AT TIME ZONE $zz$Europe/Moscow$zz$;
    $q$;
    $sql$
);

Tables, users and sequences are created with regular SQL statements; shardman.broadcast_all_sql() is not needed for them.

In this example, the global sequence is not created explicitly: for the bigserial type, Shardman creates a global sequence automatically.

Now let's create global tables using the following DDL statements:

CREATE TABLE bookings.aircrafts (
    aircraft_code character(3) NOT NULL PRIMARY KEY,
    model text NOT NULL,
    range integer NOT NULL,
    CONSTRAINT aircrafts_range_check CHECK ((range > 0))
) WITH (global);

CREATE TABLE bookings.seats (
    aircraft_code character(3) REFERENCES bookings.aircrafts(aircraft_code),
    seat_no character varying(4) NOT NULL,
    fare_conditions character varying(10) NOT NULL,
    CONSTRAINT seats_fare_conditions_check CHECK ((
          (fare_conditions)::text = ANY (ARRAY[
             ('Economy'::character varying)::text,
             ('Comfort'::character varying)::text,
             ('Business'::character varying)::text])
           )),
    PRIMARY KEY (aircraft_code, seat_no)
) WITH (global);

CREATE TABLE bookings.airports (
    airport_code character(3) NOT NULL PRIMARY KEY,
    airport_name text NOT NULL,
    city text NOT NULL,
    longitude double precision NOT NULL,
    latitude double precision NOT NULL,
    timezone text NOT NULL
)  WITH (global);

CREATE TABLE bookings.flights (
-- the global sequence will be created automatically
-- the default value will be assigned
    flight_id bigserial NOT NULL PRIMARY KEY,
    flight_no character(6) NOT NULL,
    scheduled_departure timestamp with time zone NOT NULL,
    scheduled_arrival timestamp with time zone NOT NULL,
    departure_airport character(3) REFERENCES bookings.airports(airport_code),
    arrival_airport character(3) REFERENCES bookings.airports(airport_code),
    status character varying(20) NOT NULL,
    aircraft_code character(3) REFERENCES bookings.aircrafts(aircraft_code),
    actual_departure timestamp with time zone,
    actual_arrival timestamp with time zone,
    CONSTRAINT flights_check CHECK ((scheduled_arrival > scheduled_departure)),
    CONSTRAINT flights_check1 CHECK ((
                  (actual_arrival IS NULL) 
               OR ((actual_departure IS NOT NULL) 
              AND (actual_arrival IS NOT NULL) 
              AND (actual_arrival > actual_departure)))),
    CONSTRAINT flights_status_check CHECK (
           ((status)::text = ANY (
      ARRAY[('On Time'::character varying)::text,
            ('Delayed'::character varying)::text,
            ('Departed'::character varying)::text,
            ('Arrived'::character varying)::text,
            ('Scheduled'::character varying)::text,
            ('Cancelled'::character varying)::text])))
) WITH (global);

ALTER TABLE bookings.flights 
   ADD CONSTRAINT flights_flight_no_scheduled_departure_key 
   UNIQUE (flight_no, scheduled_departure);
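To see which sequence was created for the bigserial column and which default expression was assigned, you can use standard PostgreSQL facilities. This is only a sketch; the results are not shown here because the generated sequence name is not stated in this section.

```sql
-- Name of the sequence that backs bookings.flights.flight_id
SELECT pg_get_serial_sequence('bookings.flights', 'flight_id') AS sequence_name;

-- Default expression assigned to the column
SELECT column_default
  FROM information_schema.columns
 WHERE table_schema = 'bookings'
   AND table_name   = 'flights'
   AND column_name  = 'flight_id';
```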

Now let's create the sharded tables in the bookings schema. Unlike in the previous example, bookings is now sharded too, together with tickets, ticket_flights and boarding_passes:

-- no columns are added to these tables; they are only distributed
-- (the tickets primary key is extended with the book_ref sharding key)
CREATE TABLE bookings.bookings (
   book_ref character(6) NOT NULL PRIMARY KEY,
   book_date timestamp with time zone NOT NULL,
   total_amount numeric(10,2) NOT NULL
) WITH (distributed_by='book_ref', num_parts=4);


CREATE TABLE bookings.tickets (
   ticket_no character(13) NOT NULL,
   book_ref character(6) REFERENCES bookings.bookings(book_ref),
   passenger_id character varying(20) NOT NULL,
   passenger_name text NOT NULL,
   contact_data jsonb,
   PRIMARY KEY (book_ref, ticket_no)
) WITH (distributed_by='book_ref', colocate_with='bookings.bookings');

-- adding the book_ref foreign key to these tables
CREATE TABLE bookings.ticket_flights (
   ticket_no character(13) NOT NULL,
   flight_id bigint NOT NULL,
   fare_conditions character varying(10) NOT NULL,
   amount numeric(10,2) NOT NULL,
   book_ref character(6) NOT NULL, -- <= added book_ref
   CONSTRAINT ticket_flights_amount_check 
        CHECK ((amount >= (0)::numeric)),
   CONSTRAINT ticket_flights_fare_conditions_check 
        CHECK (((fare_conditions)::text = ANY (
        ARRAY[('Economy'::character varying)::text,
              ('Comfort'::character varying)::text,
              ('Business'::character varying)::text]))),
   FOREIGN KEY (book_ref, ticket_no) 
        REFERENCES bookings.tickets(book_ref, ticket_no),
   PRIMARY KEY (book_ref, ticket_no, flight_id) -- <= changed the primary key
) with (distributed_by='book_ref', colocate_with='bookings.bookings');


CREATE TABLE bookings.boarding_passes (
    ticket_no character(13) NOT NULL,
    flight_id bigint NOT NULL,
    boarding_no integer NOT NULL,
    seat_no character varying(4) NOT NULL,
    book_ref character(6) NOT NULL, -- <= added book_ref
    FOREIGN KEY (book_ref, ticket_no, flight_id) 
         REFERENCES bookings.ticket_flights(book_ref, ticket_no, flight_id),
    PRIMARY KEY (book_ref, ticket_no, flight_id)
) WITH (distributed_by='book_ref', colocate_with='bookings.bookings');

-- constraints must contain the sharding key
ALTER TABLE bookings.boarding_passes 
  ADD CONSTRAINT boarding_passes_flight_id_boarding_no_key 
  UNIQUE (book_ref, ticket_no, flight_id, boarding_no);

ALTER TABLE bookings.boarding_passes 
  ADD CONSTRAINT boarding_passes_flight_id_seat_no_key 
  UNIQUE (book_ref, ticket_no, flight_id, seat_no);
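With this schema, queries should carry book_ref through their join conditions and filters so that related rows are matched within one shard. The following is an illustrative sketch of such a query for use once data is loaded; the booking reference is a hypothetical value.

```sql
-- All tickets and flight segments of one booking.
-- The sharding key book_ref appears in every join condition.
SELECT b.book_ref,
       b.book_date,
       t.ticket_no,
       t.passenger_name,
       tf.flight_id,
       tf.amount
  FROM bookings.bookings b
  JOIN bookings.tickets t
    ON t.book_ref = b.book_ref
  JOIN bookings.ticket_flights tf
    ON tf.book_ref = t.book_ref
   AND tf.ticket_no = t.ticket_no
 WHERE b.book_ref = '00000F';  -- hypothetical booking reference
```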

Let's create the bookings.flights_v view:

SELECT shardman.broadcast_all_sql($$
CREATE VIEW bookings.flights_v AS
SELECT f.flight_id,
      f.flight_no,
      f.scheduled_departure,
      timezone(dep.timezone, f.scheduled_departure) AS scheduled_departure_local,
      f.scheduled_arrival,
      timezone(arr.timezone, f.scheduled_arrival)   AS scheduled_arrival_local,
      (f.scheduled_arrival - f.scheduled_departure) AS scheduled_duration,
      f.departure_airport,
      dep.airport_name                              AS departure_airport_name,
      dep.city                                      AS departure_city,
      f.arrival_airport,
      arr.airport_name                              AS arrival_airport_name,
      arr.city                                      AS arrival_city,
      f.status,
      f.aircraft_code,
      f.actual_departure,
      timezone(dep.timezone, f.actual_departure)    AS actual_departure_local,
      f.actual_arrival,
      timezone(arr.timezone, f.actual_arrival)      AS actual_arrival_local,
      (f.actual_arrival - f.actual_departure)       AS actual_duration
FROM bookings.flights f,
    bookings.airports dep,
    bookings.airports arr
WHERE ((f.departure_airport = dep.airport_code) AND (f.arrival_airport = arr.airport_code));
$$);

The schema creation is now complete. Let's proceed to data migration.



[1] In computer science, the expression naive approach means something close to a brute-force approach: the first basic idea that comes to mind, which often takes no account of complexity, corner cases or some of the requirements. On the one hand, it is a coarse and direct method that only aims to get a working solution. On the other hand, such solutions are easy to understand and implement, although they may use system resources inefficiently.

[2] Because values are assigned from different ranges, consecutive values can jump. For example, 5 may be assigned in the first shard, 140003 in the second one, 70003 in the third one, and so on.
