Re: Duplicate deletion optimizations - Mailing list pgsql-performance

From: Pierre C
Subject: Re: Duplicate deletion optimizations
Msg-id: op.v7pz2lppeorkce@apollo13
In response to: Re: Duplicate deletion optimizations (Marc Eberhard <eberhardma@googlemail.com>)
List: pgsql-performance
> It's a fairly tricky problem. I have a number of sensors producing
> energy data about every 5 minutes, but at random times between 1 and
> 15 minutes. I can't change that as that's the way the hardware of the
> sensors works. These feed into another unit, which accumulates them
> and forwards them in batches over the Internet to my PostgreSQL
> database server every few minutes (again at random times outside my
> control and with random batch sizes). To make things worse, if the
> Internet connection between the unit and the database server fails, it
> will send the latest data first to provide a quick update to the
> current values and then send the backlog of stored values. Thus, data
> do not always arrive in correct time order.

I'm stuck home with flu, so I'm happy to help ;)

I'll build an example setup to make it clearer...

-- A list of all sensors
create table sensors( sensor_id integer primary key );
insert into sensors select generate_series(1,100);

-- A table to contain raw sensor data
create table log(
    sensor_id integer not null references sensors(sensor_id),
    time integer not null,
    value float not null
);

-- Fill it up with test data
insert into log
select sensor_id, time, time from (
    select distinct sensor_id,
        (n+random()*10)::INTEGER as time
    from generate_series(0,50000,5) n
    cross join sensors
) d;

-- index it
alter table log add primary key( time, sensor_id );
create index log_sensor_time on log( sensor_id, time );

select * from log where sensor_id=1 order by time;

 sensor_id | time  | value
-----------+-------+-------
         1 |    12 |    12
         1 |    14 |    14
         1 |    21 |    21
         1 |    29 |    29
         1 |    30 |    30
(....)
         1 | 49996 | 49996
         1 | 50001 | 50001

-- create a table which will contain the time ticks
-- which will be used as x-axis for interpolation
-- (in this example, one tick every 10 time units)
create table ticks(
    time integer primary key,
    check( time%10 = 0 )
);
insert into ticks
select generate_series( 0, (select max(time) from log), 10 );

-- create interpolated values table
create table interp(
    sensor_id integer not null references sensors( sensor_id ),
    time integer not null references ticks( time ),
    value float,
    distance integer not null
);

-- fill interpolated values table
-- (pretty slow)
insert into interp
select
    sensor_id,
    t.time,
    start_value + (end_value-start_value)*(t.time-start_time)/(end_time-start_time),
    greatest( t.time - start_time, end_time - t.time )
from (
    select
        sensor_id,
        lag(time)  over (partition by sensor_id order by time) as start_time,
        time as end_time,
        lag(value) over (partition by sensor_id order by time) as start_value,
        value as end_value
    from log
) as l
join ticks t on (t.time >= start_time and t.time < end_time);

-- alternate query if you don't like the ticks table (same result):
insert into interp
select
    sensor_id,
    time,
    start_value + (end_value-start_value)*(time-start_time)/(end_time-start_time),
    greatest( time - start_time, end_time - time )
from (
    select *,
        generate_series( ((start_time+9)/10)*10, ((end_time-1)/10)*10, 10 ) AS time
    from (
        select
            sensor_id,
            lag(time)  over (partition by sensor_id order by time) as start_time,
            time as end_time,
            lag(value) over (partition by sensor_id order by time) as start_value,
            value as end_value
        from log
    ) as l
) l;

alter table interp add primary key( time, sensor_id );
create index interp_sensor_time on interp( sensor_id, time );

For each interval in the log table that contains a time tick, this query generates the interpolated data at that tick. Note that the "distance" field represents the distance (in time) between the interpolated value and the farthest real data point that was used to calculate it.
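In Python terms, the arithmetic this INSERT performs per interval can be sketched as follows. This is only an illustration of the interpolation and the "distance" quality measure, not of the SQL itself; the function name and the tick constant are mine:

```python
TICK = 10  # one tick every 10 time units, as in the ticks table

def interpolate(log_rows):
    """log_rows: sorted (time, value) pairs for one sensor.
    Yields (tick, value, distance), mirroring the INSERT INTO interp query."""
    for (t0, v0), (t1, v1) in zip(log_rows, log_rows[1:]):
        # ticks t with t0 <= t < t1, i.e. the join condition
        # (t.time >= start_time and t.time < end_time)
        first = (t0 + TICK - 1) // TICK * TICK   # smallest multiple of TICK >= t0
        for t in range(first, t1, TICK):
            value = v0 + (v1 - v0) * (t - t0) / (t1 - t0)
            # distance to the farthest real data point used
            distance = max(t - t0, t1 - t)
            yield (t, value, distance)

# with the sample rows for sensor 1, only tick 20 falls inside an interval:
rows = [(12, 12.0), (14, 14.0), (21, 21.0), (29, 29.0), (30, 30.0)]
print(list(interpolate(rows)))   # [(20, 20.0, 6)]
```

Since the test data sets value = time, every interpolated value equals its tick, which makes the output easy to eyeball.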
Therefore, it can be used as a measure of the quality of the interpolated point; if the distance is greater than some threshold, the value might not be that precise.

Now, suppose we receive a bunch of data. The data isn't ordered according to time. There are two possibilities:

- the new data starts right where we left off (ie, just after the last time for each sensor in table log)
- the new data starts later in time, and we want to process the results right away, expecting to receive, at some later point, older data to fill the holes

The second one is hairier, so let's do that. Anyway, let's create a packet:

-- A table to contain raw sensor data
create temporary table packet(
    sensor_id integer not null,
    time integer not null,
    value float not null
);

-- Fill it up with test data
insert into packet
select sensor_id, time, time from (
    select distinct sensor_id,
        (n+random()*10)::INTEGER as time
    from generate_series(50200,50400) n
    cross join sensors
) d;

Note that I deliberately inserted a hole: the log table contains times 0-50000 and the packet contains times 50200-50400. We'll need to decide whether we want the hole to appear in the "interp" table or not. Let's say we don't want it to appear; we'll just interpolate over the hole. The "distance" column will be there so we don't forget this data is some sort of guess. If we receive data to fill that hole later, we can always use it.

For each sensor in the packet, we need to grab some entries from table "log": at least the most recent one, to be able to do some interpolation with the first (oldest) value in the packet. To be more general, in case we receive old data that will plug a hole, we'll also grab the oldest log entry that is more recent than the most recent one in the packet for this sensor (hmm... I have to re-read that...).

Anyway, first let's create the missing ticks:

INSERT INTO ticks
SELECT generate_series(
    (SELECT max(time) FROM ticks)+10,
    (SELECT max(time) FROM packet),
    10 );

And ...
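The "grab surrounding log entries" step described above boils down to, for each sensor, finding the latest log time strictly before the packet and the earliest log time strictly after it, falling back to the packet's own boundary when no such row exists. A minimal sketch of that logic (the helper name is mine, not from the post):

```python
def log_window(log_times, packet_times):
    """Return (lo, hi): the span of log times needed to interpolate a packet,
    mirroring the COALESCE(max(...)) / COALESCE(min(...)) subqueries below."""
    p_start, p_end = min(packet_times), max(packet_times)
    before = [t for t in log_times if t < p_start]
    after  = [t for t in log_times if t > p_end]
    lo = max(before) if before else p_start   # COALESCE( max(l.time), packet_start_time )
    hi = min(after)  if after  else p_end     # COALESCE( min(l.time), packet_end_time )
    return lo, hi

# log holds 0-50000, packet holds 50200-50400: only the most recent log
# row (50001) is pulled in, and there is nothing after the packet yet
print(log_window([49996, 50001], [50200, 50400]))   # (50001, 50400)
```

The fallbacks matter: when the packet is the very first (or very last) data for a sensor, there is simply no log row on that side, and the window collapses to the packet boundary.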
CREATE TEMPORARY TABLE new_interp(
    sensor_id INTEGER NOT NULL,
    time INTEGER NOT NULL,
    value FLOAT NOT NULL,
    distance INTEGER NOT NULL
);

-- time range in the packet for each sensor
WITH ranges AS (
    SELECT sensor_id,
        min(time) AS packet_start_time,
        max(time) AS packet_end_time
    FROM packet
    GROUP BY sensor_id
),
-- time ranges for records already in table log that will be needed
-- to interpolate packet records
log_boundaries AS (
    SELECT sensor_id,
        COALESCE(
            (SELECT max(l.time) FROM log l
             WHERE l.sensor_id=r.sensor_id AND l.time < r.packet_start_time),
            r.packet_start_time ) AS packet_start_time,
        COALESCE(
            (SELECT min(l.time) FROM log l
             WHERE l.sensor_id=r.sensor_id AND l.time > r.packet_end_time),
            r.packet_end_time ) AS packet_end_time
    FROM ranges r
),
-- merge existing and new data
extended_packet AS (
    SELECT log.* FROM log
    JOIN log_boundaries USING (sensor_id)
    WHERE log.time BETWEEN packet_start_time AND packet_end_time
    UNION ALL
    SELECT * FROM packet
),
-- zip current and next records
pre_interp AS (
    SELECT sensor_id,
        lag(time)  OVER (PARTITION BY sensor_id ORDER BY time) AS start_time,
        time AS end_time,
        lag(value) OVER (PARTITION BY sensor_id ORDER BY time) AS start_value,
        value AS end_value
    FROM extended_packet
),
-- add tick info
pre_interp2 AS (
    SELECT *,
        generate_series( ((start_time+9)/10)*10, ((end_time-1)/10)*10, 10 ) AS time
    FROM pre_interp
)
-- interpolate
INSERT INTO new_interp
SELECT sensor_id, time,
    start_value + (end_value-start_value)*(time-start_time)/(end_time-start_time) AS value,
    greatest( time - start_time, end_time - time ) AS distance
FROM pre_interp2;

Although this query is huge, it's very fast, since it doesn't hit the big tables with any seq scans (hence the max() and min() tricks to use the indexes instead). I love how postgres can blast that huge pile of SQL in, like, 50 ms...
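The tick bounds in pre_interp2 rely on integer division: ((start_time+9)/10)*10 is the smallest multiple of 10 that is >= start_time, and ((end_time-1)/10)*10 is the largest multiple of 10 strictly below end_time. A quick check of that arithmetic (Python's // matches PostgreSQL's integer / for the non-negative operands used here):

```python
def tick_bounds(start_time, end_time, tick=10):
    """Bounds fed to generate_series in pre_interp2."""
    first = (start_time + tick - 1) // tick * tick   # smallest multiple >= start_time
    last = (end_time - 1) // tick * tick             # largest multiple < end_time
    return first, last

assert tick_bounds(12, 30) == (20, 20)   # only tick 20 lies in [12, 30)
assert tick_bounds(20, 31) == (20, 30)   # start falls on a tick: it is included
assert tick_bounds(21, 30) == (30, 20)   # first > last: generate_series yields no rows
```

The last case is why intervals too short to contain a tick silently produce nothing, which is exactly the desired behavior.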
If there is some overlap between packet data and data already in the log, you might get some division by zero errors; in this case you'll need to apply a DISTINCT somewhere (or simply replace the UNION ALL with a UNION, which might be wiser anyway...).

Anyway, that doesn't solve the "upsert" problem, so here we go:

-- Update the existing rows
UPDATE interp
SET value = new_interp.value, distance = new_interp.distance
FROM new_interp
WHERE interp.sensor_id = new_interp.sensor_id
  AND interp.time = new_interp.time
  AND interp.distance > new_interp.distance;

-- insert new rows
INSERT INTO interp
SELECT new_interp.* FROM new_interp
LEFT JOIN interp USING (sensor_id, time)
WHERE interp.sensor_id IS NULL;

-- also insert data into log (don't forget this!)
INSERT INTO log SELECT * FROM packet;

Tada.

select * from interp where sensor_id=1 and time > 49950 order by time;

 sensor_id | time  | value | distance
-----------+-------+-------+----------
         1 | 49960 | 49960 |        7
         1 | 49970 | 49970 |        4
         1 | 49980 | 49980 |        3
         1 | 49990 | 49990 |        5
         1 | 50000 | 50000 |        2
         1 | 50010 | 50010 |      190
         1 | 50020 | 50020 |      180
         1 | 50030 | 50030 |      170
(...)
         1 | 50180 | 50180 |      178
         1 | 50190 | 50190 |      188
         1 | 50200 | 50200 |        2
         1 | 50210 | 50210 |        1
         1 | 50220 | 50220 |        1
         1 | 50230 | 50230 |        1
         1 | 50240 | 50240 |        2

Note that the hole was interpolated over, but the "distance" column shows this data is a guess, not real. What happens if we receive some data later to plug the hole?
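The UPDATE-then-INSERT pair above implements a "keep the best estimate" upsert: an existing interpolated row is overwritten only when the new row's distance is smaller (i.e. the new estimate sits closer to real data), and rows at previously unknown ticks are inserted. In dictionary form, as a rough illustration rather than the actual mechanism:

```python
def upsert(interp, new_rows):
    """interp: {(sensor_id, time): (value, distance)}.
    Apply the UPDATE-then-INSERT pair from the example."""
    for sensor_id, time, value, distance in new_rows:
        key = (sensor_id, time)
        if key in interp:
            # UPDATE ... WHERE interp.distance > new_interp.distance
            if interp[key][1] > distance:
                interp[key] = (value, distance)
        else:
            # INSERT ... LEFT JOIN interp ... WHERE interp.sensor_id IS NULL
            interp[key] = (value, distance)
    return interp

table = {(1, 50010): (50010.0, 190)}          # guessed across the hole
upsert(table, [(1, 50010, 50010.0, 45),       # better estimate: replaces the guess
               (1, 50060, 50060.0, 1)])       # new tick: inserted
print(table)   # {(1, 50010): (50010.0, 45), (1, 50060): (50060.0, 1)}
```

Note the asymmetry: a worse estimate (larger distance) never clobbers a better one already in the table, which is what makes re-sending overlapping packets harmless.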
-- plug the previously left hole
truncate packet;
truncate new_interp;

insert into packet
select sensor_id, time, time from (
    select distinct sensor_id,
        (n+random()*10)::INTEGER as time
    from generate_series(50050,50150) n
    cross join sensors
) d;

(re-run huge query and upsert)

select * from interp where sensor_id=1 and time > 49950 order by time;

 sensor_id | time  | value | distance
-----------+-------+-------+----------
         1 | 49960 | 49960 |        7
         1 | 49970 | 49970 |        4
         1 | 49980 | 49980 |        3
         1 | 49990 | 49990 |        5
         1 | 50000 | 50000 |        2
         1 | 50010 | 50010 |       45
         1 | 50020 | 50020 |       35
         1 | 50030 | 50030 |       28
         1 | 50040 | 50040 |       38
         1 | 50050 | 50050 |       48
         1 | 50060 | 50060 |        1
         1 | 50070 | 50070 |        1
         1 | 50080 | 50080 |        2
(...)
         1 | 50130 | 50130 |        1
         1 | 50140 | 50140 |        3
         1 | 50150 | 50150 |        1
         1 | 50160 | 50160 |       40
         1 | 50170 | 50170 |       30
         1 | 50180 | 50180 |       26
         1 | 50190 | 50190 |       36
         1 | 50200 | 50200 |        2
         1 | 50210 | 50210 |        1
         1 | 50220 | 50220 |        1
         1 | 50230 | 50230 |        1
         1 | 50240 | 50240 |        2

It has used the new data to rewrite new values over the entire hole, and those values should have better precision.

Enjoy!