6.3. Working with Logs #

This section describes the steps required to manage logs.

Warning

Log collection in the JSON format for DBMS instances is not supported in Postgres Pro version 14 and earlier.

6.3.1. General Setup #

6.3.1.1. Adding and Configuring the filelog Receiver #

The filelog receiver is an open-source component of the OpenTelemetry Collector that is used for collecting logs from the DBMS instance. For detailed information about this receiver, refer to the OpenTelemetry documentation.

The filelog receiver should be added to the receivers section and configured.

The receiver configuration depends on the database instance setup and the log format used (see the logging_collector and log_destination parameters). The collector supports log collection in the CSV and JSON formats.

Regardless of the log format, the path to the log directory and the template for log file names need to be specified.
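
The examples below assume that the logging collector writes the instance log files to a dedicated directory. A minimal sketch of the corresponding postgresql.conf settings is shown here for reference; the directory and file name pattern are assumptions and must match the include setting of the receiver:

logging_collector = on
log_destination = 'jsonlog'                  # or 'csvlog' for the CSV example below
log_directory = '/var/log/postgresql'
log_filename = 'postgresql-%Y-%m-%d.log'     # the .log suffix is replaced with .json or .csv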

An example of setting up a receiver for collecting logs in the JSON format:

receivers:
  filelog:
    include: [ /var/log/postgresql/*.json ]
    start_at: end
    retry_on_failure:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      max_elapsed_time: 5m
    operators:
      - type: json_parser
        parse_ints: true
        timestamp:
          parse_from: attributes.timestamp
          layout_type: strptime
          layout: '%Y-%m-%d %H:%M:%S.%L %Z'
        severity:
          parse_from: attributes.error_severity
          mapping:
            debug: [ DEBUG ]
            info:  [ INFO, NOTICE, LOG ]
            warn:  [ WARNING ]
            error: [ ERROR ]
            fatal: [ FATAL, PANIC ]
      - type: remove
        field: attributes.timestamp

The severity section defines how log severity levels are parsed and categorized based on their importance and urgency. For more information on this parameter, refer to the OpenTelemetry documentation.

An example of setting up a receiver for collecting logs in the CSV format:

receivers:
  filelog:
    include: [ /var/log/postgresql/*.csv ]
    start_at: end
    retry_on_failure:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      max_elapsed_time: 5m
    multiline:
      line_start_pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}
    operators:
      - type: csv_parser
        header: timestamp,user,dbname,pid,connection_from,session_id,line_num,ps,session_start,vxid,txid,error_severity,state_code,message,detail,hint,internal_query,internal_position,context,statement,cursor_position,func_name,application_name,backend_type,leader_pid,query_id
        timestamp:
          parse_from: attributes.timestamp
          layout_type: strptime
          layout: '%Y-%m-%d %H:%M:%S.%L %Z'
        severity:
          parse_from: attributes.error_severity
          mapping:
            debug: [ DEBUG ]
            info:  [ INFO, NOTICE, LOG ]
            warn:  [ WARNING ]
            error: [ ERROR ]
            fatal: [ FATAL, PANIC ]
      - type: remove
        field: attributes.timestamp

Note

The CSV configuration requires more parameters than other formats because it must accommodate the specifics of CSV logging.

A detailed description of configuration parameters with examples can be found in the /usr/share/doc/pgpro-otel-collector/examples directory.

6.3.1.2. Adding and Configuring the journald Receiver #

The journald receiver is an open-source component of the OpenTelemetry Collector for gathering logs from the systemd journal, which is particularly useful for Shardman and Postgres Pro Enterprise Manager (PPEM). For detailed information about this receiver, refer to the OpenTelemetry documentation.

The log_destination parameter in Postgres Pro must be configured to send logs to syslog. For more details on the configuration, refer to the section Error Reporting and Logging.
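
For reference, a minimal sketch of the corresponding postgresql.conf settings; the facility and program name below are assumptions and should be adjusted to your environment:

log_destination = 'syslog'
syslog_facility = 'LOCAL0'
syslog_ident = 'postgres'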

An example of setting up the journald receiver for sending logs in the journald format to PPEM for PostgreSQL:

receivers:
  journald:
    directory: /var/log/journal
    start_at: end
    units:
      - postgresql@17-main
    operators:
      # Rename _PID to pid. This field is required by PPEM
      - type: move
        id: "pid"
        from: body._PID
        to: attributes.pid
      # Rename __MONOTONIC_TIMESTAMP to line_num. This field is required by PPEM
      - type: move
        id: "line_num"
        from: body.__MONOTONIC_TIMESTAMP
        to: attributes.line_num
      # Rename MESSAGE to message
      - type: move
        id: "message"
        from: body.MESSAGE
        to: attributes.message
      # Rename _SYSTEMD_UNIT to backend_type
      - type: move
        id: "backend_type"
        from: body._SYSTEMD_UNIT
        to: attributes.backend_type
      # Transform PRIORITY number to severity text
      - type: severity_parser
        parse_from: body.PRIORITY
        overwrite_text: true
        mapping:
            debug: [ 7 ] # debug
            info:  [ 5, 6 ] # notice, info
            warn:  [ 4 ] # warning
            error: [ 3 ] # err
            fatal: [ 0, 1, 2 ] # emerg, alert, crit
processors:
  batch/journald:
    send_batch_size: 4096
    timeout: 15s
  resource:
    attributes:
    - action: upsert
      key: service.name
      value: postgresql
    - action: upsert
      key: service.instance.id
      value: address-of-postgres-instance:5432
  attributes/convert:
    actions:
      - key: pid
        action: convert
        converted_type: int
      - key: line_num
        action: convert
        converted_type: int
  transform:
    log_statements:
      - context: log
        statements:
          # Set the error_severity attribute from the severity_text field. This field is required by PPEM
          - set(log.attributes["error_severity"], log.severity_text)
          # Set the session_start attribute. This field is required by PPEM
          - set(log.attributes["session_start"], FormatTime(log.time, "%Y-%m-%d %H:%M:%S %Z"))
          # Set the 'session_id' field from _SYSTEMD_INVOCATION_ID or INVOCATION_ID. This field is required by PPEM
          - set(log.attributes["session_id"], log.body["_SYSTEMD_INVOCATION_ID"]) where log.body["_SYSTEMD_INVOCATION_ID"] != nil
          - set(log.attributes["session_id"], log.body["INVOCATION_ID"]) where (log.attributes["session_id"] == nil and log.body["INVOCATION_ID"] != nil)
exporters:
  otlphttp:
    endpoint: https://logs.example.org:8080
    tls:
      insecure_skip_verify: true
    compression: ""
    headers:
      "X-Ppem-Source-Agent-Name": ppem-agent
      "X-Ppem-Source-Instance-Port": '5432'
service:
  pipelines:
    logs:
      receivers: [ journald ]
      processors: [ resource,attributes/convert,transform,batch/journald ]
      exporters: [ otlphttp ]

This receiver can also be used to read Shardman logs. Below is an example setup for reading such logs and sending them to Elasticsearch.

receivers:
  journald:
    directory: /var/log/journal
    start_at: end
    units:
      - shardmand@*
    operators:
      # Parse shardmand message into corresponding log fields
      - type: regex_parser
        parse_from: body.MESSAGE
        regex: '^(?P<timestamp>.+)	(?P<level>.+)	(?P<message>.*)	(?P<fields>.+)$'
        timestamp:
          parse_from: attributes.timestamp
          layout_type: strptime
          layout: '%Y-%m-%dT%H:%M:%S.%f%z'
        severity:
          parse_from: attributes.level
      # Parse additional attributes from the 'fields' JSON string
      - type: json_parser
        parse_from: attributes.fields
      # Remove the parsed timestamp to avoid duplication
      - type: remove
        id: remove_timestamp
        field: attributes.timestamp
      # Remove the parsed level after severity is extracted
      - type: remove
        id: remove_level
        field: attributes.level
      # Remove the parsed 'fields' JSON string after parsing
      - type: remove
        id: remove_fields
        field: attributes.fields
      # Retain only fields useful for debugging
      - type: retain
        fields:
          - body._PID
          - body._GID
          - body._UID
          - body._CMDLINE
          - body._EXE
          - body._HOSTNAME
          - body._SYSTEMD_UNIT
          - body._TRANSPORT
processors:
  batch/journald:
    send_batch_size: 4096
    timeout: 15s
  transform:
    log_statements:
      - context: log
        statements:
          # Set resource.process.pid from body._PID, then remove body._PID
          - set(resource.attributes["process.pid"], Int(log.body["_PID"]))
          - delete_key(log.body, "_PID")
          # Set resource.process.command_line from body._CMDLINE (if present), then remove body._CMDLINE
          - set(resource.attributes["process.command_line"], log.body["_CMDLINE"]) where log.body["_CMDLINE"] != nil
          - delete_key(log.body, "_CMDLINE")
  resource:
    attributes:
    - action: upsert
      key: service.name
      value: shardmand
    - action: upsert
      key: service.instance.id
      value: address-of-shardman-instance:5432
exporters:
  otlphttp:
    compression: gzip
    endpoint: https://logs.example.org:8080
    tls:
      insecure_skip_verify: true
service:
  pipelines:
    logs:
      receivers: [ journald ]
      processors: [ transform,resource,batch/journald ]
      exporters: [ otlphttp ]

6.3.1.3. Adding and Configuring the SQL Query Receiver #

The sqlquery receiver is an open-source component of the OpenTelemetry Collector for gathering metrics and/or logs from custom SQL queries.

Warning

The sqlquery receiver is currently experimental and is not recommended for production use.

To set up the sqlquery receiver for collecting logs, follow the example procedure below.

  1. Create the sqlquery.yml configuration file.

  2. In the created configuration file, specify database connection parameters in the receivers.sqlquery section:

    receivers:
      sqlquery:
        driver: postgres
        host: localhost
        port: 5432
        database: postgres
        username: postgres
        password: ${env:POSTGRESQL_PASSWORD}
        # Additional driver-specific connection parameters
        additional_params:
          application_name: pgpro-otel-collector
          sslmode: disable
        # The time interval between query executions. Default: 10s
        collection_interval: 60s
        # storage: file_storage
        # Defines setup for the component's own telemetry
        telemetry:
          logs:
            # If true, each executed query is logged at debug level
            query: false
        # The maximum number of open connections to the Postgres Pro server. Default: 0 (unlimited)
        max_open_conn: 5
    

    For persistent tracking across collector restarts, configure a storage extension and reference it with the storage parameter; a minimal sketch is shown at the end of this section. For more details, refer to the OpenTelemetry documentation.

    The password parameter supports environment variable substitution, as shown in the example. Special characters in the credentials are automatically URL-encoded to ensure proper connection string formatting.

    Alternatively, you can use the datasource parameter to provide a complete connection string:

    datasource: "host=localhost port=5432 user=postgres password=postgres application_name=pgpro-otel-collector sslmode=disable"
    
  3. Add queries to collect logs. Each query consists of an SQL statement and a logs section. The logs section can contain several entries, but at least one is required.

    receivers:
      sqlquery:
        ...
        queries:
          - sql: SELECT id, message, type FROM my_logs WHERE id > $1 ORDER BY id
            tracking_start_value: "0"
            tracking_column: id
            logs:
              # Column containing the log message text
              - body_column: message
                # Columns to include as log attributes
                attribute_columns: ["type"]
    

    This example assumes the following database schema:

    CREATE TABLE my_logs (id INTEGER, message TEXT, type TEXT);
    INSERT INTO my_logs VALUES (1, 'message1', 'info'), (2, 'message2', 'info'), (3, 'message3', 'error');
    

    The tracking_start_value parameter defines the initial value for the query parameter ($1), and tracking_column specifies which column value to store for subsequent queries. These parameters apply only to log collection.

    Note

    To prevent duplicate log collection across collection intervals, use parameterized queries with the tracking_start_value and tracking_column parameters. Ensure query results are sorted in ascending order by the tracking_column value.

    Note

    Avoid queries that produce NULL values. If a query returns NULL in a column referenced in the configuration, errors will be logged, but the receiver will continue operating.

  4. Configure exporters, processors, and the service pipeline:

    ...
    exporters:
      otlphttp/sqlquery/logs:
        compression: gzip
        endpoint: https://logs.example.org:8080
        headers:
          X-Ppem-Source-Agent-Name: local
          X-Ppem-Source-Instance-Port: '5432'
    processors:
      batch/sqlquery:
        send_batch_size: 2048
        timeout: 10s
      resource:
        attributes:
          - key: service.name
            action: upsert
            value: postgresql
          - key: service.instance.id
            action: upsert
            value: address-of-postgres-instance:5432
    service:
      # Telemetry for the collector itself
      telemetry:
        logs:
          # Sets the minimum enabled logging level
          # Values: debug, info, warn, error
          # Default = info
          level: info
      pipelines:
        logs/sqlquery:
          receivers: [ sqlquery ]
          processors: [ batch/sqlquery, resource ]
          exporters: [ otlphttp/sqlquery/logs ]
    
  5. Start pgpro-otel-collector with the created configuration file:

    build/pgpro-otel-collector/pgpro-otel-collector --config configs/sqlquery.yml
    

For the full list of the sqlquery configuration parameters, refer to the OpenTelemetry documentation.
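
A minimal sketch of the persistent tracking setup referenced in step 2, assuming the file_storage extension is used and the storage directory already exists and is writable by the collector:

extensions:
  file_storage:
    directory: /var/lib/pgpro-otel-collector/storage
receivers:
  sqlquery:
    # ... connection parameters and queries as shown above ...
    storage: file_storage
service:
  extensions: [ file_storage ]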

6.3.1.4. Adding and Configuring the attributes, resource, and filter Processors #

The attributes, resource, and filter processors are open-source components of the OpenTelemetry Collector.

The processor configuration also depends on the database instance setup and the log format used (see the logging_collector and log_destination parameters).

The resource processor needs to be configured when sending logs to Elasticsearch. Regardless of the log format, the service.name and service.instance.id attributes need to be specified.

Logs can be filtered by severity level using the filter processor, as shown in the examples below.

An example of setting up processors for collecting logs in the JSON format:

processors:
  filter/include:
    logs:
      include:
        match_type: strict # Or regexp
        severity_texts:
          # - "DEBUG"
          - "INFO"
          - "NOTICE"
          - "WARNING"
          - "ERROR"
          - "LOG"
          - "FATAL"
          - "PANIC"
  attributes/convert:
    actions:
      - key: query_id
        action: convert
        converted_type: string
      - key: pid
        action: convert
        converted_type: string
  resource:
    attributes:
      - key: service.name
        action: upsert
        value: postgresql
      - key: service.instance.id
        action: upsert
        value: 1.2.3.4:5432

An example of setting up processors for collecting logs in the CSV format:

processors:
  filter/include:
    logs:
      include:
        match_type: strict # Or regexp
        severity_texts:
          # - "DEBUG"
          - "INFO"
          - "NOTICE"
          - "WARNING"
          - "ERROR"
          - "LOG"
          - "FATAL"
          - "PANIC"
  attributes/convert:
    actions:
      - key: pid
        action: convert
        converted_type: int
      - key: line_num
        action: convert
        converted_type: int
      - key: txid
        action: convert
        converted_type: int
      - key: remote_port
        action: convert
        converted_type: int
      - key: cursor_position
        action: convert
        converted_type: int
      - key: internal_position
        action: convert
        converted_type: int
      - key: leader_pid
        action: convert
        converted_type: int
  resource:
    attributes:
      - key: service.name
        action: upsert
        value: postgresql
      - key: service.instance.id
        action: upsert
        value: 1.2.3.4:5432

6.3.1.5. Adding and Configuring the otlphttp Exporter #

The otlphttp exporter is an open-source component of the OpenTelemetry Collector used for exporting collected logs to an OTLP-compatible storage or monitoring system, which must be deployed and reachable in advance. For more details, refer to the OpenTelemetry documentation.

To configure the otlphttp exporter, it is sufficient to specify the address of the target system where data should be sent:

exporters:
  otlphttp:
    endpoint: https://otlp.example.org
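
In practice, transport options such as compression, TLS verification, and retries are often tuned as well. Below is a sketch with commonly used options; the endpoint, certificate path, and header are assumptions:

exporters:
  otlphttp:
    endpoint: https://otlp.example.org
    compression: gzip                     # default; set to "" to disable
    timeout: 30s
    tls:
      ca_file: /etc/ssl/certs/ca.pem      # or insecure_skip_verify: true for test setups
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
    headers:
      "X-Example-Header": example-value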

6.3.1.6. Adding and Configuring the kafka Exporter #

The kafka exporter is an open-source component of the OpenTelemetry Collector for sending metrics and logs to Apache Kafka. For more details, refer to the OpenTelemetry documentation.

Below is an example of setting it up for sending logs.

receivers:
  filelog:
    include: [ /var/log/postgresql/*.json ]
    start_at: end
    retry_on_failure:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      max_elapsed_time: 5m
    operators:
      - type: json_parser
        parse_ints: true
        timestamp:
          parse_from: attributes.timestamp
          layout_type: strptime
          layout: '%Y-%m-%d %H:%M:%S.%L %Z'
        severity:
          parse_from: attributes.error_severity
          mapping:
            debug: [ DEBUG ]
            info:  [ INFO, NOTICE, LOG ]
            warn:  [ WARNING ]
            error: [ ERROR ]
            fatal: [ FATAL, PANIC ]
      - type: remove
        id: remove_timestamp
        field: attributes.timestamp
exporters:
  kafka:
    brokers:
      - localhost:9092
    protocol_version: 2.1.0
    client_id: pgpro-otel-collector
    logs:
      topic: otlp_logs
      encoding: otlp_json # otlp_proto is also supported
    include_metadata_keys:
      - service.name
      - service.instance.id
    tls:
      insecure: true
    timeout: 30s
    producer:
      max_message_bytes: 1000000
      required_acks: 1
      compression: none # also supported: gzip, snappy, lz4, zstd
processors:
  batch/kafka:
    send_batch_size: 1024
    timeout: 1s
  attributes/convert:
    actions:
      - key: query_id
        action: convert
        converted_type: string
  resource:
    attributes:
      - key: service.name
        action: upsert
        value: postgresql
      - key: service.instance.id
        action: upsert
        value: address-of-postgres-instance:5432
service:
  pipelines:
    logs/kafka:
      receivers: [ filelog ]
      processors: [ batch/kafka,resource,attributes/convert ]
      exporters: [ kafka ]

6.3.1.7. Setting up a Pipeline #

Once receivers, processors, and exporters are added and configured, they need to be combined into a pipeline. The pipeline is configured in the service section. The pipeline contents depend entirely on the previously added components (there is no default configuration).

Below is an example of how to set up a pipeline for log management. The data is collected by the filelog receiver, processed by the resource and attributes processors, and exported by the otlphttp exporter.

All the components used in the pipeline must therefore also be added to the configuration file and configured.

service:
  extensions: []
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - resource
        - attributes/convert
      exporters:
        - otlphttp

6.3.2. Use Cases #

6.3.2.1. Using pgpro-otel-collector with VictoriaLogs #

VictoriaLogs is a log management system that can receive OpenTelemetry logs via OTLP over HTTP. Follow the procedure below to set up pgpro-otel-collector for export to VictoriaLogs.

  1. Create the victorialogs.yml configuration file with the following content:

    exporters:
      otlphttp/victorialogs:
        compression: gzip
        # The encoding to use for messages (must be "proto", which is the default value)
        encoding: proto
        # The base URL of the VictoriaLogs endpoint to send data to (without a suffix)
        # By default, the collector appends /v1/logs
        endpoint: http://localhost:9428/insert/opentelemetry
        tls:
          # Whether to skip certificate verification
          insecure_skip_verify: false
    processors:
      resource:
        attributes:
          - key: service.name
            action: upsert
            value: postgresql
          - key: service.instance.id
            action: upsert
            value: address-of-postgres-instance:5432
    service:
      extensions: []
      pipelines:
        logs:
          receivers:  [ filelog ]
          processors: [ resource ]
          exporters:  [ otlphttp/victorialogs ]
    

    gzip is the default value for the compression parameter. It reduces network bandwidth usage and is recommended for VictoriaLogs. For more information, refer to the OpenTelemetry documentation.

    VictoriaLogs infers the types of attributes based on their values. To send logs to another system, such as Elasticsearch, consider converting attribute types with the attributes processor (the convert action). For more details, refer to the example setup shown in the logs_json.yml file.

    Note

    VictoriaLogs uses _stream fields for efficient log organization and querying. The resource processor defines these streams. Only include attributes that are frequently used in queries. For more information about streams, refer to the VictoriaLogs documentation.

  2. Start pgpro-otel-collector with the created victorialogs.yml configuration file and the preferred log configuration:

    build/pgpro-otel-collector/pgpro-otel-collector --config configs/logs_json.yml --config configs/victorialogs.yml
    # Or build/pgpro-otel-collector/pgpro-otel-collector --config configs/logs_csv.yml --config configs/victorialogs.yml
    
  3. Verify that logs are visible in the UI:

    http://localhost:9428/select/vmui
    

For detailed information on OpenTelemetry integration with VictoriaLogs, refer to the VictoriaMetrics documentation.