Thread: Bulk loading/merging
I've set up something similar to the 'recommended' way to merge data into the DB, i.e.

http://www.postgresql.org/docs/current/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING

however I did it with a trigger on insert, i.e. (not my schema :) ):

CREATE TABLE db (a INT PRIMARY KEY, b TEXT, c INTEGER, d INET);

CREATE FUNCTION merge_db() RETURNS TRIGGER AS
$$
BEGIN
    UPDATE db SET b = NEW.b
        WHERE a = NEW.a
          AND NOT (c IS DISTINCT FROM NEW.c)
          AND NOT (d IS DISTINCT FROM NEW.d);
    IF found THEN
        RETURN NULL;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER merge_db_tr BEFORE INSERT ON db
    FOR EACH ROW EXECUTE PROCEDURE merge_db();

Is this the best/fastest way to do this sort of thing? I only get about 50 records/second inserts, while without the trigger (inserting unmerged data) I can get more like 1000/second. I'm doing the whole NOT ... IS DISTINCT FROM stuff to handle NULL values that might be in the columns; I only consider two keys equal if (a, c, d) are all the same (i.e. either the same value or both NULL).

I read that there is a race condition with the above method as applied to a normal function ... does this apply to a trigger as well?

Optimization questions:

- Can I do better with the trigger function itself?

- I realize that I can create indexes on some of the lookup columns ('a' in the above example). This would speed up locating the row to update but slow down the actual update/insert, right? Would this be a win? I tested an index on 10000 rows, and it beat the non-indexed load by about 7% (3:31 with the index, 3:45 without) ... is this all the benefit that I can expect?

- Will moving pg_xlog to a different disk help all that much, if the whole DB is currently on a 4-disk RAID10? What about moving the indexes? I've set up my postgresql.conf according to the docs and Josh Berkus' presentation, i.e. (16GB RAM, quad Opteron machine, not all settings are relevant):

shared_buffers = 60000
temp_buffers = 10000
work_mem = 131072
maintenance_work_mem = 524288
effective_cache_size = 120000
random_page_cost = 2
wal_buffers = 128
checkpoint_segments = 128
checkpoint_timeout = 3000
max_fsm_pages = 2000000
max_fsm_relations = 1000000

- If I break up my dataset into smaller chunks and parallelize it, could I get better total performance, or would I most likely just be thrashing the disks?

- If I sort the data in the COPY file by key (i.e. a, c, d) before inserting it into the database, will this help the DB at all?

- It's cleaner to just insert everything into the database and let the DB aggregate the records; however, I could use some of our extra hardware to do the aggregation in Perl and then output the already-aggregated records to the DB. This has the advantage of being easily parallelizable but requires a bit of extra work to get right. Do you think that this is the best way to go?

Also, as a slight aside: without the trigger, COPY seems to process each record very quickly (using Perl DBI, about 7000 records/second), however there is a long pause once the last record has been delivered. Is this just the backend queuing up the insert commands given by Perl, or is there extra processing that needs to be done at the end of the COPY that could be taking a while (10s on a 500K-record COPY)? Thanks!
Your best bet is to do this as a single, bulk operation if possible. That way you can simply do an UPDATE ... WHERE EXISTS followed by an INSERT ... SELECT ... WHERE NOT EXISTS.

On Fri, May 26, 2006 at 02:48:20PM -0400, Worky Workerson wrote:
> I've set up something similar to the 'recommended' way to merge data into
> the DB, i.e.
>
> http://www.postgresql.org/docs/current/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING
>
> however I did it with a trigger on insert, i.e. (not my schema :) ):

--
Jim C. Nasby, Sr. Engineering Consultant      jnasby@pervasive.com
Pervasive Software      http://pervasive.com      work: 512-231-6117
vcard: http://jim.nasby.net/pervasive.vcf         cell: 512-569-9461
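To make that concrete against the original db(a, b, c, d) schema, a minimal sketch of the bulk pattern might look like the following (not from the thread; the staging table name and the COPY step are placeholders, and the matching mirrors the poster's NOT ... IS DISTINCT FROM conditions). As with the trigger, a row whose a matches an existing row but whose c or d differ would still hit the primary key on the INSERT.

-- Sketch only: load into a staging table, then merge in two set-based statements.
CREATE TEMP TABLE db_staging (LIKE db);
-- COPY db_staging FROM ... ;   (bulk load the raw data here)

BEGIN;

UPDATE db SET b = s.b
    FROM db_staging s
    WHERE db.a = s.a
      AND NOT (db.c IS DISTINCT FROM s.c)
      AND NOT (db.d IS DISTINCT FROM s.d);

INSERT INTO db (a, b, c, d)
    SELECT s.a, s.b, s.c, s.d
    FROM db_staging s
    WHERE NOT EXISTS (
        SELECT 1 FROM db
        WHERE db.a = s.a
          AND NOT (db.c IS DISTINCT FROM s.c)
          AND NOT (db.d IS DISTINCT FROM s.d)
    );

COMMIT;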
> Your best bet is to do this as a single, bulk operation if possible.
> That way you can simply do an UPDATE ... WHERE EXISTS followed by an
> INSERT ... SELECT ... WHERE NOT EXISTS.
On 06/02/2006, Michael Artz wrote:
hmm, I don't quite understand what you are saying and I think my basic misunderstanding is how to use the UPDATE ... WHERE EXISTS to merge data in bulk. Assuming that I bulk COPYed the data into a temporary table, I'd need to issue an UPDATE for each row in the newly created table, right?
For example, for a slightly different key,count schema:
CREATE TABLE kc (key integer, count integer);
and wanting to merge the following data by just updating the count for a given key to the equivalent of OLD.count + NEW.count:
1,10
2,15
3,45
1,30
How would I go about using UPDATE ... WHERE EXISTS to update the "master" kc table from a (temporary) table loaded with the above data?
Maybe this method could help you:
CREATE TEMP TABLE clip_temp (
    cids int8 NOT NULL,
    clip_id int8 NOT NULL,
    mentions int4 DEFAULT 0,
    CONSTRAINT pk_clip_temp PRIMARY KEY (cids, clip_id)
);
insert data into this temporary table...
then do:
UPDATE clip_category SET mentions = clip_temp.mentions
    FROM clip_temp
    WHERE clip_category.cids = clip_temp.cids
      AND clip_category.clip_id = clip_temp.clip_id;

DELETE FROM clip_temp USING clip_category
    WHERE clip_temp.cids = clip_category.cids
      AND clip_temp.clip_id = clip_category.clip_id;

INSERT INTO clip_category (cids, clip_id, mentions)
    SELECT * FROM clip_temp;

DROP TABLE clip_temp;
Best regards,
ahmad fajar,
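One caveat with the recipe above (an addition here, not part of ahmad's mail): between the UPDATE and the final INSERT another session could slip in a matching row, which is essentially the race condition the original poster asked about. If concurrent loaders are possible, one simple way to close that window is to run the whole sequence in a single transaction behind a table lock, e.g.:

BEGIN;
-- SHARE ROW EXCLUSIVE blocks other writers for the duration of the merge,
-- but still allows plain readers.
LOCK TABLE clip_category IN SHARE ROW EXCLUSIVE MODE;
-- ... the UPDATE / DELETE / INSERT statements shown above ...
COMMIT;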
On Thu, Jun 01, 2006 at 02:04:46PM -0400, Michael Artz wrote:
> hmm, I don't quite understand what you are saying and I think my
> basic misunderstanding is how to use the UPDATE ... WHERE EXISTS to merge
> data in bulk. Assuming that I bulk COPYed the data into a temporary
> table, I'd need to issue an UPDATE for each row in the newly created table,
> right?
> ...
> How would I go about using UPDATE ... WHERE EXISTS to update the "master" kc
> table from a (temporary) table loaded with the above data?

CREATE TEMP TABLE moo (LIKE kc);
COPY ... moo;

BEGIN;

UPDATE kc
    SET count = kc.count + moo.count
    FROM moo
    WHERE moo.key = kc.key;

INSERT INTO kc (key, count)
    SELECT key, count
    FROM moo
    WHERE NOT EXISTS (
        SELECT 1 FROM kc WHERE kc.key = moo.key
    );

COMMIT;

--
Jim C. Nasby, Sr. Engineering Consultant      jnasby@pervasive.com
Pervasive Software      http://pervasive.com      work: 512-231-6117
vcard: http://jim.nasby.net/pervasive.vcf         cell: 512-569-9461
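One thing worth adding to Jim's snippet (a note, not from his mail): with input like Michael's sample, where key 1 appears twice, UPDATE ... FROM applies at most one matching staging row to each target row, and the INSERT could then try to add the same key twice. Collapsing the staging table first avoids both problems; a hedged sketch, reusing the moo/kc names and assuming a moo_agg scratch table:

BEGIN;

-- Collapse duplicate keys in the staging data before merging.
CREATE TEMP TABLE moo_agg AS
    SELECT key, sum(count)::int AS count FROM moo GROUP BY key;

UPDATE kc SET count = kc.count + moo_agg.count
    FROM moo_agg
    WHERE moo_agg.key = kc.key;

INSERT INTO kc (key, count)
    SELECT key, count FROM moo_agg
    WHERE NOT EXISTS (SELECT 1 FROM kc WHERE kc.key = moo_agg.key);

COMMIT;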
Here are two ways to phrase a query... the planner chooses very different plans as you will see. Everything is freshly ANALYZEd.

EXPLAIN ANALYZE SELECT r.*
FROM raw_annonces r
LEFT JOIN annonces a ON a.id=r.id
LEFT JOIN archive_data d ON d.id=r.id
WHERE a.id IS NULL AND d.id IS NULL AND r.id > 1130306
ORDER BY id LIMIT 1;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.00..2.54 rows=1 width=627) (actual time=708.167..708.168 rows=1 loops=1)
   ->  Merge Left Join  (cost=0.00..128497.77 rows=50539 width=627) (actual time=708.165..708.165 rows=1 loops=1)
         Merge Cond: ("outer".id = "inner".id)
         Filter: ("inner".id IS NULL)
         ->  Merge Left Join  (cost=0.00..27918.92 rows=50539 width=627) (actual time=144.519..144.519 rows=1 loops=1)
               Merge Cond: ("outer".id = "inner".id)
               Filter: ("inner".id IS NULL)
               ->  Index Scan using raw_annonces_pkey on raw_annonces r  (cost=0.00..11222.32 rows=50539 width=627) (actual time=0.040..0.040 rows=1 loops=1)
                     Index Cond: (id > 1130306)
               ->  Index Scan using annonces_pkey on annonces a  (cost=0.00..16118.96 rows=65376 width=4) (actual time=0.045..133.272 rows=65376 loops=1)
         ->  Index Scan using archive_data_pkey on archive_data d  (cost=0.00..98761.01 rows=474438 width=4) (actual time=0.060..459.995 rows=474438 loops=1)
 Total runtime: 708.316 ms

EXPLAIN ANALYZE SELECT *
FROM raw_annonces r
WHERE r.id > 1130306
  AND NOT EXISTS( SELECT id FROM annonces WHERE id=r.id )
  AND NOT EXISTS( SELECT id FROM archive_data WHERE id=r.id )
ORDER BY id LIMIT 1;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.00..38.12 rows=1 width=627) (actual time=0.040..0.041 rows=1 loops=1)
   ->  Index Scan using raw_annonces_pkey on raw_annonces r  (cost=0.00..481652.07 rows=12635 width=627) (actual time=0.039..0.039 rows=1 loops=1)
         Index Cond: (id > 1130306)
         Filter: ((NOT (subplan)) AND (NOT (subplan)))
         SubPlan
           ->  Index Scan using archive_data_pkey on archive_data  (cost=0.00..3.66 rows=1 width=4) (actual time=0.007..0.007 rows=0 loops=1)
                 Index Cond: (id = $0)
           ->  Index Scan using annonces_pkey on annonces  (cost=0.00..5.65 rows=1 width=4) (actual time=0.006..0.006 rows=0 loops=1)
                 Index Cond: (id = $0)
 Total runtime: 0.121 ms

Ideas?
PFC <lists@peufeu.com> writes:
> Here are two ways to phrase a query... the planner chooses very different
> plans as you will see. Everything is freshly ANALYZEd.

Usually we get complaints the other way around (that the NOT EXISTS approach is a lot slower). You did not show any statistics, but I suspect the key point here is that the condition id > 1130306 excludes most or all of the A and D tables. The planner is not smart about making transitive inequality deductions, but you could help it along by adding the implied clauses yourself:

EXPLAIN ANALYZE SELECT r.*
FROM raw_annonces r
LEFT JOIN annonces a ON (a.id=r.id AND a.id > 1130306)
LEFT JOIN archive_data d ON (d.id=r.id AND d.id > 1130306)
WHERE a.id IS NULL AND d.id IS NULL AND r.id > 1130306
ORDER BY id LIMIT 1;

Whether this is worth doing in your app depends on how often you do searches at the end of the ID range ...

			regards, tom lane
> Usually we get complaints the other way around (that the NOT EXISTS
> approach is a lot slower).

Yes, I know ;) (I rephrased the query this way to exploit the fact that the planner would choose a nested loop.)

> You did not show any statistics, but I
> suspect the key point here is that the condition id > 1130306 excludes
> most or all of the A and D tables.

Right. Actually:

- Table r (raw_annonces) contains raw data waiting to be processed.
- Table a (annonces) contains processed data ready for display on the website (active data).
- Table d (archive) contains old archived data which can be displayed on request but is normally excluded from the searches, which normally only hit recent records. This is to get speedy searches.

So, records are added into the "raw" table; these have a SERIAL primary key. Then a script processes them and inserts the results into the active table. 15 days of "raw" records are kept, then they are deleted. Periodically old records from "annonces" are moved to the archive. The primary key stays the same in the three tables.

The script knows at which id it stopped last time it was run, hence the (id > x) condition. Normally this excludes the entire "annonces" table, because we process only new records.

> The planner is not smart about
> making transitive inequality deductions, but you could help it along
> by adding the implied clauses yourself:
>
> EXPLAIN ANALYZE SELECT r.* FROM raw_annonces r
>   LEFT JOIN annonces a ON (a.id=r.id AND a.id > 1180726)
>   LEFT JOIN archive_data d ON (d.id=r.id AND d.id > 1180726)
>   WHERE a.id IS NULL AND d.id IS NULL AND r.id > 1180726
>   ORDER BY id LIMIT 1;
>
> Whether this is worth doing in your app depends on how often you do
> searches at the end of the ID range ...

Quite often actually, so I did the mod. The interesting part is that yesterday, after ANALYZE, the query plan was horrible, and today, after adding new data, I ANALYZEd and retried the slow query, and it was fast again:

EXPLAIN ANALYZE SELECT r.*
FROM raw_annonces r
LEFT JOIN annonces a ON (a.id=r.id)
LEFT JOIN archive_data d ON (d.id=r.id)
WHERE a.id IS NULL AND d.id IS NULL AND r.id > 1180726
ORDER BY id LIMIT 1;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.00..10.42 rows=1 width=631) (actual time=0.076..0.076 rows=1 loops=1)
   ->  Nested Loop Left Join  (cost=0.00..7129.11 rows=684 width=631) (actual time=0.074..0.074 rows=1 loops=1)
         Filter: ("inner".id IS NULL)
         ->  Nested Loop Left Join  (cost=0.00..4608.71 rows=684 width=631) (actual time=0.064..0.064 rows=1 loops=1)
               Filter: ("inner".id IS NULL)
               ->  Index Scan using raw_annonces_pkey on raw_annonces r  (cost=0.00..667.56 rows=684 width=631) (actual time=0.013..0.013 rows=1 loops=1)
                     Index Cond: (id > 1180726)
               ->  Index Scan using annonces_pkey on annonces a  (cost=0.00..5.75 rows=1 width=4) (actual time=0.046..0.046 rows=0 loops=1)
                     Index Cond: (a.id = "outer".id)
         ->  Index Scan using archive_data_pkey on archive_data d  (cost=0.00..3.67 rows=1 width=4) (actual time=0.008..0.008 rows=0 loops=1)
               Index Cond: (d.id = "outer".id)
 Total runtime: 0.197 ms

So I did a few tests...
CREATE TABLE test.raw (id INTEGER PRIMARY KEY);
CREATE TABLE test.active (id INTEGER PRIMARY KEY);
CREATE TABLE test.archive (id INTEGER PRIMARY KEY);
INSERT INTO test.archive SELECT * FROM generate_series( 1, 1000000 );
INSERT INTO test.active SELECT * FROM generate_series( 1000001, 1100000 );
INSERT INTO test.raw SELECT * FROM generate_series( 1050000, 1101000 );
VACUUM ANALYZE;

So we have 1M archived records, 100K active, 51K in the "raw" table, of which 1000 are new.

Query 1:

EXPLAIN ANALYZE SELECT *
FROM test.raw AS raw
LEFT JOIN test.active AS active ON (active.id=raw.id)
LEFT JOIN test.archive AS archive ON (archive.id=raw.id)
WHERE raw.id > 1100000 AND active.id IS NULL AND archive.id IS NULL
LIMIT 1;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.00..5.29 rows=1 width=12) (actual time=94.478..94.478 rows=1 loops=1)
   ->  Nested Loop Left Join  (cost=0.00..5400.09 rows=1021 width=12) (actual time=94.477..94.477 rows=1 loops=1)
         Filter: ("inner".id IS NULL)
         ->  Merge Left Join  (cost=0.00..2310.55 rows=1021 width=8) (actual time=94.458..94.458 rows=1 loops=1)
               Merge Cond: ("outer".id = "inner".id)
               Filter: ("inner".id IS NULL)
               ->  Index Scan using raw_pkey on raw  (cost=0.00..24.78 rows=1021 width=4) (actual time=0.016..0.016 rows=1 loops=1)
                     Index Cond: (id > 1100000)
               ->  Index Scan using active_pkey on active  (cost=0.00..2023.00 rows=100000 width=4) (actual time=0.005..76.572 rows=100000 loops=1)
         ->  Index Scan using archive_pkey on archive  (cost=0.00..3.01 rows=1 width=4) (actual time=0.013..0.013 rows=0 loops=1)
               Index Cond: (archive.id = "outer".id)
 Total runtime: 94.550 ms

Query 2:

EXPLAIN ANALYZE SELECT *
FROM test.raw AS raw
LEFT JOIN test.active AS active ON (active.id=raw.id AND active.id > 1100000)
LEFT JOIN test.archive AS archive ON (archive.id=raw.id AND archive.id > 1100000)
WHERE raw.id > 1100000 AND active.id IS NULL AND archive.id IS NULL
LIMIT 1;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.00..0.04 rows=1 width=12) (actual time=0.035..0.035 rows=1 loops=1)
   ->  Merge Left Join  (cost=0.00..37.67 rows=1021 width=12) (actual time=0.034..0.034 rows=1 loops=1)
         Merge Cond: ("outer".id = "inner".id)
         Filter: ("inner".id IS NULL)
         ->  Merge Left Join  (cost=0.00..30.51 rows=1021 width=8) (actual time=0.026..0.026 rows=1 loops=1)
               Merge Cond: ("outer".id = "inner".id)
               Filter: ("inner".id IS NULL)
               ->  Index Scan using raw_pkey on raw  (cost=0.00..24.78 rows=1021 width=4) (actual time=0.016..0.016 rows=1 loops=1)
                     Index Cond: (id > 1100000)
               ->  Index Scan using active_pkey on active  (cost=0.00..3.14 rows=10 width=4) (actual time=0.006..0.006 rows=0 loops=1)
                     Index Cond: (id > 1100000)
         ->  Index Scan using archive_pkey on archive  (cost=0.00..4.35 rows=100 width=4) (actual time=0.007..0.007 rows=0 loops=1)
               Index Cond: (id > 1100000)
 Total runtime: 0.101 ms

OK, you were right ;)

Query 3:

EXPLAIN ANALYZE SELECT *
FROM test.raw AS raw
WHERE raw.id > 1100000
  AND NOT EXISTS (SELECT 1 FROM test.active AS a WHERE a.id=raw.id)
  AND NOT EXISTS (SELECT 1 FROM test.archive AS a WHERE a.id=raw.id)
LIMIT 1;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.00..24.23 rows=1 width=4) (actual time=0.036..0.036 rows=1 loops=1)
   ->  Index Scan using raw_pkey on raw  (cost=0.00..6178.35 rows=255 width=4) (actual time=0.035..0.035 rows=1 loops=1)
         Index Cond: (id > 1100000)
         Filter: ((NOT (subplan)) AND (NOT (subplan)))
         SubPlan
           ->  Index Scan using archive_pkey on archive a  (cost=0.00..3.01 rows=1 width=0) (actual time=0.007..0.007 rows=0 loops=1)
                 Index Cond: (id = $0)
           ->  Index Scan using active_pkey on active a  (cost=0.00..3.01 rows=1 width=0) (actual time=0.008..0.008 rows=0 loops=1)
                 Index Cond: (id = $0)
 Total runtime: 0.086 ms

I see a problem with Query 1: the Merge Join goes through tables "raw" and "active" in sorted order.

"archive" contains values 1-1000000
"active" contains values 1000001-1100000
"raw" contains values 1050000-1101000

However, it starts at the beginning of "active"; it would be smarter to start the index scan of "active" at the lowest value in "raw", i.e. to seek into the right position in the index before beginning to scan it. This is achieved by your advice on manually adding the "id > x" conditions to the query.

However, if I want to join the full tables, dropping the id > x condition:

EXPLAIN ANALYZE SELECT *
FROM test.raw AS raw
LEFT JOIN test.active AS active ON (active.id=raw.id)
LEFT JOIN test.archive AS archive ON (archive.id=raw.id)
WHERE active.id IS NULL AND archive.id IS NULL;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Merge Left Join  (cost=0.00..27305.04 rows=51001 width=12) (actual time=837.196..838.099 rows=1000 loops=1)
   Merge Cond: ("outer".id = "inner".id)
   Filter: ("inner".id IS NULL)
   ->  Merge Left Join  (cost=0.00..3943.52 rows=51001 width=8) (actual time=153.495..154.190 rows=1000 loops=1)
         Merge Cond: ("outer".id = "inner".id)
         Filter: ("inner".id IS NULL)
         ->  Index Scan using raw_pkey on raw  (cost=0.00..1033.01 rows=51001 width=4) (actual time=0.012..23.085 rows=51001 loops=1)
         ->  Index Scan using active_pkey on active  (cost=0.00..2023.00 rows=100000 width=4) (actual time=0.004..47.333 rows=100000 loops=1)
   ->  Index Scan using archive_pkey on archive  (cost=0.00..20224.00 rows=1000000 width=4) (actual time=0.043..501.953 rows=1000000 loops=1)
 Total runtime: 838.272 ms

This is very slow: the Index Scans on "active" and "archive" have to skip a huge number of rows before getting to the first interesting row. We know that rows in "active" and "archive" will be of no use if their id is < (SELECT min(id) FROM test.raw), which is 1050000.
Let's rephrase:

EXPLAIN ANALYZE SELECT *
FROM test.raw AS raw
LEFT JOIN test.active AS active ON (active.id=raw.id AND active.id >= 1050000)
LEFT JOIN test.archive AS archive ON (archive.id=raw.id AND archive.id >= 1050000)
WHERE active.id IS NULL AND archive.id IS NULL;

                                                                   QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Merge Left Join  (cost=0.00..2837.93 rows=51001 width=12) (actual time=114.590..115.451 rows=1000 loops=1)
   Merge Cond: ("outer".id = "inner".id)
   Filter: ("inner".id IS NULL)
   ->  Merge Left Join  (cost=0.00..2705.78 rows=51001 width=8) (actual time=114.576..115.239 rows=1000 loops=1)
         Merge Cond: ("outer".id = "inner".id)
         Filter: ("inner".id IS NULL)
         ->  Index Scan using raw_pkey on raw  (cost=0.00..1033.01 rows=51001 width=4) (actual time=0.012..51.505 rows=51001 loops=1)
         ->  Index Scan using active_pkey on active  (cost=0.00..1158.32 rows=50913 width=4) (actual time=0.009..22.312 rows=50001 loops=1)
               Index Cond: (id >= 1050000)
   ->  Index Scan using archive_pkey on archive  (cost=0.00..4.35 rows=100 width=4) (actual time=0.012..0.012 rows=0 loops=1)
         Index Cond: (id >= 1050000)
 Total runtime: 115.601 ms

So here's my point: the first operation in the Index Scan in a merge join could be to seek to the right position in the index before scanning it. This value is known: it is the first value yielded by the index scan on "raw". This would remove the need for teaching the planner about transitivity, and would also optimize this case where transitivity is useless.
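A possible SQL-level workaround, rather than a planner change (an untested sketch, not from PFC's mail): since min(id) on an indexed column is itself answered by a quick index probe on 8.1 and later, the bound can be written as a scalar subquery instead of hard-coding 1050000. Whether the planner actually turns the resulting InitPlan into an index condition on the inner scans would need to be checked with EXPLAIN ANALYZE on the version in use:

-- Same query as above, but the lower bound is computed from test.raw itself.
SELECT *
FROM test.raw AS raw
LEFT JOIN test.active AS active
    ON (active.id = raw.id AND active.id >= (SELECT min(id) FROM test.raw))
LEFT JOIN test.archive AS archive
    ON (archive.id = raw.id AND archive.id >= (SELECT min(id) FROM test.raw))
WHERE active.id IS NULL AND archive.id IS NULL;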