Can psycopg2 copy_expert read from an io.StringIO() buffer? - Mailing list psycopg

From Jeff Ross
Subject Can psycopg2 copy_expert read from an io.StringIO() buffer?
Date
Msg-id 6dfed0bf-a52a-66c0-43a9-6258d7b9843b@openvistas.net
Responses Re: Can psycopg2 copy_expert read from an io.StringIO() buffer?  (Daniele Varrazzo <daniele.varrazzo@gmail.com>)
List psycopg
Hi all,

Bit of explanation first...

I'm working on a script to "heal" a logically replicated database after 
a replication outage.  We get these outages periodically, especially to 
the several we have hosted in RDS.  Replication either stops outright or 
slows so much it might as well be stopped.  When that happens, the only 
fix I've found is to drop the subscription and then start it back up 
with (copy_data = False).

Once started again, replication proceeds from that point on at full 
speed.  However, we now have a nice sized hole in the data that needs 
to be backfilled.

I've been doing this by joining the publisher and subscriber with 
postgres_fdw and comparing tables. Left joins between publisher and 
subscriber will show me the rows on the publisher that are not on the 
subscriber, and then flip the tables to find rows on the subscriber that 
are not on the publisher and need to be deleted.  Rows that need to be 
updated are found with select * from publisher.table except select * 
from subscriber.table.
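The three comparisons above can be generated per table. This is a minimal sketch in plain Python. It assumes the publisher's tables are visible through postgres_fdw in a schema named publisher, that the local copies live in a schema named subscriber (the names used in the text), and that each table has a single-column primary key. diff_queries and quote_ident are hypothetical helpers.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier: wrap it in double quotes, doubling any embedded quote."""
    return '"' + name.replace('"', '""') + '"'


def diff_queries(pub_schema: str, sub_schema: str, table: str, pkey: str) -> dict:
    """Build the anti-join and EXCEPT queries described above for one table (hypothetical helper)."""
    pub = f"{quote_ident(pub_schema)}.{quote_ident(table)}"
    sub = f"{quote_ident(sub_schema)}.{quote_ident(table)}"
    pk = quote_ident(pkey)
    return {
        # On the publisher but not the subscriber: rows to insert.
        "missing": (f"select p.{pk} from {pub} p left join {sub} s "
                    f"on p.{pk} = s.{pk} where s.{pk} is null"),
        # On the subscriber but not the publisher: rows to delete.
        "extra": (f"select s.{pk} from {sub} s left join {pub} p "
                  f"on s.{pk} = p.{pk} where p.{pk} is null"),
        # Publisher rows with no identical subscriber row: changed (or missing) rows.
        "changed": f"select * from {pub} except select * from {sub}",
    }


for name, query in diff_queries("publisher", "subscriber", "client_profile", "id").items():
    print(f"{name}: {query}")
```

Because EXCEPT compares whole rows, its result also contains every row that is missing outright, so it may help to apply the inserts first or filter out those keys. Quoting preserves case, so names have to be passed exactly as stored (normally lower case).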

I had been doing this in a plpy function both reading and writing to the 
fdw tables.  That works but is excruciatingly slow--one table we have 
has 170,000,000 rows and takes about 3 hours to sync.  A 325G database 
takes about 6.5 hours total.

Enter psycopg2.  My plan now is to only query the fdw tables to identify 
the rows, then use psycopg2 connections to both databases to directly 
insert/delete/update (by deleting from and then inserting to) the 
subscriber table.
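The update step described above (delete, then re-insert) is safest when both halves commit together. This is a minimal sketch that assumes a psycopg2 subscriber connection with autocommit off. replace_rows, delete_sql and reinsert are hypothetical names. reinsert stands in for whatever loads the fresh rows, for example a COPY ... FROM STDIN on the same cursor.

```python
def replace_rows(sub_conn, delete_sql, keys, reinsert):
    """Hypothetical helper: "update" rows by deleting and re-inserting them in one transaction.

    Assumes sub_conn is a psycopg2 connection with autocommit off.
    """
    with sub_conn:                      # psycopg2: commit on success, roll back on exception
        with sub_conn.cursor() as cur:  # the cursor is closed when the block exits
            # psycopg2 adapts a Python list to a PostgreSQL array, so delete_sql can
            # look like: delete from myschema.mytable where id = any(%s)
            cur.execute(delete_sql, (list(keys),))
            reinsert(cur)               # load the current publisher rows for those keys
```

With autocommit=True, each statement commits on its own. A failure between the delete and the re-insert would then leave those rows missing on the subscriber until the next run.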

Working on inserts now, with this code (queries freshly ported over from 
the plpy function):

import io

subscriber_connection.set_session(autocommit=True)
csv_buf = io.StringIO()
size = 3**20  # 3**20 bytes, roughly 3.5GB

# Export the publisher rows whose keys are listed in <schema>.pkeys as CSV
copy_query = """
    copy (
        select a.* from %s.%s a
        join %s.pkeys b on a.%s = b.%s
        order by a.%s
    ) to stdout with csv
""" % (schema, table_name, schema, pkey, pkey_column, pkey_column)

# Load that CSV into the same table on the subscriber
insert_query = """
    copy %s.%s from stdin with csv;
""" % (schema, table_name)

try:
    publisher_copy_cursor.copy_expert(copy_query, csv_buf, size)
    subscriber_copy_cursor.copy_expert(insert_query, csv_buf)
    subscriber_copy_cursor.close()
except Exception as e:
    debug_output("insert error! %s" % (e))

So, most of that works pretty well.  csv_buf is filled with pkeys 
of the rows missing from the subscriber.  I've verified that works from 
the python shell.

On the subscriber I see the copy command hit the logs:

2022-11-03 14:27:15.357 EDT,"postgres","dr_metroarchive",43810,"172.26.27.10:39614",636407fc.ab22,30,"COPY",2022-11-03 14:27:08 EDT,4/0,0,LOG,00000,"duration: 53.782 ms  statement: copy metro.client_profile from stdin with csv;",,,,,,,,,""

But no rows ever actually get inserted.
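One likely cause is the stream position of csv_buf. Writing the COPY output into the buffer leaves its position at the end. When the second copy_expert then calls read(), it gets an empty string, so the subscriber's COPY sees no data and loads zero rows, which would fit the short duration in the log above. This is a minimal sketch of the rewind. copy_through_buffer is a hypothetical name, and pub_cur and sub_cur are assumed to be psycopg2 cursors on the two databases.

```python
import io


def copy_through_buffer(pub_cur, sub_cur, copy_out_sql, copy_in_sql):
    """Hypothetical helper: move rows publisher -> subscriber via an in-memory CSV buffer."""
    csv_buf = io.StringIO()
    # COPY ... TO STDOUT: copy_expert writes the CSV rows into csv_buf,
    # which leaves the stream position at the end of the data.
    pub_cur.copy_expert(copy_out_sql, csv_buf)
    # Rewind; without this the next read() starts at the end and returns "",
    # so COPY ... FROM STDIN gets an empty input and loads zero rows.
    csv_buf.seek(0)
    # COPY ... FROM STDIN: copy_expert now reads csv_buf from the beginning.
    sub_cur.copy_expert(copy_in_sql, csv_buf)


# The position problem in isolation, no database needed:
buf = io.StringIO()
buf.write("1,alice\n2,bob\n")
print(repr(buf.read()))  # '' because the position is at the end
buf.seek(0)
print(repr(buf.read()))  # '1,alice\n2,bob\n'
```

Every row in flight sits in RAM with this approach, which matters for a table the size of the 170,000,000-row one mentioned earlier.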

The only thing I can think of so far is that copy_expert isn't reading 
csv_buf.  And now that I write that I wonder if it's because csv_buf is 
in memory on the publisher and not on the subscriber.

If that's the case maybe this all boils down to "how do I pipe stdout 
from the publisher to the subscriber across the subscriber cursor?"
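csv_buf is an ordinary object in the memory of the Python process running the script, not on either server, so both cursors can read and write it. To avoid buffering a whole table, one option is an OS pipe. This is a sketch of a general technique, not a psycopg2 feature: a background thread runs the publisher's COPY ... TO STDOUT into the write end while the subscriber's COPY ... FROM STDIN reads the other end. copy_via_pipe is a hypothetical name, and the cursors are assumed to be psycopg2 cursors on the two databases.

```python
import os
import threading


def copy_via_pipe(pub_cur, sub_cur, copy_out_sql, copy_in_sql):
    """Hypothetical helper: stream publisher COPY output straight into a subscriber COPY."""
    read_fd, write_fd = os.pipe()
    errors = []

    def produce():
        # Writer side: the publisher's COPY ... TO STDOUT goes into the pipe.
        # Leaving the with-block closes the write end, which signals EOF to the reader.
        with os.fdopen(write_fd, "w", encoding="utf-8") as writer:
            try:
                pub_cur.copy_expert(copy_out_sql, writer)
            except Exception as exc:
                errors.append(exc)  # re-raised in the calling thread below

    producer = threading.Thread(target=produce)
    producer.start()
    try:
        # Reader side: the subscriber's COPY ... FROM STDIN consumes the pipe
        # concurrently, so the writer is not left blocked on a full pipe.
        with os.fdopen(read_fd, "r", encoding="utf-8") as reader:
            sub_cur.copy_expert(copy_in_sql, reader)
    finally:
        producer.join()
    if errors:
        raise errors[0]
```

If the publisher side fails part way, the subscriber still sees end-of-file and its COPY completes with the rows received so far. With autocommit=True those rows are committed before the error is re-raised, so running the subscriber side in a transaction and rolling back on error avoids a partial load.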

That's a lot of email for one short question--apologies and thanks in 
advance for any clue by fours!

Jeff Ross



