2.3. Rebalancing the Data

2.3.1. Automatically Rebalancing the Data

Automatic rebalancing is the default mode. A rebalance starts automatically after nodes are added (unless the --no-rebalance option is set) and before a node is deleted. A rebalance can also be started manually. The goal of rebalancing is to distribute the partitions of each sharded table evenly across replication groups.

For each sharded table, the rebalancing process iteratively finds the replication groups with the maximum and minimum numbers of partitions and creates a task to move one partition to the replication group with the minimum number of partitions. This step is repeated while max - min > 1. Partitions are moved using logical replication. Partitions of colocated tables are moved together with the partitions of the sharded tables to which they refer.
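The loop described above can be sketched in a few lines of shell. This is only an illustration of the max - min > 1 rule, not Shardman's actual implementation: the plan_moves function is hypothetical, it tracks nothing but partition counts per replication group, and the starting counts (24 partitions on two groups plus one newly added, empty group) are made up for the example.

```shell
# plan_moves COUNT_RG1 COUNT_RG2 ...
# Hypothetical helper: takes the number of partitions held by each
# replication group and prints the moves needed until max - min <= 1.
plan_moves() {
    printf '%s\n' "$@" | awk '
        { count[NR] = $1; n = NR }
        END {
            while (1) {
                # Find the groups with the most and the fewest partitions.
                max = 1; min = 1
                for (i = 2; i <= n; i++) {
                    if (count[i] > count[max]) max = i
                    if (count[i] < count[min]) min = i
                }
                # Stop once the distribution is even enough.
                if (count[max] - count[min] <= 1) break
                # Plan one move from the fullest to the emptiest group.
                count[max]--; count[min]++
                printf "move one partition: rg %d -> rg %d\n", max, min
            }
            for (i = 1; i <= n; i++)
                printf "rg %d: %d partitions\n", i, count[i]
        }'
}

# Assumed scenario: rg 1 and rg 2 hold 12 partitions each, rg 3 is new.
plan_moves 12 12 0
```

With these assumed counts the sketch plans eight moves and ends with eight partitions in each replication group.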

Make sure that max_logical_replication_workers is set high enough, since the rebalance process uses up to max(max_replication_slots, max_logical_replication_workers, max_worker_processes, max_wal_senders)/3 concurrent threads. In practice, you can use max_logical_replication_workers = Repfactor + 3 * task_num, where task_num is the number of parallel rebalance tasks.
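As a quick check of the practical formula, the arithmetic can be done in the shell. The suggested_workers helper is hypothetical, and the values used here (Repfactor = 1 and three parallel rebalance tasks) are assumptions chosen only for illustration.

```shell
# suggested_workers REPFACTOR TASK_NUM
# Hypothetical helper: prints Repfactor + 3 * task_num, the value
# suggested above for max_logical_replication_workers.
suggested_workers() {
    echo $(( $1 + 3 * $2 ))
}

# Assumed values: Repfactor = 1, three parallel rebalance tasks.
suggested_workers 1 3
```

For these assumed values the helper prints 10.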

To rebalance sharded tables in the cluster0 cluster manually, run the command (where etcd1, etcd2, etcd3 are etcd cluster nodes):

    $ shardmanctl --store-endpoints http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 rebalance

If the rebalance process ends with an error, run the shardmanctl cleanup command with the --after-rebalance option.
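The two commands can be combined in a small wrapper so that cleanup runs only when the rebalance fails. This is a sketch, not an official script: the rebalance_with_cleanup function and the SHARDMANCTL and ENDPOINTS variables are hypothetical names, and passing --store-endpoints to the cleanup command in the same way as to rebalance is an assumption.

```shell
# Path to the CLI; can be overridden, for example to point at a stub.
SHARDMANCTL=${SHARDMANCTL:-shardmanctl}
# etcd endpoints, taken from the example above.
ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379

# Hypothetical wrapper: run a manual rebalance and, if it fails,
# run cleanup with the --after-rebalance option.
rebalance_with_cleanup() {
    if "$SHARDMANCTL" --store-endpoints "$ENDPOINTS" rebalance; then
        echo "rebalance finished"
    else
        echo "rebalance failed, running cleanup" >&2
        # Assumption: cleanup accepts --store-endpoints like rebalance does.
        "$SHARDMANCTL" --store-endpoints "$ENDPOINTS" cleanup --after-rebalance
        return 1
    fi
}
```

Call rebalance_with_cleanup to use it. The function returns a non-zero status when the rebalance failed, even if the cleanup itself succeeded, so a calling script can still detect the failure.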

2.3.2. Manually Rebalancing the Data

There are times when you need to place partitions of sharded tables in a specific way across the cluster nodes. To solve this problem, Shardman supports the manual data rebalancing mode.

How it works:

  1. Get the list of sharded tables using the shardmanctl tables sharded list command. The output is similar to the following:

    $ shardmanctl tables sharded list
    
    Sharded tables:
    
    public.doc
    public.resolution
    public.users
    
                                

  2. Request information about the selected sharded tables. Example:

    $ shardmanctl tables sharded info -t public.users
    
    Table public.users
    
    Partitions:
                                        
    Partition    RgID     Shard                            Master
    0            1        clover-1-shrn1                   shrn1:5432
    1            2        clover-2-shrn2                   shrn2:5432
    2            3        clover-3-shrn3                   shrn3:5432
    3            1        clover-1-shrn1                   shrn1:5432
    4            2        clover-2-shrn2                   shrn2:5432
    5            3        clover-3-shrn3                   shrn3:5432
    6            1        clover-1-shrn1                   shrn1:5432
    7            2        clover-2-shrn2                   shrn2:5432
    8            3        clover-3-shrn3                   shrn3:5432
    9            1        clover-1-shrn1                   shrn1:5432
    10           2        clover-2-shrn2                   shrn2:5432
    11           3        clover-3-shrn3                   shrn3:5432
    12           1        clover-1-shrn1                   shrn1:5432
    13           2        clover-2-shrn2                   shrn2:5432
    14           3        clover-3-shrn3                   shrn3:5432
    15           1        clover-1-shrn1                   shrn1:5432
    16           2        clover-2-shrn2                   shrn2:5432
    17           3        clover-3-shrn3                   shrn3:5432
    18           1        clover-1-shrn1                   shrn1:5432
    19           2        clover-2-shrn2                   shrn2:5432
    20           3        clover-3-shrn3                   shrn3:5432
    21           1        clover-1-shrn1                   shrn1:5432
    22           2        clover-2-shrn2                   shrn2:5432
    23           3        clover-3-shrn3                   shrn3:5432
    
                                

  3. Move a partition to a new shard, as shown below:

    $ shardmanctl --log-level debug tables sharded partmove -t public.users --partnum 1 --shard clover-1-shrn1
    
    2023-07-26T06:00:36.900Z        DEBUG   cmd/common.go:105       Waiting for metadata lock...
    2023-07-26T06:00:36.936Z        DEBUG   rebalance/service.go:256        take extension lock
    2023-07-26T06:00:36.938Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=3
    2023-07-26T06:00:36.938Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=2
    2023-07-26T06:00:36.938Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=1
    2023-07-26T06:00:36.951Z        DEBUG   broadcaster/worker.go:51        repgroup 3 connect established
    2023-07-26T06:00:36.951Z        DEBUG   broadcaster/worker.go:51        repgroup 2 connect established
    2023-07-26T06:00:36.952Z        DEBUG   broadcaster/worker.go:51        repgroup 1 connect established
    2023-07-26T06:00:36.952Z        DEBUG   extension/lock.go:35    Waiting for extension lock...
    2023-07-26T06:00:36.976Z        INFO    rebalance/service.go:276        Performing move partition...
    2023-07-26T06:00:36.977Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=3
    2023-07-26T06:00:36.978Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=2
    2023-07-26T06:00:36.978Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=1
    2023-07-26T06:00:36.987Z        DEBUG   broadcaster/worker.go:51        repgroup 1 connect established
    2023-07-26T06:00:36.989Z        DEBUG   broadcaster/worker.go:51        repgroup 2 connect established
    2023-07-26T06:00:36.992Z        DEBUG   broadcaster/worker.go:51        repgroup 3 connect established
    2023-07-26T06:00:36.992Z        DEBUG   rebalance/service.go:71 Performing cleanup after possible rebalance operation failure
    2023-07-26T06:00:37.077Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=3
    2023-07-26T06:00:37.077Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=1
    2023-07-26T06:00:37.077Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=2
    2023-07-26T06:00:37.082Z        DEBUG   rebalance/service.go:422        Rebalance will run 1 tasks
    2023-07-26T06:00:37.095Z        DEBUG   rebalance/service.go:452        Guessing that rebalance() can use 3 workers
    2023-07-26T06:00:37.096Z        DEBUG   rebalance/job.go:352    state: Idle     {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:37.111Z        DEBUG   rebalance/job.go:352    state: ConnsEstablished {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:37.171Z        DEBUG   rebalance/job.go:352    state: WaitInitCopy     {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:38.073Z        DEBUG   rebalance/job.go:347    current state   {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move", "state": "WaitInitialCatchup"}
    2023-07-26T06:00:38.073Z        DEBUG   rebalance/job.go:352    state: WaitInitialCatchup       {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:38.084Z        DEBUG   rebalance/job.go:347    current state   {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move", "state": "WaitFullSync"}
    2023-07-26T06:00:38.084Z        DEBUG   rebalance/job.go:352    state: WaitFullSync     {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:38.108Z        DEBUG   rebalance/job.go:347    current state   {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move", "state": "Committing"}
    2023-07-26T06:00:38.108Z        DEBUG   rebalance/job.go:352    state: Committing       {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:38.254Z        DEBUG   rebalance/job.go:352    state: Complete {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
    2023-07-26T06:00:38.258Z        DEBUG   rebalance/service.go:583        Produce and process tasks on destination replication groups...
    2023-07-26T06:00:38.258Z        DEBUG   rebalance/service.go:594        Produce and process tasks on source replication groups...
    2023-07-26T06:00:38.258Z        DEBUG   rebalance/service.go:606        wait all tasks finish
    2023-07-26T06:00:38.258Z        DEBUG   rebalance/service.go:531        Analyzing table public.users in rg 1    {"table": "public.users", "rgid": 1, "action": "analyze"}
    2023-07-26T06:00:38.573Z        DEBUG   rebalance/service.go:531        Analyzing table public.users in rg 2    {"table": "public.users", "rgid": 2, "action": "analyze"}
    2023-07-26T06:00:38.833Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=1
    2023-07-26T06:00:38.833Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=2
    2023-07-26T06:00:38.833Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=3
    
                                

    In this example, partition number 1 of the public.users table will be moved to the clover-1-shrn1 shard.

    After a partition of a sharded table is moved manually, automatic data rebalancing is disabled for that table and for all tables colocated with it.
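When deciding which partitions to move, it can help to summarize the partition table from step 2 as a count per shard. The filter below is a sketch that assumes the output keeps the column layout shown in that example (Partition, RgID, Shard, Master). The partitions_per_shard function is a hypothetical helper and is not part of shardmanctl.

```shell
# Hypothetical helper: reads "shardmanctl tables sharded info" output
# on stdin and prints the number of partitions hosted by each shard.
partitions_per_shard() {
    # Data rows start with a numeric partition number; the shard name
    # is assumed to be the third column, as in the example output.
    awk '$1 ~ /^[0-9]+$/ && NF >= 4 { n[$3]++ }
         END { for (s in n) print s, n[s] }' | sort
}

# Usage:
#   shardmanctl tables sharded info -t public.users | partitions_per_shard
```

Running the same pipeline again after a partmove shows how the distribution has changed.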

To get the list of tables with disabled automatic rebalancing, call the shardmanctl tables sharded norebalance command. Example:

$ shardmanctl tables sharded norebalance

public.users

                

To enable automatic data rebalancing for a selected sharded table, call the shardmanctl tables sharded rebalance command, as shown in the example below:

$ shardmanctl tables sharded rebalance -t public.users

2023-07-26T07:07:00.657Z        DEBUG   cmd/common.go:105       Waiting for metadata lock...
2023-07-26T07:07:00.687Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=1
2023-07-26T07:07:00.687Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=2
2023-07-26T07:07:00.687Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=3
2023-07-26T07:07:00.697Z        DEBUG   broadcaster/worker.go:51        repgroup 1 connect established
2023-07-26T07:07:00.698Z        DEBUG   broadcaster/worker.go:51        repgroup 2 connect established
2023-07-26T07:07:00.698Z        DEBUG   broadcaster/worker.go:51        repgroup 3 connect established
2023-07-26T07:07:00.698Z        DEBUG   extension/lock.go:35    Waiting for extension lock...
2023-07-26T07:07:00.719Z        DEBUG   rebalance/service.go:381        Planned moving pnum 21 for table users from rg 1 to rg 2
2023-07-26T07:07:00.719Z        INFO    rebalance/service.go:244        Performing rebalance...
2023-07-26T07:07:00.720Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=1
2023-07-26T07:07:00.720Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=2
2023-07-26T07:07:00.720Z        DEBUG   broadcaster/worker.go:33        start broadcaster worker for repgroup id=3
2023-07-26T07:07:00.732Z        DEBUG   broadcaster/worker.go:51        repgroup 3 connect established
2023-07-26T07:07:00.732Z        DEBUG   broadcaster/worker.go:51        repgroup 1 connect established
2023-07-26T07:07:00.734Z        DEBUG   broadcaster/worker.go:51        repgroup 2 connect established
2023-07-26T07:07:00.734Z        DEBUG   rebalance/service.go:71 Performing cleanup after possible rebalance operation failure
2023-07-26T07:07:00.791Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=1
2023-07-26T07:07:00.791Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=2
2023-07-26T07:07:00.791Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=3
2023-07-26T07:07:00.795Z        DEBUG   rebalance/service.go:422        Rebalance will run 1 tasks
2023-07-26T07:07:00.809Z        DEBUG   rebalance/service.go:452        Guessing that rebalance() can use 3 workers
2023-07-26T07:07:00.809Z        DEBUG   rebalance/job.go:352    state: Idle     {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:00.823Z        DEBUG   rebalance/job.go:352    state: ConnsEstablished {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:00.880Z        DEBUG   rebalance/job.go:352    state: WaitInitCopy     {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:01.886Z        DEBUG   rebalance/job.go:347    current state   {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move", "state": "WaitInitialCatchup"}
2023-07-26T07:07:01.886Z        DEBUG   rebalance/job.go:352    state: WaitInitialCatchup       {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:01.904Z        DEBUG   rebalance/job.go:347    current state   {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move", "state": "WaitFullSync"}
2023-07-26T07:07:01.905Z        DEBUG   rebalance/job.go:352    state: WaitFullSync     {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:01.932Z        DEBUG   rebalance/job.go:347    current state   {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move", "state": "Committing"}
2023-07-26T07:07:01.932Z        DEBUG   rebalance/job.go:352    state: Committing       {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:02.057Z        DEBUG   rebalance/job.go:352    state: Complete {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:02.060Z        DEBUG   rebalance/service.go:583        Produce and process tasks on destination replication groups...
2023-07-26T07:07:02.060Z        DEBUG   rebalance/service.go:594        Produce and process tasks on source replication groups...
2023-07-26T07:07:02.060Z        DEBUG   rebalance/service.go:531        Analyzing table public.users in rg 2    {"table": "public.users", "rgid": 2, "action": "analyze"}
2023-07-26T07:07:02.060Z        DEBUG   rebalance/service.go:606        wait all tasks finish
2023-07-26T07:07:02.321Z        DEBUG   rebalance/service.go:531        Analyzing table public.users in rg 1    {"table": "public.users", "rgid": 1, "action": "analyze"}
2023-07-26T07:07:02.587Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=3
2023-07-26T07:07:02.587Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=2
2023-07-26T07:07:02.587Z        DEBUG   broadcaster/worker.go:75        finish broadcaster worker for repgroup id=1

                

To enable automatic data rebalancing for all sharded tables, run the shardmanctl rebalance command with the --force option:

$ shardmanctl rebalance --force
                
