Using Skytools PGQ for targeted copying of data in cluster - Mailing list pgsql-announce

From Sergei Sheinin
Subject Using Skytools PGQ for targeted copying of data in cluster
Date
Msg-id CA+dTRwnbJ5fNdYSpGab=O=9yd6tZeFjPUauM6=DtczOLxqMzXA@mail.gmail.com
List pgsql-announce
In a clustered database where some data on one node is shared with only some other nodes, replication must be targeted at the row level. This is not possible with Slony replication, because it copies all data. Using PGQ in a star schema where all nodes are interconnected would require <number of nodes>^2 processes, a prohibitively high number even for a 256-node cluster.

Solution: a push-pull queue schema using pgq.RemoteConsumer to send copy messages of object data.

There are two ticker processes (push_queue and pull_queue) and two consumer processes per node, making the total number of processes <number of nodes>×2 + 2.
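To make the scaling difference concrete, a quick calculation (the 256-node figure comes from the paragraph above; the function names are only illustrative):

```python
def star_schema_processes(nodes):
    # fully interconnected star schema: every node pair needs its own process
    return nodes ** 2

def push_pull_processes(nodes):
    # push-pull schema: two consumers per node plus the two tickers
    return nodes * 2 + 2

print(star_schema_processes(256))  # 65536
print(push_pull_processes(256))    # 514
```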

On the push queue side, the sending node inserts one event per target node:
pgq.insert_event('push_queue','copy', '', <node number>,'action=ins&id=1&name=test', '' ,'' );
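The event payload is a URL-encoded key/value string, with the target node number carried in ev_extra1. A minimal Python sketch of how a sending node might build such a payload (the helper name is hypothetical; only the payload format comes from the example above):

```python
from urllib.parse import urlencode

def build_copy_payload(action, row):
    # Build an 'action=ins&id=1&name=test' style string for ev_extra2.
    # dict insertion order is preserved (Python 3.7+), so 'action' comes first.
    data = {"action": action}
    data.update(row)
    return urlencode(data)

payload = build_copy_payload("ins", {"id": 1, "name": "test"})
print(payload)  # action=ins&id=1&name=test
```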

The push queue consumer calls a function that relays the same event into the pull queue:
pgq.insert_event('pull_queue','copy', '', <node number>,'action=ins&id=1&name=test', '' ,'' );


Before issuing ev.tag_done(), the pull queue consumer checks that the node it is running on is the event's intended recipient. This protects against data loss should a node become unresponsive: the unconsumed event remains in the queue until the node reconnects.

## pull.py
import sys
import pgq

class Copier(pgq.RemoteConsumer):
    def __init__(self, args):
        pgq.RemoteConsumer.__init__(self, "pull_ticker", "src_db", "dst_db", args)
        # Read the "node" setting from the .ini file;
        # a "node = <node number>" line must be added to the .ini.
        self.node = self.cf.get("node")

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        for ev in event_list:
            if ev.type != 'copy':
                ev.tag_done()
                continue
            # Check that the target is the current node.
            if self.node == ev.ev_extra1:
                # Tag the event done only when it is processed on the
                # target server; otherwise it stays in the queue for retry.
                ev.tag_done()
                cur = dst_db.cursor()
                # Parameterized query avoids quoting problems in the payload.
                cur.execute("select __data_copier(%s, %s)",
                            (ev.ev_extra1, ev.ev_extra2))
                cur.fetchall()
                dst_db.commit()

if __name__ == '__main__':
    script = Copier(sys.argv[1:])
    script.start()
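On the receiving side, __data_copier (a user-supplied database function, not part of Skytools) has to unpack the URL-encoded payload passed in ev_extra2. A hedged Python sketch of the equivalent parsing step, should one prefer to dispatch in the consumer instead (the function name is hypothetical):

```python
from urllib.parse import parse_qs

def parse_copy_event(payload):
    # Split 'action=ins&id=1&name=test' into the action and a row dict.
    fields = {k: v[0] for k, v in parse_qs(payload).items()}
    action = fields.pop("action")
    return action, fields

action, row = parse_copy_event("action=ins&id=1&name=test")
print(action, row)  # ins {'id': '1', 'name': 'test'}
```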
