Network Flow Schema + Bulk Import/Updates

From: "Michael L. Artz"
I'm fairly new at Postgres and had some basic design questions.  My
problem is basically that I want to do large bulk imports (millions of
rows) into a large DB (billions of rows) where there might already be
data that needs to be updated instead of inserting a new row.  I read a
similar post a few days ago, but I just had a couple more questions.

My DB decomposes into storing something similar to Cisco Netflow
records, i.e. source IP, source Port, dest IP, dest Port, and a bit more
information about network flow data, as well as 2 timestamps, a count
field, and a foreign key into a small table (<100 rows) (schema below).
I will be nightly importing the data (which could number in the millions
of rows) and, since the DB is not quite production, I can do almost
whatever I want, as long as the DB is in a query-able state by morning.
The nightly imports are the only modifications to the table; otherwise
the table is read-only and accessed via a web application.  Needless to
say, the table will get huge (I'm estimating billions of rows within a
month), although it should level off as I get more and more duplicates.

CREATE TABLE flow (
    source_ip   INET,
    -- ports are unsigned 16-bit but SMALLINT is signed, so values
    -- above 32767 don't fit as-is; convert/offset in application space
    source_port   SMALLINT,
    dest_ip   INET,
    dest_port   SMALLINT,

    comment   VARCHAR(128),
    count   INTEGER,
    first_seen   DATE,
    last_seen   DATE,
    fk   INTEGER NOT NULL REFERENCES small_table(small_id)
);
CREATE INDEX flow_source_ip_idx ON flow (source_ip);
CREATE INDEX flow_dest_ip_idx ON flow (dest_ip);

When I import the data (thousands to millions of rows), I want to check
and see if there is a row that already exists with the same source_ip,
dest_ip, and comment and, if so, I want to update the row to increment
the count and update the first_seen and last_seen dates if need be.  Otherwise,
just insert a new row with a count of 1.

Basic question:  What's the best way to go about this?

From what I have read, it seemed like the consensus was to import (COPY)
the new data into a temporary table and then work on the inserts and
updates from there.  I also read some suggestions involving triggers ...
which way would be best given my dataset?  I've thought about doing it
in application space (Perl) by querying out all the rows that need to be
updated, deleting said rows, dropping the indexes, and then doing a bulk
COPY of any new rows plus the modified old rows ... does this sound like
a good/bad idea?
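
For concreteness, here's roughly what I had in mind for the temp table
version (completely untested; "flow_staging" is a made-up name, and I'm
assuming the staged rows already carry a count):

CREATE TEMP TABLE flow_staging (LIKE flow);
COPY flow_staging FROM '/path/to/nightly_flows.txt';  -- hypothetical path

-- fold duplicates into existing rows
UPDATE flow SET
    count = flow.count + s.count,
    first_seen = CASE WHEN s.first_seen < flow.first_seen
                      THEN s.first_seen ELSE flow.first_seen END,
    last_seen  = CASE WHEN s.last_seen > flow.last_seen
                      THEN s.last_seen ELSE flow.last_seen END
FROM flow_staging s
WHERE flow.source_ip = s.source_ip
  AND flow.dest_ip   = s.dest_ip
  AND flow.comment   = s.comment;

-- then insert the genuinely new rows
INSERT INTO flow
SELECT s.* FROM flow_staging s
WHERE NOT EXISTS (
    SELECT 1 FROM flow f
    WHERE f.source_ip = s.source_ip
      AND f.dest_ip   = s.dest_ip
      AND f.comment   = s.comment);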

Some other basic questions:

-Should I drop and recreate the indexes at any point during the load,
given that the updates and selects will use them extensively to find
matching rows in the existing flow table?  Perhaps create a new index
on (source_ip, dest_ip)?
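
(For reference, the combined index would just be something like:

CREATE INDEX flow_src_dst_idx ON flow (source_ip, dest_ip);

though I don't know whether the planner would favor it over the two
single-column ones.)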

-What do you think of the schema for storing network flow data
considering that I'm only storing single IPs and I do need the comment
field?  Is the INET data type the best thing to use here (12 bytes, I
think), or should I use my application to convert my IPs to "INTEGER"
equivalents (4 bytes)?  Perhaps create a new Postgres data type?
Although I have another table which actually uses INET networks, and I'd
like to be able to join the two, so a new data type might be more work
than I initially thought (although I'm up for it) :)

-How many rows at a time can I practically insert with the COPY
command?  I've read about people doing millions, but is that realistic,
or should I break my load into X-record chunks/transactions?
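
By "chunks" I mean e.g. splitting the dump file and feeding each piece
to psql in its own transaction, along these lines (file names made up):

BEGIN;
\copy flow_staging FROM 'flows_chunk_aa'
COMMIT;
BEGIN;
\copy flow_staging FROM 'flows_chunk_ab'
COMMIT;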

Any suggestions/comments/flames would be appreciated.

Thanks
-Mike

Re: Network Flow Schema + Bulk Import/Updates

From: Tony Wasson
On 9/20/05, Michael L. Artz <dragon@october29.net> wrote:
> I'm fairly new at Postgres and had some basic design questions.  My
> problem is basically that I want to do large bulk imports (millions of
> rows) into a large DB (billions of rows) where there might already be
> data that needs to be updated instead of inserting a new row.  I read a
> similar post a few days ago, but I just had a couple more questions.

You can use the merge trigger below to do this. You'll need to add
some code to update the count. You may also benefit from using the new
constraint exclusion (table partitioning) in PostgreSQL 8.1. I am not
sure if CE works against the inet datatype -- if not, try converting
the IP to an integer.

This merge function is based on an earlier version by Mike Rylander
that I got from here:
http://archives.postgresql.org/pgsql-sql/2004-05/msg00135.php

His version worked fine; all mistakes are my own. I wanted to allow
multiple key columns.

--
-- Merge on INSERT functionality for Postgres 8.0+
--
-- Original Author: miker ( at ) purplefrog ( dot ) com / 5-14-04
-- ajwasson (at) gmail (dot) com -- Added support for multiple
--   key columns 8-20-05
--
-- CAVEAT EMPTOR: Uses table locks to avoid concurrency issues,
--           so it WILL slow down heavily loaded tables.
--           This effectively puts the table into
--           TRANSACTION ISOLATION LEVEL SERIALIZABLE mode.
--
-- NOTE: You don't want to use key columns that are NULLable.

CREATE OR REPLACE FUNCTION add_merge_on_insert2(
  tablename TEXT,  -- table name
  keycols TEXT[],  -- key columns
  updatecols TEXT[] -- columns to update if the key columns match
) RETURNS TEXT
AS $BODY$

DECLARE
  trig  TEXT;
  upclause  TEXT := '';
  keyclause TEXT := '';
  out_msg TEXT;
BEGIN
  -- setup the where clause with all key columns (probably primary keys)
  FOR a IN 1 .. array_upper(keycols,1) LOOP
    keyclause := keyclause || quote_ident(keycols[a]) || ' = NEW.'
      || quote_ident(keycols[a]) || ' AND ';
  END LOOP;
  --trim the last AND
  keyclause := trim(trailing ' AND ' FROM keyclause);

  -- setup the columns to UPDATE
  FOR i IN 1 .. array_upper(updatecols,1) LOOP
    upclause := upclause || quote_ident(updatecols[i])
      || ' = COALESCE(NEW.' || quote_ident(updatecols[i])
      || ', orig.' || quote_ident(updatecols[i]) || '), ';
  END LOOP;
  --trim the last comma and space
  upclause := trim(trailing ', ' FROM upclause);

  -- put together the trigger function now
EXECUTE 'CREATE FUNCTION "' || tablename || '_merge_on_insert_f" ()
RETURNS TRIGGER AS $$
  DECLARE
    orig ' || quote_ident(tablename) || '%ROWTYPE;
  BEGIN
    -- NOTE: This function was dynamically built by add_merge_on_insert2
    LOCK TABLE ' || quote_ident(tablename) || ' IN ROW EXCLUSIVE MODE;

    SELECT INTO orig * FROM  ' || quote_ident(tablename) || ' WHERE '
|| keyclause || ';

    IF NOT FOUND THEN
      RETURN NEW;
    END IF;

    UPDATE ' || quote_ident(tablename) || ' SET ' || upclause || '
WHERE ' || keyclause || ';

    RETURN NULL;
  END;
  $$ LANGUAGE plpgsql
 '; -- end of execute

EXECUTE 'CREATE TRIGGER "' || tablename || '_merge_on_insert_t" BEFORE INSERT
    ON ' || quote_ident(tablename) || ' FOR EACH ROW
    EXECUTE PROCEDURE "' || tablename || '_merge_on_insert_f" ();
  '; -- end of execute

  out_msg := 'FUNCTION ' || tablename || '_merge_on_insert_f ();
TRIGGER ' || tablename || '_merge_on_insert_t';
  RETURN out_msg;
END;
$BODY$ LANGUAGE 'plpgsql';
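
For your flow table the call would look something like this (untested
against your schema, and remember you still need to tweak the generated
UPDATE so the count is incremented rather than just COALESCEd):

SELECT add_merge_on_insert2(
    'flow',
    ARRAY['source_ip', 'dest_ip', 'comment'],
    ARRAY['count', 'first_seen', 'last_seen']
);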

Re: Network Flow Schema + Bulk Import/Updates

From: "Michael L. Artz"
Tony Wasson wrote:

>You can use the merge trigger below to do this. You'll need to add
>some code to update the count. You may also benefit from using the new
>constraint exclusion (table partitioning) in PostgreSQL 8.1. I am not
>sure if CE works against the inet datatype -- if not, try converting
>the IP to an integer.
>
>

CE looked like it was just for parent/child relationships ... did I read
that right?  I'm not sure how it applies.  And the table partitioning
looks like it's still on the todo list ... is that really the case?

And as for getting data into the DB ... from earlier posts it sounded
like standard practice was to bulk load the new data into a temporary
table and then do an INSERT ... SELECT to load the data into the new
table.  Is this still the case with the trigger, or can/should I just
COPY the data straight into the final database?  And I assume that I
should *not* delete my indexes while I'm loading the table, since the
queries in the trigger can take advantage of them ... right?

Also, as a slight aside, has anyone created a data type for single IPs
that is essentially an integer (i.e. 4 bytes) that works with the
standard functions for INET?

-Mike

Re: Network Flow Schema + Bulk Import/Updates

From: Tony Wasson
On 9/21/05, Michael L. Artz <dragon@october29.net> wrote:
> Tony Wasson wrote:
>
> >You can use the merge trigger below to do this. You'll need to add
> >some code to update the count. You may also benefit from using the new
> >constraint exclusion (table partitioning) in PostgreSQL 8.1. I am not
> >sure if CE works against the inet datatype -- if not, try converting
> >the IP to an integer.
> >
> >
>
> CE looked like it was just for parent/child relationships ... did I read
> that right?  I'm not sure how it applies.  And the table partitioning
> looks like it's still on the todo list ... is that really the case?

CE is available in the PostgreSQL 8.1 beta. I was thinking you might
use it to slice up your data based on subnet. You can make it all fit
in a single table, but splitting it up could help keep the indexes
smaller. Are your SELECTs going to be based primarily on source and
destination IPs? You have the possibility of a massive number of rows!
Is there anything to be learned from large installations of snort
using a Pg backend?
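
The 8.1 approach is inheritance plus CHECK constraints. A rough sketch
(untested, subnets made up, and as I said I'm not certain the planner
can exclude partitions based on inet comparisons):

CREATE TABLE flow_net10 (
    CHECK (source_ip >= '10.0.0.0'::inet
       AND source_ip <= '10.255.255.255'::inet)
) INHERITS (flow);

CREATE TABLE flow_rest (
    CHECK (source_ip < '10.0.0.0'::inet
        OR source_ip > '10.255.255.255'::inet)
) INHERITS (flow);

-- 8.1 only; you also need a rule or trigger to route
-- inserts into the proper child table
SET constraint_exclusion = on;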

> And as for getting data into the DB ... from earlier posts it sounded
> like standard practice was to bulk load the new data into a temporary
> table and then do an INSERT ... SELECT to load the data into the new
> table.  Is this still the case with the trigger, or can/should I just
> COPY the data straight into the final database?  And I assume that I
> should *not* delete my indexes while I'm loading the table, since the
> queries in the trigger can take advantage of them ... right?

The initial load can be a COPY or a bunch of INSERTs and I'd suggest
doing it without any triggers or indexes. Then build your indexes and
add your triggers.

The way I normally use this trigger is with INSERTs. You can use a
COPY against it too.
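
So for the very first load the sequence would be roughly (path is
hypothetical):

COPY flow FROM '/path/to/initial_flows.txt';
CREATE INDEX flow_source_ip_idx ON flow (source_ip);
CREATE INDEX flow_dest_ip_idx ON flow (dest_ip);
-- then install the merge trigger with add_merge_on_insert2() as above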

> Also, as a slight aside, has anyone created a data type for single IPs
> that is essentially an integer (i.e. 4 bytes) that works with the
> standard functions for INET?

I've never seen a datatype that does this. Here are some functions to
convert INET to INT8 and back.

CREATE OR REPLACE FUNCTION inet_ntoa(int8) RETURNS inet
AS '
    -- from http://www.snort.org/docs/snortdb/snortdb_faq.html#faq_b4
    SELECT (
        (($1>>24) & 255::int8) || ''.'' ||
        (($1>>16) & 255::int8) || ''.'' ||
        (($1>>8)  & 255::int8) || ''.'' ||
        ($1     & 255::int8)
        )::INET;
' LANGUAGE 'sql';

CREATE OR REPLACE FUNCTION inet_aton(inet) RETURNS int8
AS '
  -- this ignores any subnetting information
  -- http://www.mcabee.org/lists/snort-users/Oct-01/msg00426.html
  SELECT
  (
    (split_part($1::TEXT,''.'',1)::INT8*16777216) +
    (split_part($1::TEXT,''.'',2)::INT8*65536) +
    (split_part($1::TEXT,''.'',3)::INT8*256) +
    (split_part(split_part($1::TEXT,''.'',4),''/'',1))::INT8
  )::INT8;
' LANGUAGE 'sql';
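
If I did the math right, a quick sanity check of the round trip:

SELECT inet_aton('10.1.2.3');     -- 167838211
SELECT inet_ntoa(167838211);      -- 10.1.2.3
SELECT inet_aton('10.1.2.0/24');  -- 167838208 (mask ignored)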