24.2. Integration with Elasticsearch and Elastic APM #

Integration with external Elasticsearch data sources is used for reading logs written by pgpro-otel-collector.

The components below are required for integration.

pgpro-otel-collector #

The monitoring agent that provides the following functionality:

  • collects activity logs from Postgres Pro DBMS instances

  • sends activity logs to Elastic APM for Elasticsearch

Elastic APM Elasticsearch #

The application performance monitoring system based on Elastic Stack that provides the following functionality:

  • receives data from the monitoring agent and converts it to the ES document format

  • sends converted data to Elasticsearch

Elasticsearch #

The activity log storage system that provides the following functionality:

  • receives activity logs from the application performance monitoring system

  • stores activity logs according to internal storage parameters

  • provides the interface for receiving activity logs

PPEM #

The Postgres Pro Enterprise Manager system that provides the following functionality:

  • accesses Elasticsearch for receiving DBMS instance activity logs

  • provides the user with the monitoring interface based on activity logs in the form of text data

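The integration process includes the following steps:

  1. Configuring Elasticsearch

  2. Configuring pgpro-otel-collector for Elasticsearch

  3. Checking logs in Elasticsearch

  4. Configuring a log data source

  5. Checking the log storage operation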

Additional configuration of the agent is not required.

24.2.1. Configuring Elasticsearch #

  1. Install the Elastic APM server using the standard documentation.

  2. Integrate the Elastic APM server with Elasticsearch using the standard documentation.

  3. Configure the pgpro-otel-collector ingest pipeline.

    This is required to make document (log) field names compatible with the Elastic Common Schema (ECS) field naming scheme.

    Pipeline configuration example (both queries are executed sequentially in Kibana Developer Tools):

    PUT _ingest/pipeline/postgrespro-otelcol-enrich-logs
    {
      "description": "Enrich PostgresPro Otel collector logs",
      "processors": [
        {
          "rename": {
            "if": "ctx?.labels?.message != null",
            "field": "labels.message",
            "target_field": "message",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.pid != null",
            "field": "labels.pid",
            "target_field": "process.pid",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.error_severity != null",
            "field": "labels.error_severity",
            "target_field": "log.level",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.user != null",
            "field": "labels.user",
            "target_field": "user.name",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.session_start != null",
            "field": "labels.session_start",
            "target_field": "session.start_time",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.session_id != null",
            "field": "labels.session_id",
            "target_field": "session.id",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.numeric_labels?.tx_id != null",
            "field": "numeric_labels.tx_id",
            "target_field": "transaction.id",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.log_file_name != null",
            "field": "labels.log_file_name",
            "target_field": "log.file.path",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.dbname != null",
            "field": "labels.dbname",
            "target_field": "db.name",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "gsub": {
            "if": "ctx?.service?.node?.name != null",
            "field": "service.node.name",
            "target_field": "host.name",
            "pattern": ":.+$",
            "replacement": "",
            "ignore_failure": true,
            "ignore_missing": false
          }
        },
        {
          "remove": {
            "field": [
              "observer.version",
              "observer.hostname",
              "service.language.name"
            ],
            "ignore_failure": true
          }
        },
        {
          "remove": {
            "field": "agent.version",
            "if": "ctx?.agent?.version == \"unknown\"",
            "ignore_failure": true
          }
        }
      ]
    }
    
    PUT _ingest/pipeline/logs-apm.app@custom
    {
      "processors": [
        {
          "pipeline": {
            "name": "postgrespro-otelcol-enrich-logs"
          }
        }
      ]
    }
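
To make the effect of the first pipeline more concrete, the Python sketch below emulates a subset of its rename processors and the gsub processor on a sample document. It only illustrates the field mapping and is not how Elasticsearch executes ingest pipelines; the helper functions and the sample document are hypothetical.

```python
import re
from copy import deepcopy

# (source field, target field) pairs taken from the rename processors above.
RENAMES = [
    ("labels.message", "message"),
    ("labels.pid", "process.pid"),
    ("labels.error_severity", "log.level"),
    ("labels.user", "user.name"),
    ("labels.dbname", "db.name"),
]


def get_path(doc, path):
    """Return the value at a dotted path, or None if any part is missing."""
    node = doc
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def set_path(doc, path, value):
    """Set a value at a dotted path, creating intermediate objects."""
    keys = path.split(".")
    node = doc
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value


def pop_path(doc, path):
    """Remove the value at a dotted path; the caller checks that it exists."""
    keys = path.split(".")
    node = doc
    for key in keys[:-1]:
        node = node[key]
    del node[keys[-1]]


def enrich(doc):
    """Emulate the rename and gsub steps of the pipeline on a copy of doc."""
    out = deepcopy(doc)
    for source, target in RENAMES:
        value = get_path(out, source)
        if value is not None:  # mirrors the "if ... != null" condition
            pop_path(out, source)
            set_path(out, target, value)
    node_name = get_path(out, "service.node.name")
    if node_name is not None:
        # Same pattern as the gsub processor: drop ":<port>" from the name.
        set_path(out, "host.name", re.sub(r":.+$", "", node_name))
    return out


# Hypothetical document as it might arrive from the APM server.
sample = {
    "labels": {"message": "checkpoint starting: time", "pid": "6991", "user": "postgres"},
    "service": {"node": {"name": "postgresql-01.example.org:5432"}},
}
result = enrich(sample)
print(result["message"])       # checkpoint starting: time
print(result["host"]["name"])  # postgresql-01.example.org
```

On a real cluster, the same kind of check can be done with the ingest pipeline simulate API in Kibana Developer Tools.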
    

24.2.2. Configuring pgpro-otel-collector for Elasticsearch #

  1. Enable and configure the filelog receiver.

    Receiver configuration example for the scenario when PostgreSQL logs are generated in the JSON format:

    receivers:
      filelog:
        include:
        - /var/log/postgresql/*.json
        operators:
        - parse_ints: true
          timestamp:
            layout: '%Y-%m-%d %H:%M:%S.%L %Z'
            layout_type: strptime
            parse_from: attributes.timestamp
          type: json_parser
        - field: attributes.timestamp
          type: remove
        retry_on_failure:
          enabled: true
          initial_interval: 1s
          max_elapsed_time: 5m
          max_interval: 30s
        start_at: end
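
As a rough illustration of what the json_parser and remove operators do with one log line, the Python sketch below parses a line, extracts the timestamp, and drops the raw timestamp attribute. The log line is a hypothetical sample, and Python's strptime is only an approximation of the collector's parser: it spells fractional seconds as %f rather than %L and recognizes only a few zone names for %Z, such as UTC.

```python
import json
from datetime import datetime, timezone

# Hypothetical log line in the JSON format.
line = '{"timestamp": "2025-03-19 03:44:01.336 UTC", "pid": 6991, "message": "checkpoint starting: time"}'

# Python counterpart of the collector layout '%Y-%m-%d %H:%M:%S.%L %Z'.
PY_LAYOUT = "%Y-%m-%d %H:%M:%S.%f %Z"


def parse_line(raw):
    """Parse one JSON log line into (timestamp, remaining attributes)."""
    attributes = json.loads(raw)
    # Like the remove operator: the raw field is dropped once it is parsed.
    raw_ts = attributes.pop("timestamp")
    # The sample is in UTC, so the zone is attached explicitly.
    ts = datetime.strptime(raw_ts, PY_LAYOUT).replace(tzinfo=timezone.utc)
    return ts, attributes


ts, attrs = parse_line(line)
print(ts.isoformat())  # 2025-03-19T03:44:01.336000+00:00
print(sorted(attrs))   # ['message', 'pid']
```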
    
  2. Configure processors:

    processors:
      attributes/convert:
        actions:
        - action: convert
          converted_type: string
          key: query_id
        - action: convert
          converted_type: string
          key: pid
      resource:
        attributes:
        - action: upsert
          key: service.name
          value: postgresql
        - action: upsert
          key: service.instance.id
          value: postgresql-01.example.org:5432
    

    Where:

    • service.name is the key that determines the name of the data stream and, consequently, of the indexes.

    • service.instance.id is the key for identifying the instance.

    • For logs in the JSON format, query_id must be converted to a string because integers are displayed incorrectly in Elasticsearch.
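
One plausible cause of the incorrect display, stated here as an assumption rather than something the documentation confirms, is that consumers reading JSON numbers as 64-bit floats cannot represent every 64-bit integer exactly. The Python sketch below shows the effect with a hypothetical query_id value.

```python
import json

# Hypothetical query_id just above 2**53; below that limit every integer
# is exactly representable as a 64-bit float.
query_id = 2**53 + 1

# A consumer that reads the number as a double silently changes the value.
as_double = int(float(query_id))

# Sent as a string, the identifier survives a JSON round trip unchanged.
as_string = json.loads(json.dumps({"query_id": str(query_id)}))["query_id"]

print(query_id)   # 9007199254740993
print(as_double)  # 9007199254740992
print(as_string)  # 9007199254740993
```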

    Important

    Data streams are used for storing data. The target stream is selected automatically and has the logs-apm.app.service.name-namespace format.

    The service.name value is specified in the collector configuration, in the processors.resource.attributes list, by the key: service.name element.

    The namespace value is defined by the element with the service.environment key. This configuration does not send it, so the value default is used. With this configuration, activity logs are stored in the stream named logs-apm.app.postgresql-default.
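
The naming rule can be sketched in Python as follows. The function name and the production value are hypothetical, and any normalization that the APM server may apply to service.name is not modeled.

```python
def data_stream_name(service_name, namespace="default"):
    """Build a stream name in the logs-apm.app.<service.name>-<namespace> form."""
    return f"logs-apm.app.{service_name}-{namespace}"


# service.environment is not sent, so the namespace falls back to "default".
print(data_stream_name("postgresql"))                # logs-apm.app.postgresql-default
# Hypothetical case where service.environment is set to "production".
print(data_stream_name("postgresql", "production"))  # logs-apm.app.postgresql-production
```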

  3. Configure log sending using otlphttpexporter and the pipeline:

    exporters:
      otlphttp/elastic_logs:
        compression: gzip
        endpoint: https://elasticsearch-apm.example.org
        tls:
          insecure_skip_verify: false
    
    service:
      extensions: []
      pipelines:
        logs:
          receivers:
          - filelog
          processors:
          - resource
          - attributes/convert
          exporters:
          - otlphttp/elastic_logs
    
  4. Start the collector and check that it is running and that the filelog receiver has started watching the log files:

    # systemctl start pgpro-otel-collector
    
    # systemctl status pgpro-otel-collector
    ● pgpro-otel-collector.service - PostgresPro OpenTelemetry Collector
        Loaded: loaded (/lib/systemd/system/pgpro-otel-collector.service; enabled; preset: enabled)
        Active: active (running) since Thu 2025-03-20 01:18:08 MSK; 4h 13min ago
      Main PID: 6991 (pgpro-otel-coll)
        Tasks: 8 (limit: 3512)
        Memory: 119.3M
          CPU: 2min 49.311s
        CGroup: /system.slice/pgpro-otel-collector.service
                └─6991 /usr/bin/pgpro-otel-collector --config /etc/pgpro-otel-collector/basic.yml
    
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.366656,"msg":"Setting up own telemetry..."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.367178,"msg":"Skipped telemetry setup."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.3679142,"msg":"Development component. May change in the future.","kind":"receiver","name":"postgrespro","data_type":"metrics"}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.3494158,"caller":"envprovider@v1.16.0/provider.go:59","msg":"Configuration references unset environment variable","name":"POSTGRESQL_P>
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481084,"msg":"Starting pgpro-otel-collector...","Version":"v0.5.0","NumCPU":1}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481149,"msg":"Starting extensions..."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.4483361,"msg":"Using the 0.0.0.0 address exposes this server to every network interface, which may facilitate Denial of Service attack>
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4515307,"msg":"Starting stanza receiver","kind":"receiver","name":"filelog","data_type":"logs"}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.451749,"msg":"Everything is ready. Begin running and processing data."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.6523068,"msg":"Started watching file","kind":"receiver","name":"filelog","data_type":"logs","component":"fileconsumer","path":"/var/log/postgresql/postgresql-2025-03-20.json"}
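
If this check needs to be scripted, a minimal Python sketch could look for the "Started watching file" events of the filelog receiver in the journal output. The function name is hypothetical; the first two sample lines are copied from the output above, and the third is a made-up truncated line that shows how unparsable entries are skipped.

```python
import json

journal = [
    'Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.451749,"msg":"Everything is ready. Begin running and processing data."}',
    'Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.6523068,"msg":"Started watching file","kind":"receiver","name":"filelog","data_type":"logs","component":"fileconsumer","path":"/var/log/postgresql/postgresql-2025-03-20.json"}',
    'Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","msg":"trunc>',
]


def watched_files(lines):
    """Return the paths that the filelog receiver reports as being watched."""
    paths = []
    for line in lines:
        start = line.find("{")
        if start == -1:
            continue  # no JSON payload on this line
        try:
            event = json.loads(line[start:])
        except json.JSONDecodeError:
            continue  # lines truncated by the pager cannot be parsed
        if event.get("name") == "filelog" and event.get("msg") == "Started watching file":
            paths.append(event["path"])
    return paths


print(watched_files(journal))  # ['/var/log/postgresql/postgresql-2025-03-20.json']
```

An empty result would suggest that no file matched the include pattern of the receiver.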
    

24.2.3. Checking Logs in Elasticsearch #

  • After configuring log sending from pgpro-otel-collector, ensure that logs are received by Elasticsearch.

    For this check, you can execute a query to the storage using the curl utility.

    Query example:

    curl -s -XGET "https://elasticsearch.example.org:9200/logs-apm.app.postgresql-default/_search?size=10" -H 'Content-Type: application/json' -d'
    {
      "_source": ["message","service.node.name","@timestamp"],
      "sort": [
        { "@timestamp": "desc" }
      ],
      "query": {
            "bool": {
                "filter": [
                    { "term":{"service.node.name":"postgresql-01.example.org:5432" }}]
            }
        }
    }'
    

    Where:

    • https://elasticsearch.example.org:9200 is the URL of the log storage system.

    • logs-apm.app.postgresql-default is the name of the data stream for the search.

    • size=10 limits the number of returned logs.

    • "_source": ["message","service.node.name","@timestamp"] lists the requested fields.

    Response example:

    {
      "took": 18,
      "timed_out": false,
      "_shards": {
        "total": 11,
        "successful": 11,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 10000,
          "relation": "gte"
        },
        "max_score": null,
        "hits": [
          {
            "_index": ".ds-logs-apm.app.postgresql-default-2025.03.19-000379",
            "_id": "qmuArJUB2PKtie47RffA",
            "_score": null,
            "_source": {
              "message": "checkpoint complete: wrote 2038 buffers (16.6%); 0 WAL file(s) added, 0 removed, 10 recycled; write=269.563 s, sync=1.192 s, total=270.962 s; sync files=246, longest=0.677 s, average=0.005 s; distance=162419 kB, estimate=174180 kB; lsn=6/62000850, redo lsn=6/583C4DD8",
              "@timestamp": "2025-03-19T03:44:01.336Z",
              "service": {
                "node": {
                  "name": "postgresql-01.example.org:5432"
                }
              }
            },
            "sort": [
              1742355841336
            ]
          }
        ]
      }
    }
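
A response of this shape can be reduced to one line per log entry. The Python sketch below works on a trimmed copy of the response above, with the message shortened for readability; the function name is hypothetical.

```python
import json

# Trimmed copy of the response example; the message is shortened.
response_text = """
{
  "hits": {
    "total": {"value": 10000, "relation": "gte"},
    "hits": [
      {
        "_index": ".ds-logs-apm.app.postgresql-default-2025.03.19-000379",
        "_source": {
          "message": "checkpoint complete: wrote 2038 buffers (16.6%)",
          "@timestamp": "2025-03-19T03:44:01.336Z",
          "service": {"node": {"name": "postgresql-01.example.org:5432"}}
        }
      }
    ]
  }
}
"""


def summarize(response):
    """Return (timestamp, node name, message) tuples, one per hit."""
    rows = []
    for hit in response["hits"]["hits"]:
        source = hit["_source"]
        rows.append(
            (source["@timestamp"], source["service"]["node"]["name"], source["message"])
        )
    return rows


response = json.loads(response_text)
total = response["hits"]["total"]
# The "gte" relation means the value is a lower bound, not an exact count.
at_least = total["relation"] == "gte"

print(total["value"], at_least)  # 10000 True
for row in summarize(response):
    print(" | ".join(row))
```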
    

24.2.4. Configuring a Log Data Source #

  1. In the navigation panel, go to Infrastructure → Data sources → Message storages.

  2. In the top-right corner of the page, click Create storage.

  3. Specify the log storage parameters (parameters marked with an asterisk are required):

    • Message storage system: The type of the log storage system.

      Select Elasticsearch.

    • Name: The unique name of the log storage. For example, Elasticsearch.

    • URL: The network address for connecting to the log storage. For example, https://elasticsearch.example.org.

    • Elasticsearch index: The name of the index (stream) for search queries.

      Specify logs-apm.app.postgresql-default.

    • User: The unique name of the user if authentication is used.

    • Password: The password of the user if authentication is used.

    • Description: The description of the log storage.

    • Make default datasource: Specifies whether the log storage is used by default for all queries requesting activity logs.

24.2.5. Checking the Log Storage Operation #

  1. In the navigation panel, go to Monitoring → Logs.

    The table of logs will be displayed.

  2. To reset filters, click Reset all above the table.

  3. Check if new activity logs are listed in the table.