Re: [Solved] Generic logging system for pre-hstore using plperl triggers - Mailing list pgsql-general

From Diego Augusto Molina
Subject Re: [Solved] Generic logging system for pre-hstore using plperl triggers
Date
Msg-id CAGOxLdHDUE8Oc+b4fwEZYuBxvHit2MfTRYnk_7ipDQAT581Kgg@mail.gmail.com
Whole thread Raw
In response to Re: [Solved] Generic logging system for pre-hstore using plperl triggers  (Filip Rembiałkowski <plk.zuber@gmail.com>)
Responses Re: [Solved] Generic logging system for pre-hstore using plperl triggers
Re: [Solved] Generic logging system for pre-hstore using plperl triggers
List pgsql-general
        /* Created by Diego Augusto Molina in 2011 for Tucuman Government,
Argentina. */

/*
    -- Execute the following according to your needs.
  CREATE TRUSTED PROCEDURAL LANGUAGE 'plpgsql';
  CREATE TRUSTED PROCEDURAL LANGUAGE 'plperl';
*/

-- Read-side role: created without the LOGIN attribute; the table definitions
-- in this script only ever grant it SELECT.
CREATE ROLE auditor
  NOSUPERUSER
  NOCREATEDB
  NOCREATEROLE
  NOINHERIT;

-- Owner role of the audit schema and of every object created below.
-- NOTE(review): the password looks like a placeholder; change it before deploying.
CREATE ROLE audit
  LOGIN
  NOSUPERUSER
  NOCREATEDB
  NOCREATEROLE
  NOINHERIT
  ENCRYPTED PASSWORD 'test.1234';

CREATE SCHEMA audit AUTHORIZATION audit;

-- Both roles resolve unqualified names inside the audit schema by default.
ALTER ROLE audit SET search_path = audit;
ALTER ROLE auditor SET search_path = audit;

-- The rest of the script runs with "audit" as default schema and as role
-- "audit", so the unqualified objects below are created in that schema.
SET search_path = audit;
SET SESSION AUTHORIZATION audit;

-- Source of audit.id values (the audit() trigger function calls nextval on it).
-- Covers the whole bigint range and wraps around when exhausted.
CREATE SEQUENCE seq_audit
  START WITH 0
  INCREMENT BY 1
  MINVALUE -9223372036854775808
  MAXVALUE 9223372036854775807
  CACHE 1
  CYCLE;
ALTER TABLE seq_audit OWNER TO audit;

-- Shared default for the smallint ids of the lookup tables
-- (field, client_inet, schema, table, user).
-- NOTE(review): it covers only the smallint range and cycles; once it wraps,
-- nextval can hand out an id that already exists in one of those tables.
CREATE SEQUENCE seq_elems
  START WITH 0
  INCREMENT BY 1
  MINVALUE -32768
  MAXVALUE 32767
  CACHE 1
  CYCLE;
ALTER TABLE seq_elems OWNER TO audit;


-- Lookup table of audited column names; audet.field stores these ids.
CREATE TABLE field (
  id    smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name     NOT NULL,
  CONSTRAINT field_pk       PRIMARY KEY (id)    WITH (FILLFACTOR=100),
  CONSTRAINT field_uq_value UNIQUE      (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);

ALTER TABLE field OWNER TO audit;
GRANT SELECT ON TABLE field TO auditor;
GRANT ALL ON TABLE field TO audit;

-- Lookup table of client addresses; audit.client_inet stores these ids.
-- NOTE(review): the constraint names use a "dir_inet_" prefix that does not
-- match the table name; left untouched because renaming is visible to callers.
CREATE TABLE client_inet (
  id    smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value inet     NOT NULL DEFAULT inet_client_addr(),
  CONSTRAINT dir_inet_pk       PRIMARY KEY (id)    WITH (FILLFACTOR=100),
  CONSTRAINT dir_inet_uq_value UNIQUE      (value) WITH (FILLFACTOR=95)
) WITH (OIDS=FALSE);

ALTER TABLE client_inet OWNER TO audit;
GRANT SELECT ON TABLE client_inet TO auditor;
GRANT ALL ON TABLE client_inet TO audit;

-- Lookup table of schema names of audited tables; audit.schema stores these ids.
CREATE TABLE schema (
  id    smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name     NOT NULL,
  CONSTRAINT schema_pk       PRIMARY KEY (id)    WITH (FILLFACTOR=100),
  CONSTRAINT schema_uq_value UNIQUE      (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);

ALTER TABLE schema OWNER TO audit;
GRANT SELECT ON TABLE schema TO auditor;
GRANT ALL ON TABLE schema TO audit;

-- Lookup table of audited table names; the audit table stores these ids in
-- its own "table" column.
-- TABLE is a reserved word in PostgreSQL, so the identifier has to be
-- double-quoted everywhere it is used as a relation name.
CREATE TABLE "table"
(
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT table_pk PRIMARY KEY (id )
  WITH (FILLFACTOR=100),
  CONSTRAINT table_uq_value UNIQUE (value )
  WITH (FILLFACTOR=100)
)
WITH (
  OIDS=FALSE
);
ALTER TABLE "table"
  OWNER TO audit;
GRANT ALL ON TABLE "table" TO audit;
GRANT SELECT ON TABLE "table" TO auditor;

-- Lookup table of database user names; the audit table stores these ids in
-- its own "user" column.
-- USER is a reserved word in PostgreSQL (unquoted it is parsed as the
-- current_user expression), so the identifier has to be double-quoted
-- everywhere it is used as a relation name.
CREATE TABLE "user"
(
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL DEFAULT "current_user"(),
  CONSTRAINT user_pk PRIMARY KEY (id )
  WITH (FILLFACTOR=100),
  CONSTRAINT user_uq_value UNIQUE (value )
  WITH (FILLFACTOR=95)
)
WITH (
  OIDS=FALSE
);
ALTER TABLE "user"
  OWNER TO audit;
GRANT ALL ON TABLE "user" TO audit;
GRANT SELECT ON TABLE "user" TO auditor;

-- Parent table of the audit headers: one row per audited row-level event.
-- Rows inserted here are redirected to a dated child table by the
-- BEFORE INSERT trigger function tgf_ins_audit().
-- The columns "table" and "user" are reserved words and must be double-quoted.
CREATE TABLE audit
(
  id bigint,                                      -- value taken from seq_audit by audit()
  type character(1),                              -- first letter of the trigger event
  tstmp timestamp with time zone DEFAULT now(),
  schema smallint,                                -- id in lookup table schema
  "table" smallint,                               -- id in lookup table "table"
  "user" smallint,                                -- id in lookup table "user"
  client_inet smallint,                           -- id in lookup table client_inet
  client_port integer DEFAULT inet_client_port(),
  pid integer DEFAULT pg_backend_pid()
)
WITH (
  OIDS=FALSE
);
ALTER TABLE audit OWNER TO audit;
GRANT ALL ON TABLE audit TO audit;
GRANT SELECT ON TABLE audit TO auditor;

-- Parent table of the audit details: one row per logged column of an audited
-- event. Rows inserted here are redirected to a dated child table by the
-- BEFORE INSERT trigger function tgf_ins_audet().
CREATE TABLE audet (
  id     bigint,    -- id of the matching row in audit
  field  smallint,  -- id in lookup table field
  is_pk  boolean,   -- true when the column was treated as part of the primary key
  before text,      -- textual value taken from OLD
  after  text       -- textual value taken from NEW
) WITH (OIDS=FALSE);

ALTER TABLE audet OWNER TO audit;
GRANT SELECT ON TABLE audet TO auditor;
GRANT ALL ON TABLE audet TO audit;

CREATE OR REPLACE FUNCTION tgf_ins_audet()
  RETURNS trigger AS
$BODY$
    /*
      BEFORE INSERT row trigger function for the parent table "audet".

      Trigger argument:
        tg_argv[0]  suffix of the target partition; the row is written to
                    audet_<suffix>.

      Returns NULL so that the row is not stored in the parent table as well.

      The row values are passed with EXECUTE ... USING instead of being
      spliced into the command text, so no manual quoting or NULL handling
      is needed; only the partition name is built dynamically.
    */
    begin
      execute 'insert into ' || quote_ident('audet_' || tg_argv[0]) || '
          (id, field, is_pk, before, after)
        values
          ($1, $2, $3, $4, $5)'
        using new.id, new.field, new.is_pk, new.before, new.after;
      return null;
  end;$BODY$ LANGUAGE plpgsql VOLATILE;
-- The partitions live in schema "audit" (the schema created by this script).
ALTER FUNCTION tgf_ins_audet() SET search_path=audit;
ALTER FUNCTION tgf_ins_audet() OWNER TO audit;
GRANT EXECUTE ON FUNCTION tgf_ins_audet() TO audit;

CREATE OR REPLACE FUNCTION tgf_ins_audit()
  RETURNS trigger AS
$BODY$
    /*
      BEFORE INSERT row trigger function for the parent table "audit".

      Trigger argument:
        tg_argv[0]  suffix of the target partition; the row is written to
                    audit_<suffix>.

      Returns NULL so that the row is not stored in the parent table as well.

      "table" and "user" are reserved words and are therefore double-quoted,
      both in the column list and when reading them from NEW.
      The row values are passed with EXECUTE ... USING instead of being
      spliced into the command text; only the partition name is dynamic.
    */
    begin
      execute 'insert into ' || quote_ident('audit_' || tg_argv[0]) || '
          (id, type, tstmp, schema, "table", "user",
           client_inet, client_port, pid)
        values
          ($1, $2, $3, $4, $5, $6, $7, $8, $9)'
        using new.id, new.type, new.tstmp, new.schema, new."table", new."user",
              new.client_inet, new.client_port, new.pid;
      return null;
  end;$BODY$ LANGUAGE plpgsql VOLATILE;
-- The partitions live in schema "audit" (the schema created by this script).
ALTER FUNCTION tgf_ins_audit() SET search_path=audit;
ALTER FUNCTION tgf_ins_audit() OWNER TO audit;
GRANT EXECUTE ON FUNCTION tgf_ins_audit() TO audit;

-- Routing triggers on the parent tables: each one passes the suffix of the
-- partition that currently receives the rows to its trigger function.
-- NOTE(review): the partitions audit_20110518 and audet_20110907 are not
-- created in the visible part of this script, and rotate() does not drop these
-- two triggers on its first run (it only drops the trigger named after the
-- partition it found). Confirm the partitions exist, or drop these triggers
-- before the first rotation.
CREATE TRIGGER tg_audit_20110518 BEFORE INSERT ON audit
  FOR EACH ROW EXECUTE PROCEDURE tgf_ins_audit(20110518);

CREATE TRIGGER tg_audet_20110907 BEFORE INSERT ON audet
  FOR EACH ROW EXECUTE PROCEDURE tgf_ins_audet(20110907);

CREATE OR REPLACE FUNCTION rotate(character)
  RETURNS void AS
$BODY$
        /* Created by Diego Augusto Molina in 2011 for Tucuman Government,
Argentina. */
    /*
      rotate(which)

      Closes the current dated partition of a parent table and opens a new
      one named <which>_YYYYMMDD after the current date.

      Parameter:
        $1  'audit' or 'audet'. Any other value only raises a notice.

      Steps for the current partition (skipped when none is found):
        - set the fillfactor of its indexes to 100, then cluster and analyze it;
        - add a CHECK constraint bounding tstmp (audit) or id (audet) to the
          range actually stored; skipped when the partition is empty, because
          the bounds are NULL then;
        - drop the parent's routing trigger that pointed at it.
      Steps for the new partition:
        - create it as a child of the parent, create its indexes, and mark
          the index to cluster on;
        - create the routing trigger tg_<which>_<suffix> on the parent.

      The index and column names used when closing a partition must match the
      ones used when it was created; "table" and "user" are reserved words and
      are double-quoted.
    */
    declare
      first_execution boolean := false;
      cur_start char(8) := null;
      cur_tstmp_min timestamp with time zone;
      cur_tstmp_max timestamp with time zone;
      cur_id_min bigint;
      cur_id_max bigint;
      new_start  char(8);
    begin
        /* Determine the suffix of the current partition: the greatest relation
           name of the form <which>_xxxxxxxx in schema audit ================== */
      select substring(max(c.relname::text) from $1 || E'_(........)')
        into cur_start
        from
          pg_namespace n inner join
          pg_class c on (n.oid = c.relnamespace)
        where
          n.nspname = 'audit'::name and
          c.relname::text like $1 || '_%';
      if cur_start is null then
        first_execution := true;
        cur_start := '';
      end if;
      new_start := cast(to_char(current_timestamp,'YYYYMMDD') as name);

      case $1
        when 'audit' then /* rotating the table audit ======================== */
            /* current table */
          if not first_execution then
            execute 'select min(tstmp), max(tstmp) from audit_' || cur_start
              into cur_tstmp_min, cur_tstmp_max;
            execute $$
              alter index idx_audit_$$|| cur_start ||$$_id set (fillfactor = 100);
              alter index idx_audit_$$|| cur_start ||$$_tstmp set (fillfactor = 100);
              alter index idx_audit_$$|| cur_start ||$$_schema__table set (fillfactor = 100);
              alter index idx_audit_$$|| cur_start ||$$_user set (fillfactor = 100);

              cluster audit_$$|| cur_start ||$$;
              analyze audit_$$|| cur_start ||$$;
            $$;
              /* With an empty partition both bounds are NULL and the command
                 text below would be NULL as well, so the constraint is only
                 added when there is something to bound. */
            if cur_tstmp_min is not null then
              execute $$
                alter table audit_$$|| cur_start ||$$ add
                  constraint audit_$$|| cur_start ||$$_ck_exclusion
                  check (
                    tstmp >= '$$|| cur_tstmp_min ||$$'
                      and
                    tstmp <= '$$|| cur_tstmp_max ||$$'
                  )
                $$;
            end if;
          end if;

          execute $$
              /* new table */
            create table audit_$$|| new_start ||$$ () inherits (audit);
            create index idx_audit_$$|| new_start ||$$_id
              on audit_$$|| new_start ||$$ using btree (id) with (fillfactor = 99);
            create index idx_audit_$$|| new_start ||$$_tstmp
              on audit_$$|| new_start ||$$ using btree (tstmp) with (fillfactor = 99);
            create index idx_audit_$$|| new_start ||$$_schema__table
              on audit_$$|| new_start ||$$ using btree (schema, "table") with (fillfactor = 95);
            create index idx_audit_$$|| new_start ||$$_user
              on audit_$$|| new_start ||$$ using btree ("user") with (fillfactor = 95);
            cluster audit_$$|| new_start ||$$ using idx_audit_$$|| new_start ||$$_tstmp;

              /* Parent table */
            drop trigger if exists tg_audit_$$|| cur_start ||$$ on audit;
            create trigger tg_audit_$$|| new_start ||$$
              before insert
              on audit
              for each row
              execute procedure tgf_ins_audit('$$|| new_start ||$$');
          $$;
        when 'audet' then /* rotating the table audet ======================== */
            /* current table */
          if not first_execution then
            execute 'select min(id), max(id) from audet_' || cur_start
              into cur_id_min, cur_id_max;
            execute $$
              alter index idx_audet_$$|| cur_start ||$$_id set (fillfactor = 100);
              alter index idx_audet_$$|| cur_start ||$$_fieldpk set (fillfactor = 100);

              cluster audet_$$|| cur_start ||$$;
              analyze audet_$$|| cur_start ||$$;
            $$;
              /* Same guard as for audit: no bounds on an empty partition. */
            if cur_id_min is not null then
              execute $$
                alter table audet_$$|| cur_start ||$$ add
                  constraint audet_$$|| cur_start ||$$_ck_exclusion
                  check (
                    id >= '$$|| cur_id_min ||$$'
                      and
                    id <= '$$|| cur_id_max ||$$'
                  )
                $$;
            end if;

              /* Parent table */
            execute 'drop trigger tg_audet_' || cur_start || ' on audet';
          end if;

          execute $$
              /* new table */
            create table audet_$$|| new_start ||$$ () inherits (audet);
            create index idx_audet_$$|| new_start ||$$_id
              on audet_$$|| new_start ||$$ using btree (id) with (fillfactor = 99);
            create index idx_audet_$$|| new_start ||$$_fieldpk
              on audet_$$|| new_start ||$$ using btree (field) with (fillfactor = 99)
              where is_pk;
            cluster audet_$$|| new_start ||$$ using idx_audet_$$|| new_start ||$$_id;

              /* Parent table */
            create trigger tg_audet_$$|| new_start ||$$
              before insert
              on audet
              for each row
              execute procedure tgf_ins_audet('$$|| new_start ||$$');
          $$;
        else /* wrong argument ================================================ */
          raise notice E'Error: expected \'audit\' o \'audet\'. Got \'%\'.', $1;
          return;
      end case;
  end;$BODY$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
-- All partitions, indexes and triggers are created in schema "audit".
ALTER FUNCTION rotate(character) SET search_path=audit;
ALTER FUNCTION rotate(character) OWNER TO audit;
GRANT EXECUTE ON FUNCTION rotate(character) TO audit;

CREATE OR REPLACE FUNCTION audit() RETURNS trigger AS $BODY$

## Created by Diego Augusto Molina in 2011 for Tucuman Government, Argentina.

## This is the trigger function which should be called by any table we want to audit.
## If it receives arguments, they are interpreted as the primary key of the table. If any
## of the arguments is not a column of the table, or no argument is received at all, a
## probing process is run which ends up determining the pk of the table. Thus, it is
## better to create the trigger calling this function with no arguments. See the TODO list.

## Usage:
##   CREATE TRIGGER <tg_name> AFTER {INSERT | UPDATE | DELETE} [ OR ...] ON <table_name>
##     FOR EACH ROW EXECUTE PROCEDURE audit.audit( { <column_list> | <nothing_at_all> } );

## What gets written:
##   - one row in "audit" (event header: id, event type, schema, table, user, client address);
##   - one row in "audet" per logged column: every column on INSERT and DELETE; on UPDATE
##     the changed columns plus the pk columns (an unchanged pk column is logged with a
##     NULL "before" value).
##   SQL NULL column values are stored as NULL. All values are passed to the server as
##   statement parameters, never spliced into the command text.

## KNOWN ISSUE #1: you don't want to use this trigger on a table which has a column of
##                 some type that doesn't have an implicit cast to string. That would cause
##                 a runtime error killing your transaction! See TODO #2. Be easy, most
##                 'common' types have an implicit cast to string by default.
## KNOWN ISSUE #2: the lookup tables are filled with a select followed by an insert, which
##                 is not safe against two sessions registering the same new value at once.

## TODO #1: In 'P3', instead of asking '( scalar keys %pk == 0 )' each time, put an 'else'.
## TODO #1.1: if the pk was not passed as argument, at the end of the probing execute an
##          'alter trigger' so that next time there's no probing at all. This would be
##          unfriendly with modifications in the table definition, which should carry an
##          update in the trigger (putting no arguments at all would imply probing again
##          for the first time and then we just use it!).
## TODO #2: search for a way to save the binary contents of the columns instead of the
##          formatted content. The table 'field' would have an extra column of type 'type'
##          and that would help describing the field audited. That would solve the problem
##          with strange fields (assuming that _any_ value can be converted to its
##          binary/internal representation). This may carry some extra complexity, maybe
##          needing extra tables holding information about types.
## TODO #3: make this function receive only two parameters: two arrays of type name[], the
##          first holding the set of columns which are the primary key, the second one is
##          the set of columns which in addition to the pk ones are to be registered. Note
##          that pk columns will always be registered because that identifies the tuple
##          modified.
## TODO #4: support for TRUNCATE event.

      # P0. Declarations and general definitions ###################################################
      ##############################################################################################

    my $elog_pref =
      "(schm:$_TD->{table_schema};tab:$_TD->{table_name};trg:$_TD->{name};evt:$_TD->{event}):";
    my $rv;                                 # Result of the last query
    my $plan;                               # Short-lived prepared statement
    my $id;                                 # Id of the tuple inserted in "audit"
    my %ids = ();                           # Ids of the rows in the lookup tables
    my %pk = ();                            # Primary key (column name => "-")
    my $has_old = exists $_TD->{old};
    my $has_new = exists $_TD->{new};

      # Copy columns from some available transitional variable
    my %cols = $has_new ? %{$_TD->{new}} : %{$_TD->{old}};

      # P1. Create necessary tuples in user, table, schema and client_inet #########################
      ##############################################################################################

      # NOTE: spi_* results carry the status as a string, so it has to be compared with
      # 'eq'/'ne'; a numeric comparison against a bareword is always "equal".
    $rv = spi_exec_query(
      'select pg_catalog."session_user"() as usr, pg_catalog.inet_client_addr() as addr');
    if ( $rv->{status} ne 'SPI_OK_SELECT' ){
      elog(ERROR, "$elog_pref Error querying session information.");
    }
      # [ lookup table, type of its "value" column, value to register ]
    my @lookups = (
      [ 'user',        'name', $rv->{rows}[0]->{usr}  ],
      [ 'table',       'name', $_TD->{table_name}     ],
      [ 'schema',      'name', $_TD->{table_schema}   ],
      [ 'client_inet', 'inet', $rv->{rows}[0]->{addr} ],
    );
    foreach my $lk (@lookups){
      my ($tab, $type, $value) = @$lk;
      $ids{$tab} = undef;
        # inet_client_addr() is NULL for connections that do not come over the network;
        # the lookup column is NOT NULL, so in that case "audit" simply gets a NULL id.
      next unless defined $value;
        # The table names are reserved words in part ("user", "table"): always quote them.
      $plan = spi_prepare(qq{select id from "$tab" where value = \$1}, $type);
      $rv = spi_exec_prepared($plan, $value);
      spi_freeplan($plan);
      if ( $rv->{status} ne 'SPI_OK_SELECT' ){
        elog(ERROR, "$elog_pref Error querying table '$tab'.");
      }
      if ( $rv->{processed} == 1 ){
        $ids{$tab} = $rv->{rows}[0]->{id};
      } else {
        $plan = spi_prepare(qq{insert into "$tab" (value) values (\$1) returning id}, $type);
        $rv = spi_exec_prepared($plan, $value);
        spi_freeplan($plan);
        if ( $rv->{status} ne 'SPI_OK_INSERT_RETURNING' ){
          elog(ERROR, "$elog_pref Error inserting in table '$tab'.");
        }
        $ids{$tab} = $rv->{rows}[0]->{id};
      }
    }

      # P2. Insert in audit ########################################################################
      ##############################################################################################

    $rv = spi_exec_query("select nextval('seq_audit'::regclass) as id");
    if ( $rv->{status} ne 'SPI_OK_SELECT' ){
      elog(ERROR, "$elog_pref Error querying next value of sequence 'seq_audit'.");
    }
    $id = $rv->{rows}[0]->{id};
    $plan = spi_prepare(
      'insert into audit (id, type, schema, "table", "user", client_inet)
         values ($1, $2, $3, $4, $5, $6)',
      'bigint', 'text', 'smallint', 'smallint', 'smallint', 'smallint');
    $rv = spi_exec_prepared($plan,
      $id,
      substr($_TD->{event}, 0, 1),          # 'I', 'U' or 'D'
      $ids{'schema'},
      $ids{'table'},
      $ids{'user'},
      $ids{'client_inet'});
    spi_freeplan($plan);
    if ( $rv->{status} ne 'SPI_OK_INSERT' ){
      elog(ERROR,"$elog_pref Error inserting tuple in table 'audit'.");
    }

      # P3. Determine PK of the table ##############################################################
      ##############################################################################################

      #  Criterion 1: if got params, each of them is a column of the table, and all of them
      # make up the pk of the table
    elog(DEBUG, "$elog_pref Searching pk in the trigger's params.");
    if ( $_TD->{argc} > 0 ){
      foreach my $arg ( @{$_TD->{args}} ){
        if ( exists $cols{$arg} ){
          $pk{$arg} = "-";
        } else {
          %pk = ();
          elog(DEBUG, "$elog_pref The column '$arg' given as argument does not exist. Skipping to next criterion.");
          last;
        }
      }
    }
    if ( scalar keys %pk == 0 ){
        #  Criterion 2: search the pk in the system catalogs
      elog(DEBUG, "$elog_pref Searching pk in system catalogs.");
      $rv = spi_exec_query("
        select a.attname from
          ( select cl.oid, unnest(c.conkey) as att
            from
              pg_catalog.pg_constraint c inner join
              pg_catalog.pg_class cl on (c.conrelid = cl.oid)
            where
              c.contype = 'p'
              and cl.oid = $_TD->{relid}
          ) as c inner join
          pg_catalog.pg_attribute a on (c.att = a.attnum and c.oid = a.attrelid)
      ");
      if ( $rv->{status} eq 'SPI_OK_SELECT' ){
          # {rows} is a reference to an array of row hashes: dereference it to iterate.
        foreach my $row ( @{$rv->{rows}} ){
          $pk{$row->{attname}} = "-";
        }
      } else {
        elog(DEBUG, "$elog_pref Error querying the system catalogs. Skipping to next criterion.");
      }
    }
    if ( scalar keys %pk == 0 ){
        #  Criterion 3: if the table has OIDs, use that as pk
        # The query is wrapped in eval because pg_class.relhasoids does not exist on newer
        # servers (it was removed in PostgreSQL 12); a failure must only skip this criterion.
        # NOTE(review): "oid" is not a key of %cols, so P4 never logs it as a column.
      elog(DEBUG, "$elog_pref Searching OIDs in the table.");
      $rv = eval {
        spi_exec_query("select 1 from pg_catalog.pg_class where oid = $_TD->{relid} and relhasoids = true");
      };
      if ( defined $rv && $rv->{status} eq 'SPI_OK_SELECT' ){
        if ( $rv->{processed} > 0 ){
          %pk = ("oid","-");
          elog(DEBUG, "$elog_pref Using OIDs as table pk for '$_TD->{table_name}' because no previous criterion could find one.");
        }
      } else {
        elog(DEBUG, "$elog_pref Error querying the system catalogs. Skipping to next criterion.");
      }
    }
    if ( scalar keys %pk == 0 ){
        #  Default criterion: all columns
      elog(DEBUG, "$elog_pref Could not find a suitable pk. Logging every column.");
      %pk = %cols;
    }

      # P4. Insert in audet ########################################################################
      ##############################################################################################

    my $is_update = ( $_TD->{event} eq "UPDATE" );
    my $sel_field = spi_prepare('select id from field where value = $1', 'name');
    my $ins_field = spi_prepare('insert into field (value) values ($1) returning id', 'name');
    my $ins_audet = spi_prepare(
      'insert into audet (id, field, is_pk, before, after) values ($1, $2, $3, $4, $5)',
      'bigint', 'smallint', 'boolean', 'text', 'text');

    foreach my $col (sort keys %cols){
      my $is_pk = exists $pk{$col};
        # undef stands for SQL NULL; defined values are stringified explicitly.
      my $old = ( $has_old && defined $_TD->{old}{$col} ) ? "$_TD->{old}{$col}" : undef;
      my $new = ( $has_new && defined $_TD->{new}{$col} ) ? "$_TD->{new}{$col}" : undef;
        # NULL-aware comparison: NULL <-> value counts as a change, NULL <-> NULL does not.
      my $changed = ( defined($old) xor defined($new) )
                 || ( defined($old) && $old ne $new );

        # On UPDATE only the changed columns and the pk columns are logged.
      next if $is_update && !$is_pk && !$changed;

      my $before = $old;
      my $after  = $new;
      if ( $is_update && !$changed ){
          #   We don't save the previous state of the column which is part of the pk while
          # updating if it hasn't changed.
        $before = undef;
      }

      my $field;
      $rv = spi_exec_prepared($sel_field, $col);
      if ( $rv->{status} ne 'SPI_OK_SELECT' ){
        elog(ERROR, "$elog_pref Error querying table 'field'.");
      }
      if ( $rv->{processed} > 0 ){
        $field = $rv->{rows}[0]->{id};
      } else {
        $rv = spi_exec_prepared($ins_field, $col);
        if ( $rv->{status} ne 'SPI_OK_INSERT_RETURNING' ){
          elog(ERROR, "$elog_pref Error executing insert returning in table 'field'.");
        }
        $field = $rv->{rows}[0]->{id};
      }

      $rv = spi_exec_prepared($ins_audet,
        $id, $field, ($is_pk ? 'true' : 'false'), $before, $after);
      if ( $rv->{status} ne 'SPI_OK_INSERT' ){
        elog(ERROR, "$elog_pref Error inserting tuples in table 'audet'.");
      }
    }

    spi_freeplan($sel_field);
    spi_freeplan($ins_field);
    spi_freeplan($ins_audet);

      # P5. Finishing ##############################################################################
      ##############################################################################################

    return;

  $BODY$
  LANGUAGE plperl VOLATILE SECURITY DEFINER;
-- The function runs with the owner's rights; pin its search_path to the schema
-- that holds the audit tables so the unqualified names above resolve there.
ALTER FUNCTION audit() SET search_path=audit;
ALTER FUNCTION audit() OWNER TO audit;
GRANT EXECUTE ON FUNCTION audit() TO public;

pgsql-general by date:

Previous
From: John R Pierce
Date:
Subject: Re: Quick-and-Dirty Data Entry with LibreOffice3?
Next
From: Diego Augusto Molina
Date:
Subject: Re: [Solved] Generic logging system for pre-hstore using plperl triggers