2.3. Rebalancing the Data #
2.3.1. Automatically Rebalancing the Data #
Automatic rebalancing is the default mode. A rebalance process starts automatically after nodes are added (unless the --no-rebalance option is set) or before a node is deleted; it can also be started manually. The goal of rebalancing is to distribute the partitions of each sharded table evenly across the replication groups.
For each sharded table, the rebalancing process iteratively finds the replication groups with the maximum and the minimum number of partitions and creates a task to move one partition from the former to the latter. This is repeated while max - min > 1. Partitions are moved using logical replication. Partitions of colocated tables are moved together with the partitions of the sharded tables that they reference.
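The selection of moves can be pictured with a short sketch. The Go snippet below is only an illustrative model of the greedy strategy described above, not Shardman's internal code; the function and variable names are invented for the example. It repeatedly finds the replication groups holding the most and the fewest partitions of one sharded table and plans a single-partition move until the counts differ by at most one.

package main

import "fmt"

// planMoves models the greedy planning loop described above: rgParts maps a
// replication group ID to the number of partitions of one sharded table it
// currently holds. Each planned task is a (source, destination) pair.
func planMoves(rgParts map[int]int) [][2]int {
	var tasks [][2]int
	for {
		maxRg, minRg := -1, -1
		for rg, n := range rgParts {
			if maxRg == -1 || n > rgParts[maxRg] {
				maxRg = rg
			}
			if minRg == -1 || n < rgParts[minRg] {
				minRg = rg
			}
		}
		// Stop once the groups are balanced: max - min <= 1.
		if maxRg == -1 || rgParts[maxRg]-rgParts[minRg] <= 1 {
			return tasks
		}
		// Plan moving one partition from the fullest to the emptiest group.
		rgParts[maxRg]--
		rgParts[minRg]++
		tasks = append(tasks, [2]int{maxRg, minRg})
	}
}

func main() {
	// Example: an empty replication group 3 joins groups 1 and 2, which hold
	// 12 partitions each; the plan ends with an 8/8/8 distribution.
	fmt.Println(planMoves(map[int]int{1: 12, 2: 12, 3: 0}))
}

In this model, partitions of colocated tables need no separate planning: they simply follow the partition of the sharded table they reference.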
It is important to remember that max_logical_replication_workers should be rather high, since the rebalance process uses up to max(max_replication_slots, max_logical_replication_workers, max_worker_processes, max_wal_senders)/3 concurrent threads. In practice, you can use max_logical_replication_workers = Repfactor + 3 * task_num, where task_num is the number of parallel rebalance tasks.
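As a worked example of this formula (the numbers are illustrative only): with Repfactor = 2 and task_num = 3 parallel rebalance tasks, max_logical_replication_workers should be at least 2 + 3 * 3 = 11, and max_replication_slots, max_worker_processes, and max_wal_senders must be high enough that max(...)/3 still covers that level of concurrency.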
To manually rebalance sharded tables in the cluster0 cluster, run the following command, where etcd1, etcd2, and etcd3 are etcd cluster nodes:
$
shardmanctl --store-endpoints http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 rebalance
If the process ends with an error, call the shardmanctl cleanup command with the --after-rebalance option.
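For example, the cleanup call might look like this (using the same --store-endpoints value as in the rebalance command above; adjust the endpoints to your cluster):
$
shardmanctl --store-endpoints http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 cleanup --after-rebalance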
2.3.2. Manually Rebalancing the Data #
There are times when you need to place partitions of sharded tables in a specific way across the cluster nodes. To solve this problem, Shardman supports the manual data rebalancing mode.
How it works:
Get a list of sharded tables using the shardmanctl tables sharded list command. The output is similar to the following:
$
shardmanctl tables sharded list
Sharded tables:
    public.doc
    public.resolution
    public.users
Request information about the selected sharded tables. Example:
$
shardmanctl tables sharded info -t public.users
Table public.users
Partitions:
Partition   RgID   Shard            Master
0           1      clover-1-shrn1   shrn1:5432
1           2      clover-2-shrn2   shrn2:5432
2           3      clover-3-shrn3   shrn3:5432
3           1      clover-1-shrn1   shrn1:5432
4           2      clover-2-shrn2   shrn2:5432
5           3      clover-3-shrn3   shrn3:5432
6           1      clover-1-shrn1   shrn1:5432
7           2      clover-2-shrn2   shrn2:5432
8           3      clover-3-shrn3   shrn3:5432
9           1      clover-1-shrn1   shrn1:5432
10          2      clover-2-shrn2   shrn2:5432
11          3      clover-3-shrn3   shrn3:5432
12          1      clover-1-shrn1   shrn1:5432
13          2      clover-2-shrn2   shrn2:5432
14          3      clover-3-shrn3   shrn3:5432
15          1      clover-1-shrn1   shrn1:5432
16          2      clover-2-shrn2   shrn2:5432
17          3      clover-3-shrn3   shrn3:5432
18          1      clover-1-shrn1   shrn1:5432
19          2      clover-2-shrn2   shrn2:5432
20          3      clover-3-shrn3   shrn3:5432
21          1      clover-1-shrn1   shrn1:5432
22          2      clover-2-shrn2   shrn2:5432
23          3      clover-3-shrn3   shrn3:5432
Move a partition to a new shard, as shown below:
$
shardmanctl --log-level debug tables sharded partmove -t public.users --partnum 1 --shard clover-1-shrn1
2023-07-26T06:00:36.900Z DEBUG cmd/common.go:105 Waiting for metadata lock...
2023-07-26T06:00:36.936Z DEBUG rebalance/service.go:256 take extension lock
2023-07-26T06:00:36.938Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=3
2023-07-26T06:00:36.938Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=2
2023-07-26T06:00:36.938Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=1
2023-07-26T06:00:36.951Z DEBUG broadcaster/worker.go:51 repgroup 3 connect established
2023-07-26T06:00:36.951Z DEBUG broadcaster/worker.go:51 repgroup 2 connect established
2023-07-26T06:00:36.952Z DEBUG broadcaster/worker.go:51 repgroup 1 connect established
2023-07-26T06:00:36.952Z DEBUG extension/lock.go:35 Waiting for extension lock...
2023-07-26T06:00:36.976Z INFO rebalance/service.go:276 Performing move partition...
2023-07-26T06:00:36.977Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=3
2023-07-26T06:00:36.978Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=2
2023-07-26T06:00:36.978Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=1
2023-07-26T06:00:36.987Z DEBUG broadcaster/worker.go:51 repgroup 1 connect established
2023-07-26T06:00:36.989Z DEBUG broadcaster/worker.go:51 repgroup 2 connect established
2023-07-26T06:00:36.992Z DEBUG broadcaster/worker.go:51 repgroup 3 connect established
2023-07-26T06:00:36.992Z DEBUG rebalance/service.go:71 Performing cleanup after possible rebalance operation failure
2023-07-26T06:00:37.077Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=3
2023-07-26T06:00:37.077Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=1
2023-07-26T06:00:37.077Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=2
2023-07-26T06:00:37.082Z DEBUG rebalance/service.go:422 Rebalance will run 1 tasks
2023-07-26T06:00:37.095Z DEBUG rebalance/service.go:452 Guessing that rebalance() can use 3 workers
2023-07-26T06:00:37.096Z DEBUG rebalance/job.go:352 state: Idle {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:37.111Z DEBUG rebalance/job.go:352 state: ConnsEstablished {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:37.171Z DEBUG rebalance/job.go:352 state: WaitInitCopy {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:38.073Z DEBUG rebalance/job.go:347 current state {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move", "state": "WaitInitialCatchup"}
2023-07-26T06:00:38.073Z DEBUG rebalance/job.go:352 state: WaitInitialCatchup {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:38.084Z DEBUG rebalance/job.go:347 current state {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move", "state": "WaitFullSync"}
2023-07-26T06:00:38.084Z DEBUG rebalance/job.go:352 state: WaitFullSync {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:38.108Z DEBUG rebalance/job.go:347 current state {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move", "state": "Committing"}
2023-07-26T06:00:38.108Z DEBUG rebalance/job.go:352 state: Committing {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:38.254Z DEBUG rebalance/job.go:352 state: Complete {"worker_id": 1, "table": "users", "partition num": 1, "source rgid": 2, "dest rgid": 1, "kind": "move"}
2023-07-26T06:00:38.258Z DEBUG rebalance/service.go:583 Produce and process tasks on destination replication groups...
2023-07-26T06:00:38.258Z DEBUG rebalance/service.go:594 Produce and process tasks on source replication groups...
2023-07-26T06:00:38.258Z DEBUG rebalance/service.go:606 wait all tasks finish
2023-07-26T06:00:38.258Z DEBUG rebalance/service.go:531 Analyzing table public.users in rg 1 {"table": "public.users", "rgid": 1, "action": "analyze"}
2023-07-26T06:00:38.573Z DEBUG rebalance/service.go:531 Analyzing table public.users in rg 2 {"table": "public.users", "rgid": 2, "action": "analyze"}
2023-07-26T06:00:38.833Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=1
2023-07-26T06:00:38.833Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=2
2023-07-26T06:00:38.833Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=3
In this example, partition number 1 of the public.users table is moved to the clover-1-shrn1 shard.
After a partition of a sharded table is moved manually, automatic data rebalancing is disabled for this table and for all tables colocated with it.
To get the list of tables with disabled automatic rebalancing, call the shardmanctl tables sharded norebalance
command. Example:
$
shardmanctl tables sharded norebalance
public.users
To enable automatic data rebalancing for a selected sharded table, call the shardmanctl tables sharded rebalance
command, as shown in the example below:
$
shardmanctl tables sharded rebalance -t public.users
2023-07-26T07:07:00.657Z DEBUG cmd/common.go:105 Waiting for metadata lock...
2023-07-26T07:07:00.687Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=1
2023-07-26T07:07:00.687Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=2
2023-07-26T07:07:00.687Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=3
2023-07-26T07:07:00.697Z DEBUG broadcaster/worker.go:51 repgroup 1 connect established
2023-07-26T07:07:00.698Z DEBUG broadcaster/worker.go:51 repgroup 2 connect established
2023-07-26T07:07:00.698Z DEBUG broadcaster/worker.go:51 repgroup 3 connect established
2023-07-26T07:07:00.698Z DEBUG extension/lock.go:35 Waiting for extension lock...
2023-07-26T07:07:00.719Z DEBUG rebalance/service.go:381 Planned moving pnum 21 for table users from rg 1 to rg 2
2023-07-26T07:07:00.719Z INFO rebalance/service.go:244 Performing rebalance...
2023-07-26T07:07:00.720Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=1
2023-07-26T07:07:00.720Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=2
2023-07-26T07:07:00.720Z DEBUG broadcaster/worker.go:33 start broadcaster worker for repgroup id=3
2023-07-26T07:07:00.732Z DEBUG broadcaster/worker.go:51 repgroup 3 connect established
2023-07-26T07:07:00.732Z DEBUG broadcaster/worker.go:51 repgroup 1 connect established
2023-07-26T07:07:00.734Z DEBUG broadcaster/worker.go:51 repgroup 2 connect established
2023-07-26T07:07:00.734Z DEBUG rebalance/service.go:71 Performing cleanup after possible rebalance operation failure
2023-07-26T07:07:00.791Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=1
2023-07-26T07:07:00.791Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=2
2023-07-26T07:07:00.791Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=3
2023-07-26T07:07:00.795Z DEBUG rebalance/service.go:422 Rebalance will run 1 tasks
2023-07-26T07:07:00.809Z DEBUG rebalance/service.go:452 Guessing that rebalance() can use 3 workers
2023-07-26T07:07:00.809Z DEBUG rebalance/job.go:352 state: Idle {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:00.823Z DEBUG rebalance/job.go:352 state: ConnsEstablished {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:00.880Z DEBUG rebalance/job.go:352 state: WaitInitCopy {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:01.886Z DEBUG rebalance/job.go:347 current state {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move", "state": "WaitInitialCatchup"}
2023-07-26T07:07:01.886Z DEBUG rebalance/job.go:352 state: WaitInitialCatchup {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:01.904Z DEBUG rebalance/job.go:347 current state {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move", "state": "WaitFullSync"}
2023-07-26T07:07:01.905Z DEBUG rebalance/job.go:352 state: WaitFullSync {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:01.932Z DEBUG rebalance/job.go:347 current state {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move", "state": "Committing"}
2023-07-26T07:07:01.932Z DEBUG rebalance/job.go:352 state: Committing {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:02.057Z DEBUG rebalance/job.go:352 state: Complete {"worker_id": 1, "table": "users", "partition num": 21, "source rgid": 1, "dest rgid": 2, "kind": "move"}
2023-07-26T07:07:02.060Z DEBUG rebalance/service.go:583 Produce and process tasks on destination replication groups...
2023-07-26T07:07:02.060Z DEBUG rebalance/service.go:594 Produce and process tasks on source replication groups...
2023-07-26T07:07:02.060Z DEBUG rebalance/service.go:531 Analyzing table public.users in rg 2 {"table": "public.users", "rgid": 2, "action": "analyze"}
2023-07-26T07:07:02.060Z DEBUG rebalance/service.go:606 wait all tasks finish
2023-07-26T07:07:02.321Z DEBUG rebalance/service.go:531 Analyzing table public.users in rg 1 {"table": "public.users", "rgid": 1, "action": "analyze"}
2023-07-26T07:07:02.587Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=3
2023-07-26T07:07:02.587Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=2
2023-07-26T07:07:02.587Z DEBUG broadcaster/worker.go:75 finish broadcaster worker for repgroup id=1
To enable automatic data rebalancing for all sharded tables, run the shardmanctl rebalance
command with the --force
option.
$
shardmanctl rebalance --force