24.2. Integration with Elasticsearch and Elastic APM #
Integration with external Elasticsearch data sources is used for reading logs written by pgpro-otel-collector.
The components below are required for integration.
- pgpro-otel-collector #
The monitoring agent that provides the following functionality:
collects activity logs from Postgres Pro DBMS instances
sends activity logs to Elastic APM for Elasticsearch
- Elastic APM Elasticsearch #
The application performance monitoring system based on Elastic Stack that provides the following functionality:
receives data from the monitoring agent and converts it to the ES document format
sends converted data to Elasticsearch
- Elasticsearch #
The activity log storage system that provides the following functionality:
receives activity logs from the application performance monitoring system
stores activity logs according to internal storage parameters
provides the interface for receiving activity logs
- PPEM #
The Postgres Pro Enterprise Manager system that provides the following functionality:
accesses Elasticsearch for receiving DBMS instance activity logs
provides the user with the monitoring interface based on activity logs in the form of text data
The integration process includes the following steps:
Additional configuration of the agent is not required.
24.2.1. Configuring Elasticsearch #
Install the Elastic APM server using the standard documentation.
Integrate the Elastic APM server with Elasticsearch using the standard documentation.
Configure the pgpro-otel-collector ingest pipeline.
This is required for compatibility between document (log) fields and the Elasticsearch Common Schema (ECS) field naming schema.
Pipeline configuration example (both queries are executed sequentially in Kibana Developer Tools):
PUT _ingest/pipeline/postgrespro-otelcol-enrich-logs { "description": "Enrich PostgresPro Otel collector logs", "processors": [ { "rename": { "if": "ctx?.labels?.message != null", "field": "labels.message", "target_field": "message", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.pid != null", "field": "labels.pid", "target_field": "process.pid", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.error_severity != null", "field": "labels.error_severity", "target_field": "log.level", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.user != null", "field": "labels.user", "target_field": "user.name", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.session_start != null", "field": "labels.session_start", "target_field": "session.start_time", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.session_id != null", "field": "labels.session_id", "target_field": "session.id", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.numeric_labels?.tx_id != null", "field": "numeric_labels.tx_id", "target_field": "transaction.id", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.log_file_name != null", "field": "labels.log_file_name", "target_field": "log.file.path", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "rename": { "if": "ctx?.labels?.dbname != null", "field": "labels.dbname", "target_field": "db.name", "ignore_failure": true, "ignore_missing": false, "override": true } }, { "gsub": { "if": "ctx?.service?.node?.name != null", "field": "service.node.name", "target_field": "host.name", "pattern": ":.+$", "replacement": "", "ignore_failure": true, "ignore_missing": false } }, { "remove": { "field": [ "observer.version", "observer.hostname", "service.language.name" ], "ignore_failure": true } }, { "remove": { "field": "agent.version", "if": "ctx?.agent?.version == \"unknown\"", "ignore_failure": true } } ] }PUT _ingest/pipeline/logs-apm.app@custom { "processors": [ { "pipeline": { "name": "postgrespro-otelcol-enrich-logs" } } ] }
24.2.2. Configuring pgpro-otel-collector for Elasticsearch #
Enable and configure the filelog receiver.
Receiver configuration example for the scenario when PostgreSQL logs are generated in the JSON format:
receivers: filelog: include: - /var/log/postgresql/*.json operators: - parse_ints: true timestamp: layout: '%Y-%m-%d %H:%M:%S.%L %Z' layout_type: strptime parse_from: attributes.timestamp type: json_parser - field: attributes.timestamp type: remove retry_on_failure: enabled: true initial_interval: 1s max_elapsed_time: 5m max_interval: 30s start_at: endConfigure processors:
processors: attributes/convert: actions: - action: convert converted_type: string key: query_id - action: convert converted_type: string key: pid resource: attributes: - action: upsert key: service.name value: postgresql - action: upsert key: service.instance.id value: postgresql-01.example.org:5432Where:
service.nameis the key for naming the data stream (data stream) and, consequently, indexes.service.instance.idis the key for identifying the instance.For logs in the JSON format, converting
query_idto a string is required because integers are displayed incorrectly in ES.
Important
Data streams are used for storing data. The target stream is selected automatically and has the
logs-apm.app.service.name-namespaceformat.The
service.namevalue is specified in the collector configuration, in theprocessors.resource.attributeslist, by thekey: service.nameelement.The
namespacevalue is defined by the element with theservice.environmentkey. It is not sent in this configuration so thedefaultvalue is entered by default. If this configuration is used, activity logs will be stored in the stream namedlogs-apm.app.postgresql-default.Configure log sending using otlphttpexporter and the pipeline:
exporters: otlphttp/elastic_logs: compression: gzip endpoint: https://elasticsearch-apm.example.org tls: insecure_skip_verify: false service: extensions: [] pipelines: logs: receivers: - filelog processors: - resource - attributes/convert exporters: - otlphttp/elastic_logsStart the collector and ensure that metrics are published on its side:
# systemctl start pgpro-otel-collector # systemctl status pgpro-otel-collector ● pgpro-otel-collector.service - PostgresPro OpenTelemetry Collector Loaded: loaded (/lib/systemd/system/pgpro-otel-collector.service; enabled; preset: enabled) Active: active (running) since Thu 2025-03-20 01:18:08 MSK; 4h 13min ago Main PID: 6991 (pgpro-otel-coll) Tasks: 8 (limit: 3512) Memory: 119.3M CPU: 2min 49.311s CGroup: /system.slice/pgpro-otel-collector.service └─6991 /usr/bin/pgpro-otel-collector --config /etc/pgpro-otel-collector/basic.yml Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.366656,"msg":"Setting up own telemetry..."} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.367178,"msg":"Skipped telemetry setup."} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.3679142,"msg":"Development component. May change in the future.","kind":"receiver","name":"postgrespro","data_type":"metrics"} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.3494158,"caller":"envprovider@v1.16.0/provider.go:59","msg":"Configuration references unset environment variable","name":"POSTGRESQL_P> Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481084,"msg":"Starting pgpro-otel-collector...","Version":"v0.5.0","NumCPU":1} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481149,"msg":"Starting extensions..."} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.4483361,"msg":"Using the 0.0.0.0 address exposes this server to every network interface, which may facilitate Denial of Service attack> Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4515307,"msg":"Starting stanza receiver","kind":"receiver","name":"filelog","data_type":"logs"} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.451749,"msg":"Everything is ready. Begin running and processing data."} Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.6523068,"msg":"Started watching file","kind":"receiver","name":"filelog","data_type":"logs","component":"fileconsumer","path":"/var/log/postgresql/postgresql-2025-03-20.json"}
24.2.3. Checking Logs in Elasticsearch #
After configuring log sending from pgpro-otel-collector, ensure that metrics are received by the Elasticsearch system.
For this check, you can execute a query to the storage using the curl utility.
Query example:
curl -s -XGET "https://elasticsearch.example.org:9200/logs-apm.app.postgresql-default/_search?size=10" -H 'Content-Type: application/json' -d' { "_source": ["message","service.node.name","@timestamp"], "sort": [ { "@timestamp": "desc" } ], "query": { "bool": { "filter": [ { "term":{"service.node.name":"postgresql-01.example.org:5432" }}] } } }'Where:
https://elasticsearch.example.org:9200is the URL of the log storage system.logs-apm.app.postgresql-defaultis the name of the data stream for the search.size=10limits the number of returned logs."_source": ["message","service.node.name","@timestamp"]are requested fields.
Response example:
{ "took": 18, "timed_out": false, "_shards": { "total": 11, "successful": 11, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 10000, "relation": "gte" }, "max_score": null, "hits": [ { "_index": ".ds-logs-apm.app.postgresql-default-2025.03.19-000379", "_id": "qmuArJUB2PKtie47RffA", "_score": null, "_source": { "message": "checkpoint complete: wrote 2038 buffers (16.6%); 0 WAL file(s) added, 0 removed, 10 recycled; write=269.563 s, sync=1.192 s, total=270.962 s; sync files=246, longest=0.677 s, average=0.005 s; distance=162419 kB, estimate=174180 kB; lsn=6/62000850, redo lsn=6/583C4DD8", "@timestamp": "2025-03-19T03:44:01.336Z", "service": { "node": { "name": "postgresql-01.example.org:5432" } } }, "sort": [ 1742355841336 ] } ] } }
24.2.4. Configuring a Log Data Source #
In the navigation panel, go to Infrastructure → Data sources → Message storages.
In the top-right corner of the page, click Create storage.
Specify the log storage parameters (parameters marked with an asterisk are required):
Message storage system: The type of the log storage system.
Select
Elasticsearch.Name: The unique name of the log storage. For example,
Elasticsearch.URL: The network address for connecting to the log storage. For example,
https://elasticsearch.example.org.Elasticsearch index: The name of the index (stream) for search queries.
Specify
logs-apm.app.postgresql-default.User: The unique name of the user if authentication is used.
Password: The password of the user if authentication is used.
Description: The description of the log storage.
Make default datasource: Specifies whether the log storage is used by default for all queries requesting activity logs.
24.2.5. Checking the Log Storage Operation #
In the navigation panel, go to Monitoring → Logs.
The table of logs will be displayed.
To reset filters, click Reset all above the table.
Check if new activity logs are listed in the table.