From dcf38436ecb78176aad3f2944cf20b18b3390186 Mon Sep 17 00:00:00 2001 From: Aleksander Alekseev Date: Tue, 25 Jul 2023 13:03:23 +0300 Subject: [PATCH v2] TODO FIXME write a proper commit message; this is a PoC/WIP patch Anthonin Bonnefoy, reviewed by Nikita Malakhov, Aleksander Alekseev Discussion: https://postgr.es/m/CAO6_XqoADFOMB6J%2BVc-nhh-ck3%3DeyvRwX57_ZcGMkyBWEGWTDA%40mail.gmail.com --- contrib/Makefile | 1 + contrib/meson.build | 1 + contrib/pg_tracing/Makefile | 21 + contrib/pg_tracing/expected/reset.out | 13 + contrib/pg_tracing/expected/select.out | 237 ++++ contrib/pg_tracing/explain.c | 433 ++++++++ contrib/pg_tracing/explain.h | 10 + contrib/pg_tracing/meson.build | 39 + contrib/pg_tracing/pg_tracing--1.0.sql | 88 ++ contrib/pg_tracing/pg_tracing.c | 1381 ++++++++++++++++++++++++ contrib/pg_tracing/pg_tracing.conf | 1 + contrib/pg_tracing/pg_tracing.control | 5 + contrib/pg_tracing/pg_tracing.h | 50 + contrib/pg_tracing/query_process.c | 472 ++++++++ contrib/pg_tracing/query_process.h | 32 + contrib/pg_tracing/span.c | 249 +++++ contrib/pg_tracing/span.h | 131 +++ contrib/pg_tracing/sql/reset.sql | 3 + contrib/pg_tracing/sql/select.sql | 105 ++ src/backend/executor/instrument.c | 1 + src/include/executor/instrument.h | 1 + 21 files changed, 3274 insertions(+) create mode 100644 contrib/pg_tracing/Makefile create mode 100644 contrib/pg_tracing/expected/reset.out create mode 100644 contrib/pg_tracing/expected/select.out create mode 100644 contrib/pg_tracing/explain.c create mode 100644 contrib/pg_tracing/explain.h create mode 100644 contrib/pg_tracing/meson.build create mode 100644 contrib/pg_tracing/pg_tracing--1.0.sql create mode 100644 contrib/pg_tracing/pg_tracing.c create mode 100644 contrib/pg_tracing/pg_tracing.conf create mode 100644 contrib/pg_tracing/pg_tracing.control create mode 100644 contrib/pg_tracing/pg_tracing.h create mode 100644 contrib/pg_tracing/query_process.c create mode 100644 contrib/pg_tracing/query_process.h create mode 
100644 contrib/pg_tracing/span.c create mode 100644 contrib/pg_tracing/span.h create mode 100644 contrib/pg_tracing/sql/reset.sql create mode 100644 contrib/pg_tracing/sql/select.sql diff --git a/contrib/Makefile b/contrib/Makefile index bbf220407b..834685ffbe 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -37,6 +37,7 @@ SUBDIRS = \ pg_prewarm \ pg_stat_statements \ pg_surgery \ + pg_tracing \ pg_trgm \ pgrowlocks \ pgstattuple \ diff --git a/contrib/meson.build b/contrib/meson.build index bd4a57c43c..ebe0790e89 100644 --- a/contrib/meson.build +++ b/contrib/meson.build @@ -48,6 +48,7 @@ subdir('pgrowlocks') subdir('pg_stat_statements') subdir('pgstattuple') subdir('pg_surgery') +subdir('pg_tracing') subdir('pg_trgm') subdir('pg_visibility') subdir('pg_walinspect') diff --git a/contrib/pg_tracing/Makefile b/contrib/pg_tracing/Makefile new file mode 100644 index 0000000000..dfbcb2d1c4 --- /dev/null +++ b/contrib/pg_tracing/Makefile @@ -0,0 +1,21 @@ +# contrib/pg_tracing/Makefile + +MODULE_big = pg_tracing +OBJS = \ + $(WIN32RES) \ + pg_tracing.o + +EXTENSION = pg_tracing +TAP_TESTS = 0 +PGFILEDESC = "pg_tracing - Distributed tracing for postgres" + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/pg_tracing +top_builddir = ../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/pg_tracing/expected/reset.out b/contrib/pg_tracing/expected/reset.out new file mode 100644 index 0000000000..2e0c903121 --- /dev/null +++ b/contrib/pg_tracing/expected/reset.out @@ -0,0 +1,13 @@ +-- Check reset is working +SELECT pg_tracing_reset(); + pg_tracing_reset +------------------ + +(1 row) + +SELECT traces from pg_tracing_info; + traces +-------- + 0 +(1 row) + diff --git a/contrib/pg_tracing/expected/select.out b/contrib/pg_tracing/expected/select.out new file mode 100644 index 0000000000..8460f29e84 --- /dev/null +++ b/contrib/pg_tracing/expected/select.out @@ -0,0 +1,237 @@ +CREATE EXTENSION pg_tracing; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ SELECT 1; + ?column? +---------- + 1 +(1 row) + +-- Get top span id +SELECT span_id AS top_span_id from pg_tracing_spans(false) where parent_id=1 and name!='Parse' \gset +-- Check parameters +SELECT parameters from pg_tracing_spans(false) where span_id=:top_span_id; + parameters +------------ + $1 = 1 +(1 row) + +-- Check the number of children +SELECT count(*) from pg_tracing_spans(false) where parent_id=:'top_span_id'; + count +------- + 5 +(1 row) + +-- Check resource +SELECT resource from pg_tracing_spans(false) where trace_id=1 order by span_start; + resource +------------ + Parse + SELECT $1; + Planner + Start + Run + Result + Finish + End +(8 rows) + +-- Check reported number of trace +SELECT traces from pg_tracing_info; + traces +-------- + 1 +(1 row) + +CREATE OR REPLACE FUNCTION test_function(a int) RETURNS SETOF oid AS +$BODY$ +BEGIN + RETURN QUERY SELECT oid from pg_class where oid = a; +END; +$BODY$ +LANGUAGE plpgsql; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select test_function(1); + test_function +--------------- +(0 rows) + +SELECT span_id AS top_span, + extract(epoch from 
span_start) as top_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_end + from pg_tracing_spans(false) where parent_id=2 and name!='Parse' \gset +SELECT span_id AS top_run_span, + extract(epoch from span_start) as top_run_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_run_end + from pg_tracing_spans(false) where parent_id=:top_span and name='Executor' and resource='Run' \gset +SELECT span_id AS top_project, + extract(epoch from span_start) as top_project_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_project_end + from pg_tracing_spans(false) where parent_id=:top_run_span and name='ProjectSet' \gset +SELECT span_id AS top_result, + extract(epoch from span_start) as top_result_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_result_end + from pg_tracing_spans(false) where parent_id=:top_project and name='Result' \gset +SELECT span_id AS nested_select, + extract(epoch from span_start) as select_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as select_end + from pg_tracing_spans(false) where parent_id=:top_result and name='Select' \gset +SELECT span_id AS nested_run, + extract(epoch from span_start) as run_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as run_end + from pg_tracing_spans(false) where parent_id=:nested_select and resource='Run' \gset +SELECT :top_start < :top_run_start, + :top_end >= :top_run_end, + :top_run_start <= :top_project_start, + :top_run_end >= :top_project_end, + :top_run_end >= :select_end, + :top_run_end >= :run_end, + :run_end >= :select_end; + ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? 
+----------+----------+----------+----------+----------+----------+---------- + t | t | t | t | t | t | t +(1 row) + +SELECT resource from pg_tracing_spans(false) where parent_id=:nested_run order by resource; + resource +---------------------------------------------------- + IndexOnlyScan using pg_class_oid_index on pg_class +(1 row) + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-01'*/ SELECT * from current_database(); + current_database +----------------------- + regression_pg_tracing +(1 row) + +SELECT resource from pg_tracing_spans(false) where trace_id=3 order by resource; + resource +----------------------------------- + End + Finish + FunctionScan on current_database + Parse + Planner + Run + SELECT * from current_database(); + Start +(8 rows) + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ SELECT s.relation_size + s.index_size +FROM (SELECT + pg_relation_size(C.oid) as relation_size, + pg_indexes_size(C.oid) as index_size + FROM pg_class C) as s limit 1; + ?column? 
+---------- + 0 +(1 row) + +SELECT resource from pg_tracing_spans(false) where trace_id=4 order by resource; + resource +------------------------------------------------------------------------------------------------------------------------------------------------------------------ + End + Finish + Limit + Parse + Planner + Run + SELECT s.relation_size + s.index_sizeFROM (SELECT pg_relation_size(C.oid) as relation_size, pg_indexes_size(C.oid) as index_size FROM pg_class C) as s limit $1; + SeqScan on pg_class c + Start + SubqueryScan on s +(10 rows) + +-- Check tracking option +set pg_tracing.track = 'top'; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000005-0000000000000005-01'*/ select test_function(1); + test_function +--------------- +(0 rows) + +SELECT count(*) from pg_tracing_spans where trace_id=5; + count +------- + 10 +(1 row) + +set pg_tracing.track = 'none'; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000006-0000000000000006-01'*/ select test_function(1); + test_function +--------------- +(0 rows) + +SELECT count(*) from pg_tracing_spans where trace_id=6; + count +------- + 0 +(1 row) + +set pg_tracing.track = 'all'; +-- Check that we're in a correct state after a timeout +set statement_timeout=200; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000007-0000000000000007-01'*/ select * from pg_sleep(10); +ERROR: canceling statement due to statement timeout +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000008-0000000000000008-01'*/ select 1; + ?column? 
+---------- + 1 +(1 row) + +SELECT trace_id, resource, sql_error_code from pg_tracing_spans order by span_start; + trace_id | resource | sql_error_code +----------+-----------------------------+---------------- + 7 | Parse | 00000 + 7 | select * from pg_sleep($1); | 57014 + 7 | Planner | 00000 + 7 | Start | 00000 + 7 | Run | 57014 + 7 | FunctionScan on pg_sleep | 57014 + 8 | Parse | 00000 + 8 | select $1; | 00000 + 8 | Planner | 00000 + 8 | Start | 00000 + 8 | Run | 00000 + 8 | Result | 00000 + 8 | Finish | 00000 + 8 | End | 00000 +(14 rows) + +-- Test prepared statement +PREPARE test_prepared (text, integer) AS /*$1*/ SELECT 1; +EXECUTE test_prepared('dddbs=''postgres.db'',traceparent=''00-00000000000000000000000000000009-0000000000000009-01''', 1); + ?column? +---------- + 1 +(1 row) + +SELECT trace_id, resource from pg_tracing_spans order by span_start; + trace_id | resource +----------+---------------------------------------------------- + 9 | PREPARE test_prepared (text, integer) AS SELECT 1; + 9 | Planner + 9 | Start + 9 | Run + 9 | Result + 9 | Finish + 9 | End +(7 rows) + +-- Test prepared statement with generic plan +SET plan_cache_mode='force_generic_plan'; +EXECUTE test_prepared('dddbs=''postgres.db'',traceparent=''00-00000000000000000000000000000010-0000000000000010-01''', 1); + ?column? 
+---------- + 1 +(1 row) + +SELECT trace_id, resource from pg_tracing_spans order by span_start; + trace_id | resource +----------+---------------------------------------------------- + 16 | PREPARE test_prepared (text, integer) AS SELECT 1; + 16 | Start + 16 | Run + 16 | Result + 16 | Finish + 16 | End +(6 rows) + +DROP function test_function; diff --git a/contrib/pg_tracing/explain.c b/contrib/pg_tracing/explain.c new file mode 100644 index 0000000000..321aedf0c4 --- /dev/null +++ b/contrib/pg_tracing/explain.c @@ -0,0 +1,433 @@ +/*------------------------------------------------------------------------- + * + * pg_tracing.c + * + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/pg_tracing/pg_tracing.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "explain.h" + +#include "utils/builtins.h" +#include "parser/parsetree.h" +#include "nodes/extensible.h" +#include "utils/lsyscache.h" +#include "utils/ruleutils.h" + + +static void +ExplainTargetRel(const PlanState *planstate, const Plan *plan, Index rti, StringInfo *str) +{ + char *objectname = NULL; + char *namespace = NULL; + RangeTblEntry *rte; + char *refname; + + List *rtable_names; + List *rtable; + + rtable = planstate->state->es_range_table; + rte = rt_fetch(rti, rtable); + rtable_names = select_rtable_names_for_explain(rtable, NULL); + + refname = (char *) list_nth(rtable_names, rti - 1); + if (refname == NULL) + refname = rte->eref->aliasname; + + switch (nodeTag(plan)) + { + case T_SeqScan: + case T_SampleScan: + case T_IndexScan: + case T_IndexOnlyScan: + case T_BitmapHeapScan: + case T_TidScan: + case T_TidRangeScan: + case T_ForeignScan: + case T_CustomScan: + case T_ModifyTable: + /* Assert it's on a real relation */ + Assert(rte->rtekind == RTE_RELATION); + objectname = get_rel_name(rte->relid); + break; + case T_FunctionScan: + { + FunctionScan *fscan = (FunctionScan *) plan; + + 
/* Assert it's on a RangeFunction */ + Assert(rte->rtekind == RTE_FUNCTION); + + /* + * If the expression is still a function call of a single + * function, we can get the real name of the function. + * Otherwise, punt. (Even if it was a single function call + * originally, the optimizer could have simplified it away.) + */ + if (list_length(fscan->functions) == 1) + { + RangeTblFunction *rtfunc = (RangeTblFunction *) linitial(fscan->functions); + + if (IsA(rtfunc->funcexpr, FuncExpr)) + { + FuncExpr *funcexpr = (FuncExpr *) rtfunc->funcexpr; + Oid funcid = funcexpr->funcid; + + objectname = get_func_name(funcid); + } + } + } + break; + case T_TableFuncScan: + Assert(rte->rtekind == RTE_TABLEFUNC); + objectname = "xmltable"; + break; + case T_ValuesScan: + Assert(rte->rtekind == RTE_VALUES); + break; + case T_CteScan: + /* Assert it's on a non-self-reference CTE */ + Assert(rte->rtekind == RTE_CTE); + Assert(!rte->self_reference); + objectname = rte->ctename; + break; + case T_NamedTuplestoreScan: + Assert(rte->rtekind == RTE_NAMEDTUPLESTORE); + objectname = rte->enrname; + break; + case T_WorkTableScan: + /* Assert it's on a self-reference CTE */ + Assert(rte->rtekind == RTE_CTE); + Assert(rte->self_reference); + objectname = rte->ctename; + break; + default: + break; + } + + appendStringInfoString(*str, " on"); + if (namespace != NULL) + appendStringInfo(*str, " %s.%s", quote_identifier(namespace), + quote_identifier(objectname)); + else if (objectname != NULL) + appendStringInfo(*str, " %s", quote_identifier(objectname)); + if (objectname == NULL || strcmp(refname, objectname) != 0) + appendStringInfo(*str, " %s", quote_identifier(refname)); +} + + +static void +ExplainScanTarget(const PlanState *planstate, const Scan *plan, StringInfo *str) +{ + ExplainTargetRel(planstate, (Plan *) plan, plan->scanrelid, str); +} + +static void +ExplainModifyTarget(const PlanState *planstate, const ModifyTable *plan, StringInfo *str) +{ + ExplainTargetRel(planstate, (Plan *) 
plan, plan->nominalRelation, str); +} + +static void +ExplainIndexScanDetails(Oid indexId, ScanDirection indexorderdir, + StringInfo *str) +{ + const char *indexname = get_rel_name(indexId); + + if (ScanDirectionIsBackward(indexorderdir)) + appendStringInfoString(*str, " Backward"); + appendStringInfo(*str, " using %s", quote_identifier(indexname)); +} + +static void +plan_to_rel_name(const PlanState *planstate, const Plan *plan, StringInfo *str) +{ + switch (nodeTag(plan)) + { + case T_SeqScan: + case T_SampleScan: + case T_BitmapHeapScan: + case T_TidScan: + case T_TidRangeScan: + case T_SubqueryScan: + case T_FunctionScan: + case T_TableFuncScan: + case T_ValuesScan: + case T_CteScan: + case T_WorkTableScan: + ExplainScanTarget(planstate, (Scan *) plan, str); + break; + case T_ForeignScan: + case T_CustomScan: + if (((Scan *) plan)->scanrelid > 0) + ExplainScanTarget(planstate, (Scan *) plan, str); + break; + case T_IndexScan: + { + IndexScan *indexscan = (IndexScan *) plan; + + ExplainIndexScanDetails(indexscan->indexid, + indexscan->indexorderdir, + str); + ExplainScanTarget(planstate, (Scan *) indexscan, str); + } + break; + case T_IndexOnlyScan: + { + IndexOnlyScan *indexonlyscan = (IndexOnlyScan *) plan; + + ExplainIndexScanDetails(indexonlyscan->indexid, + indexonlyscan->indexorderdir, + str); + ExplainScanTarget(planstate, (Scan *) indexonlyscan, str); + } + break; + case T_BitmapIndexScan: + { + BitmapIndexScan *bitmapindexscan = (BitmapIndexScan *) plan; + const char *indexname = get_rel_name(bitmapindexscan->indexid); + + appendStringInfo(*str, " on %s", + quote_identifier(indexname)); + } + break; + case T_ModifyTable: + ExplainModifyTarget(planstate, (ModifyTable *) plan, str); + break; + case T_NestLoop: + case T_MergeJoin: + case T_HashJoin: + { + const char *jointype; + + switch (((Join *) plan)->jointype) + { + case JOIN_INNER: + jointype = "Inner"; + break; + case JOIN_LEFT: + jointype = "Left"; + break; + case JOIN_FULL: + jointype = "Full"; + 
break; + case JOIN_RIGHT: + jointype = "Right"; + break; + case JOIN_SEMI: + jointype = "Semi"; + break; + case JOIN_ANTI: + jointype = "Anti"; + break; + case JOIN_RIGHT_ANTI: + jointype = "Right Anti"; + break; + default: + jointype = "???"; + break; + } + if (((Join *) plan)->jointype != JOIN_INNER) + appendStringInfo(*str, " %s Join", jointype); + else if (!IsA(plan, NestLoop)) + appendStringInfoString(*str, " Join"); + } + break; + case T_SetOp: + { + const char *setopcmd; + + switch (((SetOp *) plan)->cmd) + { + case SETOPCMD_INTERSECT: + setopcmd = "Intersect"; + break; + case SETOPCMD_INTERSECT_ALL: + setopcmd = "Intersect All"; + break; + case SETOPCMD_EXCEPT: + setopcmd = "Except"; + break; + case SETOPCMD_EXCEPT_ALL: + setopcmd = "Except All"; + break; + default: + setopcmd = "???"; + break; + } + appendStringInfo(*str, " %s", setopcmd); + } + break; + default: + break; + } +} + +char const * +plan_to_operation(const PlanState *planstate, const char *spanName) +{ + StringInfo operation_name = makeStringInfo(); + Plan const *plan = planstate->plan; + + if (plan->parallel_aware) + appendStringInfoString(operation_name, "Parallel "); + if (plan->async_capable) + appendStringInfoString(operation_name, "Async "); + appendStringInfoString(operation_name, spanName); + + plan_to_rel_name(planstate, plan, &operation_name); + return operation_name->data; +} + +const char * +plan_to_span_name(const Plan *plan) +{ + const char *custom_name; + + switch (nodeTag(plan)) + { + case T_Result: + return "Result"; + case T_ProjectSet: + return "ProjectSet"; + case T_ModifyTable: + switch (((ModifyTable *) plan)->operation) + { + case CMD_INSERT: + return "Insert"; + case CMD_UPDATE: + return "Update"; + case CMD_DELETE: + return "Delete"; + case CMD_MERGE: + return "Merge"; + default: + return "???"; + } + case T_Append: + return "Append"; + case T_MergeAppend: + return "MergeAppend"; + case T_RecursiveUnion: + return "RecursiveUnion"; + case T_BitmapAnd: + return 
"BitmapAnd"; + case T_BitmapOr: + return "BitmapOr"; + case T_NestLoop: + return "NestedLoop"; + case T_MergeJoin: + return "Merge"; /* "Join" gets added by jointype switch */ + case T_HashJoin: + return "Hash"; /* "Join" gets added by jointype switch */ + case T_SeqScan: + return "SeqScan"; + case T_SampleScan: + return "SampleScan"; + case T_Gather: + return "Gather"; + case T_GatherMerge: + return "GatherMerge"; + case T_IndexScan: + return "IndexScan"; + case T_IndexOnlyScan: + return "IndexOnlyScan"; + case T_BitmapIndexScan: + return "BitmapIndexScan"; + case T_BitmapHeapScan: + return "BitmapHeapScan"; + case T_TidScan: + return "TidScan"; + case T_TidRangeScan: + return "TidRangeScan"; + case T_SubqueryScan: + return "SubqueryScan"; + case T_FunctionScan: + return "FunctionScan"; + case T_TableFuncScan: + return "TableFunctionScan"; + case T_ValuesScan: + return "ValuesScan"; + case T_CteScan: + return "CTEScan"; + case T_NamedTuplestoreScan: + return "NamedTuplestoreScan"; + case T_WorkTableScan: + return "WorkTableScan"; + case T_ForeignScan: + switch (((ForeignScan *) plan)->operation) + { + case CMD_SELECT: + return "ForeignScan"; + case CMD_INSERT: + return "ForeignInsert"; + case CMD_UPDATE: + return "ForeignUpdate"; + case CMD_DELETE: + return "ForeignDelete"; + default: + return "???"; + } + case T_CustomScan: + custom_name = ((CustomScan *) plan)->methods->CustomName; + if (custom_name) + return psprintf("CustomScan (%s)", custom_name); + else + return "CustomScan"; + case T_Material: + return "Materialize"; + case T_Memoize: + return "Memoize"; + case T_Sort: + return "Sort"; + case T_IncrementalSort: + return "IncrementalSort"; + case T_Group: + return "Group"; + case T_Agg: + { + Agg *agg = (Agg *) plan; + + switch (agg->aggstrategy) + { + case AGG_PLAIN: + return "Aggregate"; + case AGG_SORTED: + return "GroupAggregate"; + case AGG_HASHED: + return "HashAggregate"; + case AGG_MIXED: + return "MixedAggregate"; + default: + return "Aggregate 
???"; + } + } + case T_WindowAgg: + return "WindowAgg"; + case T_Unique: + return "Unique"; + case T_SetOp: + switch (((SetOp *) plan)->strategy) + { + case SETOP_SORTED: + return "SetOp"; + case SETOP_HASHED: + return "HashSetOp"; + default: + return "SetOp ???"; + } + case T_LockRows: + return "LockRows"; + case T_Limit: + return "Limit"; + case T_Hash: + return "Hash"; + default: + return "???"; + } +} diff --git a/contrib/pg_tracing/explain.h b/contrib/pg_tracing/explain.h new file mode 100644 index 0000000000..d65abbd42d --- /dev/null +++ b/contrib/pg_tracing/explain.h @@ -0,0 +1,10 @@ +#ifndef _EXPLAIN_H_ +#define _EXPLAIN_H_ + +#include "span.h" +#include "executor/execdesc.h" + +const char *plan_to_span_name(const Plan *plan); +const char *plan_to_operation(const PlanState *planstate, const char *spanName); + +#endif diff --git a/contrib/pg_tracing/meson.build b/contrib/pg_tracing/meson.build new file mode 100644 index 0000000000..d96ae28c9d --- /dev/null +++ b/contrib/pg_tracing/meson.build @@ -0,0 +1,39 @@ +# Copyright (c) 2022-2023, PostgreSQL Global Development Group + +pg_tracing_sources = files( + 'pg_tracing.c', + 'query_process.c', + 'explain.c', + 'span.c', +) + +if host_system == 'windows' + pg_tracing_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pg_tracing', + '--FILEDESC', 'pg_tracing - Add support for distributed tracing',]) +endif + +pg_tracing = shared_module('pg_tracing', + pg_tracing_sources, + kwargs: contrib_mod_args, +) +contrib_targets += pg_tracing + +install_data( + 'pg_tracing.control', + 'pg_tracing--1.0.sql', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'pg_tracing', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'select', + 'reset' + ], + 'regress_args': ['--temp-config', files('pg_tracing.conf')], + }, +} diff --git a/contrib/pg_tracing/pg_tracing--1.0.sql b/contrib/pg_tracing/pg_tracing--1.0.sql new file mode 100644 index 
0000000000..507a8bd509 --- /dev/null +++ b/contrib/pg_tracing/pg_tracing--1.0.sql @@ -0,0 +1,88 @@ +/* contrib/pg_tracing/pg_tracing--1.0.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "CREATE EXTENSION pg_tracing" to load this file. \quit + +--- Define pg_tracing_info +CREATE FUNCTION pg_tracing_info( + OUT traces bigint, + OUT spans bigint, + OUT dropped_spans bigint, + OUT last_consume timestamp with time zone, + OUT stats_reset timestamp with time zone +) +RETURNS record +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT VOLATILE PARALLEL SAFE; + +CREATE FUNCTION pg_tracing_reset() +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C PARALLEL SAFE; + +CREATE FUNCTION pg_tracing_spans( + IN consume bool, + OUT trace_id bigint, + OUT parent_id bigint, + OUT span_id bigint, + OUT name text, + OUT resource text, + OUT span_start timestamp with time zone, + OUT duration bigint, + OUT sql_error_code character(5), + OUT pid int4, + +-- Plan counters + OUT startup_cost float8, + OUT total_cost float8, + OUT plan_rows float8, + OUT plan_width int, + +-- Node Counters + OUT rows int8, + OUT nloops int8, + + OUT shared_blks_hit int8, + OUT shared_blks_read int8, + OUT shared_blks_dirtied int8, + OUT shared_blks_written int8, + + OUT local_blks_hit int8, + OUT local_blks_read int8, + OUT local_blks_dirtied int8, + OUT local_blks_written int8, + + OUT blk_read_time float8, + OUT blk_write_time float8, + + OUT temp_blks_read int8, + OUT temp_blks_written int8, + OUT temp_blk_read_time float8, + OUT temp_blk_write_time float8, + + OUT wal_records int8, + OUT wal_fpi int8, + OUT wal_bytes numeric, + + OUT jit_functions int8, + OUT jit_generation_time float8, + OUT jit_inlining_time float8, + OUT jit_optimization_time float8, + OUT jit_emission_time float8, + +-- SpanNode specific data + OUT startup bigint, -- First tuple + OUT parameters text +) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE C PARALLEL SAFE; + +CREATE VIEW 
pg_tracing_info AS + SELECT * FROM pg_tracing_info(); + +CREATE VIEW pg_tracing_spans AS + SELECT * FROM pg_tracing_spans(true); + +GRANT SELECT ON pg_tracing_info TO PUBLIC; +GRANT SELECT ON pg_tracing_spans TO PUBLIC; diff --git a/contrib/pg_tracing/pg_tracing.c b/contrib/pg_tracing/pg_tracing.c new file mode 100644 index 0000000000..9938501dd8 --- /dev/null +++ b/contrib/pg_tracing/pg_tracing.c @@ -0,0 +1,1381 @@ +/*------------------------------------------------------------------------- + * + * pg_tracing.c + * Generate spans for distributed tracing from SQL query + * + * We rely on the caller to know whether the query needs to be traced or not. + * The information is propagated through https://google.github.io/sqlcommenter/. + * + * A query with sqlcommenter will look like: /\*dddbs='postgres.db',traceparent='00-00000000000000000000000000000009-0000000000000005-01'*\/ select 1; + * The traceparent fields are detailed in https://www.w3.org/TR/trace-context/#traceparent-header-field-values + * 00000000000000000000000000000009: trace id + * 0000000000000005: parent id + * 01: trace flags (01 == sampled) + * + * If sampled is set in the trace flags, we will generate spans for the ongoing query. + * A span represents an operation with a start and a duration. + * We will track the following operations: + * - Query Span: The top span for a query. They are created after extracting the traceid from traceparent or to represent a nested query. + * - Planner: We track the time spent in the planner and report the planner counters + * - Node Span: Created from planstate. 
The name is extracted from the node type (IndexScan, SeqScan), + * - Executor: We trace the different steps of the Executor: Start, Run, Finish and End + * + * A typical traced query will generate the following spans: + * +------------------------------------------------------------------------------------------------------------------------------------------------------+ + * | Name: Select | + * | Operation: Select * pgbench_accounts WHERE aid=$1; | + * +---+------------------------+-+------------------+--+------------------------------------------------------+-+--------------------+-+----------------++ + * | Name: Planner | | Name: Executor | |Name: Executor | | Name: Executor | | Name: Executor | + * | Operation: Planner | | Operation: Start | |Operation: Run | | Operation: Finish | | Operation: End | + * +------------------------+ +------------------+ +--+--------------------------------------------------++ +--------------------+ +----------------+ + * | Name: IndexScan | + * | Operation: IndexScan using pgbench_accounts_pkey | + * | on pgbench_accounts | + * +--------------------------------------------------+ + * + * IDENTIFICATION + * contrib/pg_tracing/pg_tracing.c + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "explain.h" +#include "pg_tracing.h" +#include "query_process.h" +#include "span.h" + +#include "access/xact.h" +#include "common/pg_prng.h" +#include "funcapi.h" +#include "nodes/extensible.h" +#include "optimizer/planner.h" +#include "parser/analyze.h" +#include "storage/ipc.h" +#include "tcop/utility.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/xid8.h" + +PG_MODULE_MAGIC; + +typedef enum +{ + PG_TRACING_TRACK_NONE, /* track no statements */ + PG_TRACING_TRACK_TOP, /* only top level statements */ + PG_TRACING_TRACK_ALL /* all statements, including nested ones */ +} PGTracingTrackLevel; + +/* GUC variables */ +static int pg_tracing_max_span; 
+static int pg_tracing_max_parameter_str; +static double pg_tracing_sample_rate = 1; +static int pg_tracing_track = PG_TRACING_TRACK_ALL; + +static const struct config_enum_entry track_options[] = +{ + {"none", PG_TRACING_TRACK_NONE, false}, + {"top", PG_TRACING_TRACK_TOP, false}, + {"all", PG_TRACING_TRACK_ALL, false}, + {NULL, 0, false} +}; + +#define pg_tracing_enabled(level) \ + (!IsParallelWorker() && \ + (pg_tracing_track == PG_TRACING_TRACK_ALL || \ + (pg_tracing_track == PG_TRACING_TRACK_TOP && (level) == 0))) + + +PG_FUNCTION_INFO_V1(pg_tracing_info); +PG_FUNCTION_INFO_V1(pg_tracing_spans); +PG_FUNCTION_INFO_V1(pg_tracing_reset); + +/* + * Current trace ids extracted from sqlcomment's traceparent + */ +struct pgTracingTraceparentParameter traceparent_parameter; + +/* + * Number of allocated query_spans and executor_ids If we have nested + * queries, we will need to extend those fields + */ +static int allocated_nested_level = 0; + +/* + * Currently tracked query spans by nested level + */ +static Span * query_spans; + +/* + * Span ids of executor run spans by nested level Executor run is used as + * parent for spans generated from planstate + */ +static uint64 *executor_ids; + +/* + * Shared state with stats and file external state + */ +static pgTracingSharedState * pg_tracing = NULL; + +/* + * Shared state storing spans Query with sampled flag will add new spans to + * the shared state Those spans will be consumed during calls to + * pg_tracing_spans + */ +static pgTracingSharedSpans * shared_spans = NULL; + +/* + * Current nesting depth of ExecutorRun+ProcessUtility calls + */ +static int exec_nested_level = 0; + +/* + * Maximum nested level for a query to know how many query spans we need to + * copy in shared_spans + */ +static int max_nested_level = 0; + +/* + * Current nesting depth of planner calls + */ +static int plan_nested_level = 0; + +/* + * Start time of the begining of the trace + */ +static TimestampTz start_query_ts; +static instr_time 
start_query; + +static void pg_tracing_shmem_request(void); +static void pg_tracing_shmem_startup(void); + +/* Saved hook values in case of unload */ +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL; +static planner_hook_type prev_planner_hook = NULL; +static ExecutorStart_hook_type prev_ExecutorStart = NULL; +static ExecutorRun_hook_type prev_ExecutorRun = NULL; +static ExecutorFinish_hook_type prev_ExecutorFinish = NULL; +static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; +static ProcessUtility_hook_type prev_ProcessUtility = NULL; + +static void pg_tracing_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate); +static PlannedStmt *pg_tracing_planner_hook(Query *parse, + const char *query_string, + int cursorOptions, + ParamListInfo boundParams); +static void pg_tracing_ExecutorStart(QueryDesc *queryDesc, int eflags); +static void pg_tracing_ExecutorRun(QueryDesc *queryDesc, + ScanDirection direction, + uint64 count, bool execute_once); +static void pg_tracing_ExecutorFinish(QueryDesc *queryDesc); +static void pg_tracing_ExecutorEnd(QueryDesc *queryDesc); +static void pg_tracing_ProcessUtility(PlannedStmt *pstmt, const char *queryString, + bool readOnlyTree, + ProcessUtilityContext context, ParamListInfo params, + QueryEnvironment *queryEnv, + DestReceiver *dest, QueryCompletion *qc); + +static void generate_member_nodes(PlanState **planstates, int nplans, + uint64 trace_id, uint64 parent_id, int sql_error_code); +static void generate_span_from_planstate(PlanState *planstate, + uint64 trace_id, uint64 parent_id, int sql_error_code); + +static MemoryContext pg_tracing_mem_ctx; + +#define PG_TRACING_INFO_COLS 5 +#define PG_TRACING_TRACES_COLS 39 + + +/* + * Module load callback + */ +void +_PG_init(void) +{ + if (!process_shared_preload_libraries_in_progress) + return; + + 
DefineCustomIntVariable("pg_tracing.max_span", + "Maximum number of spans stored in shared memory.", + NULL, + &pg_tracing_max_span, + 1000, + 0, + 20000, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("pg_tracing.max_parameter_size", + "Maximum size of parameters. -1 to disable parameter in query span.", + NULL, + &pg_tracing_max_parameter_str, + 1024, + 0, + 10000, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + + DefineCustomEnumVariable("pg_tracing.track", + "Selects which statements are tracked by pg_tracing.", + NULL, + &pg_tracing_track, + PG_TRACING_TRACK_ALL, + track_options, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + + DefineCustomRealVariable("pg_tracing.sample_rate", + "Fraction of queries to process.", + NULL, + &pg_tracing_sample_rate, + 1.0, + 0.0, + 1.0, + PGC_SUSET, + 0, + NULL, + NULL, + NULL); + + MarkGUCPrefixReserved("pg_tracing"); + + /* For jumble state */ + EnableQueryId(); + + /* Install hooks. */ + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = pg_tracing_shmem_request; + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = pg_tracing_shmem_startup; + + prev_post_parse_analyze_hook = post_parse_analyze_hook; + post_parse_analyze_hook = pg_tracing_post_parse_analyze; + + prev_planner_hook = planner_hook; + planner_hook = pg_tracing_planner_hook; + + prev_ExecutorStart = ExecutorStart_hook; + ExecutorStart_hook = pg_tracing_ExecutorStart; + prev_ExecutorRun = ExecutorRun_hook; + ExecutorRun_hook = pg_tracing_ExecutorRun; + prev_ExecutorFinish = ExecutorFinish_hook; + ExecutorFinish_hook = pg_tracing_ExecutorFinish; + prev_ExecutorEnd = ExecutorEnd_hook; + ExecutorEnd_hook = pg_tracing_ExecutorEnd; + + prev_ProcessUtility = ProcessUtility_hook; + ProcessUtility_hook = pg_tracing_ProcessUtility; +} + +/* + * shmem_request hook: request additional shared resources. We'll allocate + * or attach to the shared resources in pgss_shmem_startup(). 
+ */ +static void +pg_tracing_shmem_request(void) +{ + Size memsize; + + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + memsize = MAXALIGN(sizeof(pgTracingSharedState)); + RequestAddinShmemSpace(memsize); +} + +/* + * shmem_startup hook: allocate or attach to shared memory, Also create and + * load the query-texts file, which is expected to exist (even if empty) + * while the module is enabled. + */ +static void +pg_tracing_shmem_startup(void) +{ + bool found; + + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + /* reset in case this is a restart within the postmaster */ + pg_tracing = NULL; + + /* + * Create or attach to the shared memory state + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + pg_tracing = ShmemInitStruct("pg_tracing", sizeof(pgTracingSharedState), &found); + shared_spans = ShmemInitStruct("pg_tracing_spans", + sizeof(pgTracingSharedSpans) + pg_tracing_max_span * sizeof(Span), + &found); + + /* + * Initialise pg_tracing memory context + */ + pg_tracing_mem_ctx = AllocSetContextCreate(TopMemoryContext, + "pg_tracing memory context", + ALLOCSET_DEFAULT_SIZES); + + /* + * First time, let's init shared state + */ + if (!found) + { + shared_spans->end = 0; + pg_tracing->stats.traces = 0; + pg_tracing->stats.stats_reset = GetCurrentTimestamp(); + SpinLockInit(&pg_tracing->mutex); + SpinLockInit(&pg_tracing->file_mutex); + } + + LWLockRelease(AddinShmemInitLock); + + /* + * Done if some other process already completed our initialization. 
+ */
+	if (found)
+		return;
+}
+
+/*
+ * Append a span to the shared spans buffer.
+ *
+ * If the buffer is full, drop the span, bump the dropped_spans statistic and
+ * stop sampling for the ongoing trace: no further span could be stored
+ * anyway, so there is no point in keeping instrumentation running.
+ *
+ * Called with no lock held; takes the stats spinlock internally.
+ */
+static void
+add_span_to_shared_buffer(Span * span)
+{
+	volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing;
+
+	SpinLockAcquire(&s->mutex);
+	if (shared_spans->end >= pg_tracing_max_span)
+	{
+		/*
+		 * The shared array was sized for pg_tracing_max_span entries, so it
+		 * is full only once 'end' reaches that count.  (The previous
+		 * "end + 1 == max" test wasted the last slot.)
+		 */
+		s->stats.dropped_spans++;
+
+		/*
+		 * Shared memory is full, no need to keep sampling running
+		 */
+		traceparent_parameter.sampled = false;
+	}
+	else
+	{
+		s->stats.spans++;
+		shared_spans->spans[shared_spans->end++] = *span;
+	}
+	SpinLockRelease(&s->mutex);
+}
+
+/*
+ * Close a span and add it to the shared memory buffer.
+ *
+ * end_time is used to compute the span's duration before copying it; the
+ * copy may be dropped if the shared buffer is full.
+ */
+static void
+add_span(Span * span, const instr_time *end_time)
+{
+	set_span_duration_and_counters(span, end_time);
+	add_span_to_shared_buffer(span);
+}
+
+/*
+ * Check if we still have available space in the shared spans.
+ *
+ * Between the moment we check and the moment we want to insert, the buffer
+ * may be full but we redo a check before appending the span. This is done
+ * early when starting a query span to bail out early if the buffer is
+ * already full since we don't immediately add the span in the shared buffer.
+ */ +static bool +check_full_shared_spans() +{ + if (shared_spans->end + 1 == pg_tracing_max_span) + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->mutex); + s->stats.dropped_spans++; + SpinLockRelease(&s->mutex); + return true; + } + return false; +} + +/* + * Process a query descriptor: Gather all query instrumentation in the query + * span counters and generate span nodes from queryDesc planstate + */ +static void +process_query_desc(QueryDesc *queryDesc, int sql_error_code) +{ + MemoryContext oldcxt; + uint64 parent_id = executor_ids[exec_nested_level]; + NodeCounters *node_counters = &query_spans[exec_nested_level].node_counters; + + oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt); + + /* Process total counters */ + if (queryDesc->totaltime) + { + InstrEndLoop(queryDesc->totaltime); + node_counters->buffer_usage = queryDesc->totaltime->bufusage; + node_counters->wal_usage = queryDesc->totaltime->walusage; + } + if (queryDesc->planstate) + { + generate_span_from_planstate(queryDesc->planstate, traceparent_parameter.trace_id, + parent_id, sql_error_code); + } + node_counters->rows = queryDesc->estate->es_total_processed; + if (queryDesc->estate->es_jit) + { + node_counters->jit_usage = queryDesc->estate->es_jit->instr; + } + MemoryContextSwitchTo(oldcxt); +} + +/* + * When we catch an error (timeout, cancel query), we need to flag the ongoing + * span with an error, send current spans in the shared buffer and clean + * our memory context + */ +static void +handle_pg_error(Span * span, QueryDesc *queryDesc) +{ + instr_time end; + int sql_error_code; + + /* + * If we're not sampling the query, bail out + */ + if (traceparent_parameter.sampled == 0 && !pg_tracing_enabled(exec_nested_level)) + { + return; + } + sql_error_code = geterrcode(); + span->sql_error_code = sql_error_code; + query_spans[exec_nested_level].sql_error_code = sql_error_code; + + /* + * Order matters there. 
We want to process query desc before getting the + * end time otherwise, the span nodes will have a higher duration than + * their parents + */ + if (queryDesc != NULL) + { + process_query_desc(queryDesc, sql_error_code); + } + INSTR_TIME_SET_CURRENT(end); + + add_span(span, &end); + + /* + * We 're at the end, add all spans to the shared memory + */ + if (exec_nested_level == 0) + { + for (int i = max_nested_level; i >= 0; i--) + { + add_span(&query_spans[i], &end); + } + } + + /* + * We can reset the memory context here + */ + MemoryContextReset(pg_tracing_mem_ctx); +} + +/* + * If we're at the end of the query, dump all of query spans in the shared + * memory. + */ +static void +end_tracing(instr_time end) +{ + /* + * We 're at the end, add all spans to the shared memory + */ + if (exec_nested_level == 0) + { + for (int i = max_nested_level; i >= 0; i--) + { + add_span(&query_spans[i], &end); + } + + /* + * We can reset the memory context here + */ + MemoryContextReset(pg_tracing_mem_ctx); + } +} + +/* + * Get the parent id for the given nested level + */ +static uint64 +get_parent_id(int nested_level) +{ + if (nested_level < 0) + { + return traceparent_parameter.parent_id; + } + Assert(nested_level <= allocated_nested_level); + return query_spans[nested_level].span_id; +} + +/* + * Start a new query span if we enter a new nested level The start of a query + * span can vary: prepared statement will skip parsing, the use of cached + * plans will skip the planner hook. + * Thus, a query span can start in either post parse, planner hook or executor run. + * + * Since this is called after the we've detected the start of a trace, we check for available + * space in the buffer. If the buffer is full, we abort tracing by setting sampled to false. + * Callers need to check that tracing was aborted. 
+ */ +static void +start_query_span(CmdType commandType, const Query *query, const JumbleState *jstate, + const char *query_text) +{ + Span *current_top; + int query_len; + const char *normalised_query; + + /* + * Check if we've already created a query span for this nested level + */ + if (exec_nested_level <= max_nested_level) + { + return; + } + + if (check_full_shared_spans()) + { + /* Buffer is full, abort sampling */ + traceparent_parameter.sampled = false; + return; + } + + /* First time */ + if (max_nested_level == -1) + { + MemoryContext oldcxt; + + Assert(pg_tracing_mem_ctx->isReset); + + /* + * We need to be able to pass 2 informations that depend on the nested + * level: - executor span ids: Since an executor run becomes the + * parent span, we need subsequent created node spans to have the + * correct parent - query spans: We create one query span per nested + * level and those are only inserted in the shared buffer at the end + */ + oldcxt = MemoryContextSwitchTo(pg_tracing_mem_ctx); + /* initial allocation */ + allocated_nested_level = 1; + executor_ids = palloc0(allocated_nested_level * sizeof(uint64)); + query_spans = palloc0(allocated_nested_level * sizeof(Span)); + MemoryContextSwitchTo(oldcxt); + + /* Start referential timers */ + INSTR_TIME_SET_CURRENT(start_query); + start_query_ts = GetCurrentTimestamp(); + } + else if (exec_nested_level >= allocated_nested_level) + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(pg_tracing_mem_ctx); + allocated_nested_level += 2; + executor_ids = repalloc(executor_ids, allocated_nested_level * sizeof(uint64)); + query_spans = repalloc(query_spans, allocated_nested_level * sizeof(Span)); + MemoryContextSwitchTo(oldcxt); + } + + max_nested_level = exec_nested_level; + if (exec_nested_level == 0) + { + /* Start of a new trace */ + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->mutex); + s->stats.traces++; + SpinLockRelease(&s->mutex); + } + + 
current_top = query_spans + exec_nested_level; + begin_span(current_top, command_type_to_span_type(commandType), + traceparent_parameter.trace_id, get_parent_id(exec_nested_level - 1), + start_query_ts, start_query, NULL); + if (jstate && jstate->clocations_count > 0 && query != NULL) + { + char *paramStr; + + query_len = query->stmt_len; + Assert(query_len > 0); + normalised_query = normalise_query_parameters(jstate, query_text, + query->stmt_location, + &query_len, ¶mStr); + text_store(pg_tracing, paramStr, strlen(paramStr), ¤t_top->parameter_offset); + } + else + { + /* + * No jstate available + */ + query_len = strlen(query_text); + normalised_query = normalise_query(query_text, &query_len); + } + text_store(pg_tracing, normalised_query, query_len, + ¤t_top->operation_name_offset); +} + +/* + * Post-parse-analysis hook: Extract traceparent parameters from the SQLCommenter + * at the start of the query. If we detect a sampled query then start the span. + */ +static void +pg_tracing_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) +{ + Span parse_span; + + if (prev_post_parse_analyze_hook) + prev_post_parse_analyze_hook(pstate, query, jstate); + + /* Safety check... 
*/ + if (!pg_tracing || !pg_tracing_enabled(exec_nested_level)) + return; + + if (exec_nested_level == 0) + { + max_nested_level = -1; + traceparent_parameter = extract_traceparent(pstate->p_sourcetext, false); + if (traceparent_parameter.sampled == 1 && pg_tracing_sample_rate < 1.0) + { + /* + * We've parsed a query with sampled, let's apply our sample rate + */ + traceparent_parameter.sampled = (pg_prng_double(&pg_global_prng_state) < pg_tracing_sample_rate); + } + } + + if (traceparent_parameter.sampled == 0) + { + /* + * Either there was no SQLCommenter or the query had sampled=00 + */ + return; + } + + /* + * Either we're inside a nested sampled query or we've parsed a query with + * the sampled flag, start a new query span + */ + start_query_span(query->commandType, query, jstate, + pstate->p_sourcetext); + + /* + * We can deduct the time spent parsing by relying on stmtStartTimestamp + */ + initialize_span_fields(&parse_span, SPAN_PARSE, traceparent_parameter.trace_id, + traceparent_parameter.parent_id); + parse_span.start = GetCurrentStatementStartTimestamp(); + parse_span.duration.ticks = (start_query_ts - parse_span.start) * NS_PER_US; + add_span_to_shared_buffer(&parse_span); +} + +/* + * If the query was started as an prepared statement, we won't be able to + * extract traceparent during query parsing since parsing was skipped + * We assume that SQLCommenter content can be passed as a text in the + * first parameter + */ +static void +check_traceparameter_in_parameter(ParamListInfo boundParams) +{ + if (traceparent_parameter.sampled == 0 && pg_tracing_enabled(plan_nested_level + exec_nested_level)) + { + if (boundParams && boundParams->numParams > 0) + { + Oid typoutput; + bool typisvarlena; + char *pstring; + ParamExternData param; + + param = boundParams->params[0]; + if (param.ptype == TEXTOID) + { + getTypeOutputInfo(param.ptype, &typoutput, &typisvarlena); + pstring = OidOutputFunctionCall(typoutput, param.value); + traceparent_parameter = 
extract_traceparent(pstring, true); + } + } + } +} + +/* + * Planner hook: forward to regular planner, but measure planning time if + * needed. + */ +static PlannedStmt * +pg_tracing_planner_hook(Query *parse, + const char *query_string, + int cursorOptions, + ParamListInfo boundParams) +{ + PlannedStmt *result; + Span span; + Span *current_top; + + check_traceparameter_in_parameter(boundParams); + if (traceparent_parameter.sampled > 0 && pg_tracing_enabled(plan_nested_level + exec_nested_level)) + { + /* + * We may have skipped parsing if statement was prepared, start a new + * query span if we don't have one + */ + start_query_span(parse->commandType, parse, NULL, query_string); + + /* + * Recheck for sampled as starting query span may have failed due to + * full buffer + */ + if (traceparent_parameter.sampled == false) + { + goto fallback; + } + current_top = query_spans + exec_nested_level; + + begin_span(&span, SPAN_PLANNER, traceparent_parameter.trace_id, + get_parent_id(exec_nested_level), + start_query_ts, start_query, NULL); + + plan_nested_level++; + PG_TRY(); + { + if (prev_planner_hook) + result = prev_planner_hook(parse, query_string, cursorOptions, + boundParams); + else + result = standard_planner(parse, query_string, cursorOptions, + boundParams); + } + PG_CATCH(); + { + plan_nested_level--; + handle_pg_error(&span, NULL); + PG_RE_THROW(); + } + PG_END_TRY(); + plan_nested_level--; + + add_span(&span, NULL); + + /* + * If we have a prepared statement, add bound parameters to the query + * span + */ + if (boundParams != NULL) + { + char *paramStr = BuildParamLogString(boundParams, + NULL, pg_tracing_max_parameter_str); + + Assert(current_top->parameter_offset == -1); + if (paramStr != NULL) + { + text_store(pg_tracing, paramStr, strlen(paramStr), ¤t_top->parameter_offset); + } + } + } + else + { +fallback: + if (prev_planner_hook) + result = prev_planner_hook(parse, query_string, cursorOptions, + boundParams); + else + result = 
standard_planner(parse, query_string, cursorOptions, + boundParams); + } + + return result; +} + + +/* + * ExecutorStart hook: start up logging if needed + */ +static void +pg_tracing_ExecutorStart(QueryDesc *queryDesc, int eflags) +{ + Span span; + + check_traceparameter_in_parameter(queryDesc->params); + if (traceparent_parameter.sampled > 0 && pg_tracing_enabled(exec_nested_level)) + { + /* + * In case of a cached plan, we haven't gone through neither parsing + * nor planner hook. + */ + start_query_span(queryDesc->operation, NULL, NULL, queryDesc->sourceText); + + begin_span(&span, SPAN_EXECUTOR_START, traceparent_parameter.trace_id, get_parent_id(exec_nested_level), + start_query_ts, start_query, NULL); + + /* + * Activate query instrumentation to get timing, rows, buffers and WAL + * usage + */ + queryDesc->instrument_options = INSTRUMENT_ALL; + } + + if (prev_ExecutorStart) + prev_ExecutorStart(queryDesc, eflags); + else + standard_ExecutorStart(queryDesc, eflags); + + if (traceparent_parameter.sampled > 0 && pg_tracing_enabled(exec_nested_level)) + { + /* Allocate instrumentation */ + if (queryDesc->totaltime == NULL) + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt); + queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); + queryDesc->planstate->instrument = InstrAlloc(1, INSTRUMENT_ALL, false); + MemoryContextSwitchTo(oldcxt); + } + + add_span(&span, NULL); + } +} + +/* + * ExecutorRun hook: all we need do is track nesting depth + * and create executor run span + */ +static void +pg_tracing_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, + uint64 count, bool execute_once) +{ + Span span; + bool tracing = traceparent_parameter.sampled != 0 && pg_tracing_enabled(exec_nested_level); + + if (tracing) + { + begin_span(&span, SPAN_EXECUTOR_RUN, traceparent_parameter.trace_id, get_parent_id(exec_nested_level), + start_query_ts, start_query, NULL); + + /* + * Executor run is used as the parent's 
span. + */ + executor_ids[exec_nested_level] = span.span_id; + } + + exec_nested_level++; + PG_TRY(); + { + if (prev_ExecutorRun) + prev_ExecutorRun(queryDesc, direction, count, execute_once); + else + standard_ExecutorRun(queryDesc, direction, count, execute_once); + } + PG_CATCH(); + { + exec_nested_level--; + handle_pg_error(&span, queryDesc); + PG_RE_THROW(); + } + PG_END_TRY(); + exec_nested_level--; + + if (tracing) + { + add_span(&span, NULL); + } +} + +/* + * ExecutorFinish hook: all we need do is track nesting depth + */ +static void +pg_tracing_ExecutorFinish(QueryDesc *queryDesc) +{ + Span span; + + if (traceparent_parameter.sampled != 0) + { + begin_span(&span, SPAN_EXECUTOR_FINISH, traceparent_parameter.trace_id, get_parent_id(exec_nested_level), + start_query_ts, start_query, NULL); + } + + exec_nested_level++; + PG_TRY(); + { + if (prev_ExecutorFinish) + prev_ExecutorFinish(queryDesc); + else + standard_ExecutorFinish(queryDesc); + } + PG_CATCH(); + { + exec_nested_level--; + handle_pg_error(&span, queryDesc); + PG_RE_THROW(); + } + PG_END_TRY(); + exec_nested_level--; + + if (traceparent_parameter.sampled != 0) + { + add_span(&span, NULL); + } +} + +/* + * ExecutorEnd hook: process queryDesc and end tracing if we're at nested + * level 0 + */ +static void +pg_tracing_ExecutorEnd(QueryDesc *queryDesc) +{ + Span span; + + if (traceparent_parameter.sampled > 0 && pg_tracing_enabled(exec_nested_level)) + { + begin_span(&span, SPAN_EXECUTOR_END, traceparent_parameter.trace_id, + get_parent_id(exec_nested_level), + start_query_ts, start_query, NULL); + + /* + * Query finished normally, send 0 as error code + */ + process_query_desc(queryDesc, 0); + } + + if (prev_ExecutorEnd) + prev_ExecutorEnd(queryDesc); + else + standard_ExecutorEnd(queryDesc); + + if (traceparent_parameter.sampled > 0) + { + instr_time end; + + INSTR_TIME_SET_CURRENT(end); + if (pg_tracing_enabled(exec_nested_level)) + { + add_span(&span, &end); + } + end_tracing(end); + } +} + +/* + 
* ProcessUtility hook + */ +static void +pg_tracing_ProcessUtility(PlannedStmt *pstmt, const char *queryString, + bool readOnlyTree, + ProcessUtilityContext context, + ParamListInfo params, QueryEnvironment *queryEnv, + DestReceiver *dest, QueryCompletion *qc) +{ + Span span; + instr_time end; + + if (traceparent_parameter.sampled > 0 && pg_tracing_enabled(exec_nested_level)) + { + begin_span(&span, SPAN_PROCESS_UTILITY, traceparent_parameter.trace_id, + get_parent_id(exec_nested_level), start_query_ts, start_query, NULL); + + exec_nested_level++; + + PG_TRY(); + { + if (prev_ProcessUtility) + prev_ProcessUtility(pstmt, queryString, readOnlyTree, + context, params, queryEnv, + dest, qc); + else + standard_ProcessUtility(pstmt, queryString, readOnlyTree, + context, params, queryEnv, + dest, qc); + } + PG_CATCH(); + { + exec_nested_level--; + handle_pg_error(&span, NULL); + PG_RE_THROW(); + } + PG_END_TRY(); + exec_nested_level--; + + /* TODO Get command tag? */ + span.node_counters.rows = qc->nprocessed; + + INSTR_TIME_SET_CURRENT(end); + add_span(&span, &end); + end_tracing(end); + } + else + { + if (prev_ProcessUtility) + prev_ProcessUtility(pstmt, queryString, readOnlyTree, + context, params, queryEnv, + dest, qc); + else + standard_ProcessUtility(pstmt, queryString, readOnlyTree, + context, params, queryEnv, + dest, qc); + } +} + +/* + * Walk through the planstate tree generating a node span for each node. 
+ * We pass possible error code to tag unfinished node with it + */ +static void +generate_span_from_planstate(PlanState *planstate, uint64 trace_id, uint64 parent_id, int sql_error_code) +{ + Span span; + ListCell *l; + char const *span_name; + char const *operation_name; + Plan const *plan = planstate->plan; + instr_time node_duration; + + InstrEndLoop(planstate->instrument); + + begin_span(&span, SPAN_NODE, trace_id, parent_id, + start_query_ts, start_query, &planstate->instrument->firsttime); + /* first tuple time */ + span.startup = planstate->instrument->startup * NS_PER_S; + + /* + * If we have a Result node, make it the span parent of the next query + * span if we have any + */ + if (nodeTag(plan) == T_Result && exec_nested_level < max_nested_level) + { + query_spans[exec_nested_level + 1].parent_id = span.span_id; + } + + /* + * Generate names and store them + */ + span_name = plan_to_span_name(plan); + operation_name = plan_to_operation(planstate, span_name); + + text_store(pg_tracing, span_name, strlen(span_name), &span.name_offset); + text_store(pg_tracing, operation_name, strlen(operation_name), &span.operation_name_offset); + + span.node_counters.rows = (int64) planstate->instrument->ntuples / planstate->instrument->nloops; + span.node_counters.nloops = (int64) planstate->instrument->nloops; + span.node_counters.buffer_usage = planstate->instrument->bufusage; + span.node_counters.wal_usage = planstate->instrument->walusage; + + span.plan_counters.startup_cost = plan->startup_cost; + span.plan_counters.total_cost = plan->total_cost; + span.plan_counters.plan_rows = plan->plan_rows; + span.plan_counters.plan_width = plan->plan_width; + + if (!planstate->state->es_finished) + { + /* + * We're processing this node in an error handler, stop the node + * instrumentation to get it's current state + */ + InstrStopNode(planstate->instrument, planstate->state->es_processed); + InstrEndLoop(planstate->instrument); + span.sql_error_code = sql_error_code; + } + 
node_duration = planstate->instrument->firsttime; + Assert(planstate->instrument->total > 0); + node_duration.ticks += planstate->instrument->total * NS_PER_S; + add_span(&span, &node_duration); + + /* + * Walk the planstate tree + */ + if (planstate->lefttree) + { + generate_span_from_planstate(planstate->lefttree, trace_id, span.span_id, sql_error_code); + } + if (planstate->righttree) + { + generate_span_from_planstate(planstate->righttree, trace_id, span.span_id, sql_error_code); + } + + /* + * Handle init plans and subplans + */ + foreach(l, planstate->initPlan) + { + SubPlanState *sstate = (SubPlanState *) lfirst(l); + PlanState *splan = sstate->planstate; + + generate_span_from_planstate(splan, trace_id, span.span_id, sql_error_code); + } + + foreach(l, planstate->subPlan) + { + SubPlanState *sstate = (SubPlanState *) lfirst(l); + PlanState *splan = sstate->planstate; + + generate_span_from_planstate(splan, trace_id, span.span_id, sql_error_code); + } + + /* + * Handle special nodes with children nodes + */ + switch (nodeTag(plan)) + { + case T_Append: + generate_member_nodes(((AppendState *) planstate)->appendplans, + ((AppendState *) planstate)->as_nplans, trace_id, span.span_id, sql_error_code); + break; + case T_MergeAppend: + generate_member_nodes(((MergeAppendState *) planstate)->mergeplans, + ((MergeAppendState *) planstate)->ms_nplans, trace_id, span.span_id, sql_error_code); + break; + case T_BitmapAnd: + generate_member_nodes(((BitmapAndState *) planstate)->bitmapplans, + ((BitmapAndState *) planstate)->nplans, trace_id, span.span_id, sql_error_code); + break; + case T_BitmapOr: + generate_member_nodes(((BitmapOrState *) planstate)->bitmapplans, + ((BitmapOrState *) planstate)->nplans, trace_id, span.span_id, sql_error_code); + break; + case T_SubqueryScan: + generate_span_from_planstate(((SubqueryScanState *) planstate)->subplan, trace_id, span.span_id, sql_error_code); + break; + /* case T_CustomScan: */ + /* 
ExplainCustomChildren((CustomScanState *) planstate, */ + /* ancestors, es); */ + /* break; */ + default: + break; + } +} + +static void +generate_member_nodes(PlanState **planstates, int nplans, uint64 trace_id, uint64 parent_id, int sql_error_code) +{ + int j; + + for (j = 0; j < nplans; j++) + generate_span_from_planstate(planstates[j], trace_id, parent_id, sql_error_code); +} + +static int +add_plan_counters(const PlanCounters * plan_counters, int i, Datum *values) +{ + values[i++] = Float8GetDatumFast(plan_counters->startup_cost); + values[i++] = Float8GetDatumFast(plan_counters->total_cost); + values[i++] = Float8GetDatumFast(plan_counters->plan_rows); + values[i++] = Int32GetDatum(plan_counters->plan_width); + return i; +} + +static int +add_node_counters(const NodeCounters * node_counters, int i, Datum *values) +{ + Datum wal_bytes; + char buf[256]; + + values[i++] = Int64GetDatumFast(node_counters->rows); + values[i++] = Int64GetDatumFast(node_counters->nloops); + + /* Buffer usage */ + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.shared_blks_hit); + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.shared_blks_read); + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.shared_blks_dirtied); + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.shared_blks_written); + + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.local_blks_hit); + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.local_blks_read); + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.local_blks_dirtied); + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.local_blks_written); + + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->buffer_usage.blk_read_time)); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->buffer_usage.blk_write_time)); + + values[i++] = Int64GetDatumFast(node_counters->buffer_usage.temp_blks_read); + values[i++] = 
Int64GetDatumFast(node_counters->buffer_usage.temp_blks_written); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->buffer_usage.temp_blk_read_time)); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->buffer_usage.temp_blk_write_time)); + + /* WAL usage */ + values[i++] = Int64GetDatumFast(node_counters->wal_usage.wal_records); + values[i++] = Int64GetDatumFast(node_counters->wal_usage.wal_fpi); + + snprintf(buf, sizeof buf, UINT64_FORMAT, node_counters->wal_usage.wal_bytes); + /* Convert to numeric. */ + wal_bytes = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + values[i++] = wal_bytes; + + /* JIT usage */ + values[i++] = Int8GetDatum(node_counters->jit_usage.created_functions); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->jit_usage.generation_counter)); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->jit_usage.inlining_counter)); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->jit_usage.optimization_counter)); + values[i++] = Float8GetDatumFast(INSTR_TIME_GET_MILLISEC(node_counters->jit_usage.emission_counter)); + + return i; +} + +static void +add_result_span(ReturnSetInfo *rsinfo, Span * span, + const char *qbuffer, Size qbuffer_size) +{ + Datum values[PG_TRACING_TRACES_COLS] = {0}; + bool nulls[PG_TRACING_TRACES_COLS] = {0}; + + int i = 0; + + values[i++] = Int64GetDatum(span->trace_id); + values[i++] = Int64GetDatum(span->parent_id); + values[i++] = Int64GetDatum(span->span_id); + values[i++] = CStringGetTextDatum(get_span_name(span, qbuffer)); + values[i++] = CStringGetTextDatum(get_operation_name(span, qbuffer)); + values[i++] = Int64GetDatum(span->start); + + values[i++] = Int64GetDatum(INSTR_TIME_GET_NANOSEC(span->duration)); + values[i++] = CStringGetTextDatum(unpack_sql_state(span->sql_error_code)); + values[i++] = UInt32GetDatum(span->be_pid); + + if ((span->type >= 
SPAN_NODE && span->type <= SPAN_NODE_UNKNOWN) + || span->type == SPAN_PLANNER) + { + i = add_plan_counters(&span->plan_counters, i, values); + i = add_node_counters(&span->node_counters, i, values); + + values[i++] = Int64GetDatum(span->startup); + if (span->parameter_offset != -1) + { + values[i++] = CStringGetTextDatum(qbuffer + span->parameter_offset); + } + } + + for (int j = i; j < PG_TRACING_TRACES_COLS; j++) + { + nulls[j] = 1; + } + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); +} + +/* + * Return spans as a result set. + * + * Accept a consume parameter. When consume is set, + * we empty the shared buffer and query text. + */ +Datum +pg_tracing_spans(PG_FUNCTION_ARGS) +{ + bool consume; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Span *span; + const char *qbuffer; + Size qbuffer_size; + + consume = PG_GETARG_BOOL(0); + if (!pg_tracing) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("pg_tracing must be loaded via shared_preload_libraries"))); + InitMaterializedSRF(fcinfo, 0); + + qbuffer = qtext_load_file(&qbuffer_size); + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->mutex); + for (int i = 0; i < shared_spans->end; i++) + { + span = shared_spans->spans + i; + add_result_span(rsinfo, span, qbuffer, qbuffer_size); + } + + /* + * Consume is set, remove spans from the shared buffer and reset query + * file + */ + if (consume) + { + shared_spans->end = 0; + s->extent = 0; + } + SpinLockRelease(&s->mutex); + } + return (Datum) 0; +} + +/* + * Return statistics of pg_tracing. 
+ */ +Datum +pg_tracing_info(PG_FUNCTION_ARGS) +{ + pgTracingGlobalStats stats; + TupleDesc tupdesc; + Datum values[PG_TRACING_INFO_COLS] = {0}; + bool nulls[PG_TRACING_INFO_COLS] = {0}; + int i = 0; + + if (!pg_tracing) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("pg_tracing must be loaded via shared_preload_libraries"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* Get a copy of the pg_tracing stats */ + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->mutex); + stats = s->stats; + SpinLockRelease(&s->mutex); + } + + values[i++] = Int64GetDatum(stats.traces); + values[i++] = Int64GetDatum(stats.spans); + values[i++] = Int64GetDatum(stats.dropped_spans); + values[i++] = TimestampTzGetDatum(stats.last_consume); + values[i++] = TimestampTzGetDatum(stats.stats_reset); + + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} + +/* + * Reset pg_tracing statistics. + */ +Datum +pg_tracing_reset(PG_FUNCTION_ARGS) +{ + /* + * Reset global statistics for pg_tracing since all entries are removed. 
+ */ + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + TimestampTz stats_reset = GetCurrentTimestamp(); + + SpinLockAcquire(&s->mutex); + s->stats.traces = 0; + s->stats.stats_reset = stats_reset; + SpinLockRelease(&s->mutex); + } + PG_RETURN_VOID(); +} diff --git a/contrib/pg_tracing/pg_tracing.conf b/contrib/pg_tracing/pg_tracing.conf new file mode 100644 index 0000000000..ad6b5e7c1d --- /dev/null +++ b/contrib/pg_tracing/pg_tracing.conf @@ -0,0 +1 @@ +shared_preload_libraries = 'pg_tracing' diff --git a/contrib/pg_tracing/pg_tracing.control b/contrib/pg_tracing/pg_tracing.control new file mode 100644 index 0000000000..efa78a6610 --- /dev/null +++ b/contrib/pg_tracing/pg_tracing.control @@ -0,0 +1,5 @@ +# pg_tracing extension +comment = 'Distributed tracing for postgres' +default_version = '1.0' +module_pathname = '$libdir/pg_tracing' +relocatable = true diff --git a/contrib/pg_tracing/pg_tracing.h b/contrib/pg_tracing/pg_tracing.h new file mode 100644 index 0000000000..9a9b6fda70 --- /dev/null +++ b/contrib/pg_tracing/pg_tracing.h @@ -0,0 +1,50 @@ +/*------------------------------------------------------------------------- + * pg_tracing.h + * + * IDENTIFICATION + * contrib/pg_tracing/pg_tracing.h + *------------------------------------------------------------------------- + */ +#ifndef _PG_TRACING_H_ +#define _PG_TRACING_H_ + +#include "postgres.h" +#include "span.h" + +#include "storage/s_lock.h" + +/* + * Global statistics for pg_tracing + */ +typedef struct pgTracingGlobalStats +{ + int64 traces; + int64 dropped_spans; + int64 spans; + TimestampTz last_consume; + TimestampTz stats_reset; +} pgTracingGlobalStats; + +/* + * Global shared state + */ +typedef struct pgTracingSharedState +{ + slock_t file_mutex; /* protects query file fields: */ + Size extent; /* current extent of query file */ + int n_writers; /* number of active writers to query file */ + slock_t mutex; + pgTracingGlobalStats stats; /* global statistics 
for pg_tracing */ +} pgTracingSharedState; + +/* + * Structure to store spans in shared memory. + * The size is fixed at startup. + */ +typedef struct pgTracingSharedSpans +{ + int end; /* Index of last element */ + Span spans[FLEXIBLE_ARRAY_MEMBER]; +} pgTracingSharedSpans; + +#endif diff --git a/contrib/pg_tracing/query_process.c b/contrib/pg_tracing/query_process.c new file mode 100644 index 0000000000..bffe9db9b6 --- /dev/null +++ b/contrib/pg_tracing/query_process.c @@ -0,0 +1,472 @@ +#include "query_process.h" +#include "pg_tracing.h" + +#include +#include +#include "parser/scanner.h" +#include "nodes/extensible.h" + +/* Location of external text file */ +#define PG_TRACING_TEXT_FILE PG_STAT_TMP_DIR "/pg_tracing.stat" + +/* + * Extract traceparent parameters from SQLCommenter + * + * We're expecting the query to start with a SQLComment containing the + * traceparent parameter + * "/\*traceparent='00-00000000000000000000000000000009-0000000000000005-01'*\/ + * SELECT 1;" + * Traceparent has the following format: version-traceid-parentid-sampled + * We also accept SQLCommenter as a parameter. In this case, we don't have + * the start and end of comments. 
+ *
+ */
+pgTracingTraceparentParameter
+extract_traceparent(const char *sqlcomment_str, bool is_parameter)
+{
+	const char *expected_start = "/*";
+	const char *traceparent;
+	const char *end_sqlcomment;
+	char	   *endptr;
+	pgTracingTraceparentParameter pgTracingTraceparentParameter;
+
+	/*
+	 * Initialize every field; an early return must yield a fully-zeroed
+	 * result.  (The previous code initialized sampled twice and left
+	 * parent_id indeterminate.)
+	 */
+	pgTracingTraceparentParameter.sampled = 0;
+	pgTracingTraceparentParameter.trace_id = 0;
+	pgTracingTraceparentParameter.parent_id = 0;
+
+	if (!is_parameter)
+	{
+		/*
+		 * Look for the start of a comment.  We're expecting the comment to
+		 * be at the very start of the query.
+		 */
+		for (size_t i = 0; i < strlen(expected_start); i++)
+		{
+			if (sqlcomment_str[i] != expected_start[i])
+			{
+				return pgTracingTraceparentParameter;
+			}
+		}
+
+		/*
+		 * Check that we have an end
+		 */
+		end_sqlcomment = strstr(sqlcomment_str, "*/");
+		if (end_sqlcomment == NULL)
+		{
+			return pgTracingTraceparentParameter;
+		}
+	}
+	else
+	{
+		end_sqlcomment = sqlcomment_str + strlen(sqlcomment_str);
+	}
+
+	/*
+	 * Locate traceparent parameter and make sure it has the expected size
+	 * "traceparent" + "=" + '' -> 13 characters
+	 * 00-00000000000000000000000000000009-0000000000000005-01 -> 55
+	 * characters
+	 */
+	traceparent = strstr(sqlcomment_str, "traceparent='");
+	if (traceparent == NULL || traceparent > end_sqlcomment || end_sqlcomment - traceparent < 55 + 13)
+	{
+		return pgTracingTraceparentParameter;
+	}
+
+	/*
+	 * Move to the start of the traceparent values
+	 */
+	traceparent = traceparent + 13;
+
+	/*
+	 * Check that '-' are at the expected places
+	 */
+	if (traceparent[2] != '-' || traceparent[35] != '-' || traceparent[52] != '-')
+	{
+		return pgTracingTraceparentParameter;
+	}
+
+	/*
+	 * Parse traceparent parameters.
+	 *
+	 * The trace id is 128 bits (32 hex characters) while trace_id only holds
+	 * 64 bits: validate that the high half is hexadecimal and parse the low
+	 * half.  TODO: store the full 128-bit id.
+	 *
+	 * strtou64 is used rather than strtol: the ids are unsigned 64-bit
+	 * values, and strtol would fail with ERANGE for any id with the high bit
+	 * set (and for every real 128-bit trace id when fed all 32 characters).
+	 */
+	for (int i = 3; i < 19; i++)
+	{
+		char		c = traceparent[i];
+
+		if (!((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')))
+		{
+			return pgTracingTraceparentParameter;
+		}
+	}
+	errno = 0;
+	pgTracingTraceparentParameter.trace_id = strtou64(&traceparent[19], &endptr, 16);
+	if (endptr != traceparent + 35 || errno)
+	{
+		return pgTracingTraceparentParameter;
+	}
+	pgTracingTraceparentParameter.parent_id = strtou64(&traceparent[36], &endptr, 16);
+	if (endptr != 
traceparent + 52 || errno) + { + return pgTracingTraceparentParameter; + } + pgTracingTraceparentParameter.sampled = strtol(&traceparent[53], &endptr, 16); + if (endptr != traceparent + 55 || errno) + { + /* + * Just to be sure, reset sampled on error + */ + pgTracingTraceparentParameter.sampled = 0; + } + return pgTracingTraceparentParameter; +} + + +/* + * comp_location: comparator for qsorting LocationLen structs by location + */ +static int +comp_location(const void *a, const void *b) +{ + int l = ((const LocationLen *) a)->location; + int r = ((const LocationLen *) b)->location; + + if (l < r) + return -1; + else if (l > r) + return +1; + else + return 0; +} + + +/* + * Normalise query and fill paramStr. + * Normalised query will separate tokens with a single space and + * parameters are replaced by $1, $2... + * Parameters are put in the paramStr wich will contain all parameters values + * using the format: "$1 = 0, $2 = 'v'" + */ +const char * +normalise_query_parameters(const JumbleState *jstate, const char *query, + int query_loc, int *query_len_p, char **paramStr) +{ + char *norm_query; + int query_len = *query_len_p; + int norm_query_buflen, /* Space allowed for norm_query */ + n_quer_loc = 0; + LocationLen *locs; + core_yyscan_t yyscanner; + core_yy_extra_type yyextra; + core_YYSTYPE yylval; + YYLTYPE yylloc; + int current_loc = 0; + StringInfoData buf; + + initStringInfo(&buf); + + norm_query_buflen = query_len + jstate->clocations_count * 10; + Assert(norm_query_buflen > 0); + locs = jstate->clocations; + + /* Allocate result buffer */ + norm_query = palloc(norm_query_buflen + 1); + + if (jstate->clocations_count > 1) + qsort(jstate->clocations, jstate->clocations_count, + sizeof(LocationLen), comp_location); + + /* initialize the flex scanner --- should match raw_parser() */ + yyscanner = scanner_init(query, + &yyextra, + &ScanKeywords, + ScanKeywordTokens); + + for (;;) + { + int loc = locs[current_loc].location; + int tok; + + tok = 
core_yylex(&yylval, &yylloc, yyscanner); + + /* + * We should not hit end-of-string, but if we do, behave sanely + */ + if (tok == 0) + break; /* out of inner for-loop */ + + /* + * We should find the token position exactly, but if we somehow run + * past it, work with that. + */ + if (current_loc < jstate->clocations_count && yylloc >= loc) + { + appendStringInfo(&buf, + "%s$%d = ", + current_loc > 0 ? ", " : "", + current_loc + 1); + if (query[loc] == '-') + { + /* + * It's a negative value - this is the one and only case where + * we replace more than a single token. + * + * Do not compensate for the core system's special-case + * adjustment of location to that of the leading '-' operator + * in the event of a negative constant. It is also useful for + * our purposes to start from the minus symbol. In this way, + * queries like "select * from foo where bar = 1" and "select * + * from foo where bar = -2" will have identical normalized + * query strings. + */ + appendStringInfoChar(&buf, '-'); + tok = core_yylex(&yylval, &yylloc, yyscanner); + if (tok == 0) + break; /* out of inner for-loop */ + } + if (yylloc > 0 && yyextra.scanbuf[yylloc - 1] == ' ' && n_quer_loc > 0) + { + norm_query[n_quer_loc++] = ' '; + } + + /* + * Append the current parameter $x in the normalised query + */ + n_quer_loc += sprintf(norm_query + n_quer_loc, "$%d", + current_loc + 1 + jstate->highest_extern_param_id); + + appendStringInfoString(&buf, yyextra.scanbuf + yylloc); + + current_loc++; + } + else + { + int to_copy; + + if (yylloc > 0 && yyextra.scanbuf[yylloc - 1] == ' ' && n_quer_loc > 0) + { + norm_query[n_quer_loc++] = ' '; + } + to_copy = strlen(yyextra.scanbuf + yylloc); + Assert(n_quer_loc + to_copy < norm_query_buflen + 1); + memcpy(norm_query + n_quer_loc, yyextra.scanbuf + yylloc, to_copy); + n_quer_loc += to_copy; + } + } + scanner_finish(yyscanner); + + *query_len_p = n_quer_loc; + norm_query[n_quer_loc] = '\0'; + *paramStr = buf.data; + return norm_query; +} + +const 
char * +normalise_query(const char *query, int *query_len_p) +{ + char *norm_query; + int query_len = *query_len_p; + int norm_query_buflen = query_len; + int n_quer_loc = 0; + core_yyscan_t yyscanner; + core_yy_extra_type yyextra; + core_YYSTYPE yylval; + YYLTYPE yylloc; + + /* Allocate result buffer */ + norm_query = palloc(norm_query_buflen + 1); + + /* initialize the flex scanner --- should match raw_parser() */ + yyscanner = scanner_init(query, &yyextra, &ScanKeywords, ScanKeywordTokens); + for (;;) + { + int tok; + int to_copy; + + tok = core_yylex(&yylval, &yylloc, yyscanner); + + if (tok == 0) + break; /* out of inner for-loop */ + + if (yylloc > 0 && yyextra.scanbuf[yylloc - 1] == ' ' && n_quer_loc > 0) + { + norm_query[n_quer_loc++] = ' '; + } + to_copy = strlen(yyextra.scanbuf + yylloc); + Assert(n_quer_loc + to_copy < norm_query_buflen + 1); + memcpy(norm_query + n_quer_loc, yyextra.scanbuf + yylloc, to_copy); + n_quer_loc += to_copy; + } + scanner_finish(yyscanner); + + *query_len_p = n_quer_loc; + norm_query[n_quer_loc] = '\0'; + return norm_query; +} + +bool +text_store(pgTracingSharedState * pg_tracing, const char *query, int query_len, + Size *query_offset) +{ + Size off; + int fd; + + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->file_mutex); + off = s->extent; + s->extent += query_len + 1; + s->n_writers++; + SpinLockRelease(&s->file_mutex); + } + + /* + * Don't allow the file to grow larger than what qtext_load_file can + * (theoretically) handle. This has been seen to be reachable on 32-bit + * platforms. 
+ */ + if (unlikely(query_len >= MaxAllocHugeSize - off)) + { + errno = EFBIG; /* not quite right, but it'll do */ + fd = -1; + goto error; + } + + /* Now write the data into the successfully-reserved part of the file */ + fd = OpenTransientFile(PG_TRACING_TEXT_FILE, O_RDWR | O_CREAT | PG_BINARY); + if (fd < 0) + goto error; + + if (pg_pwrite(fd, query, query_len, off) != query_len) + goto error; + if (pg_pwrite(fd, "\0", 1, off + query_len) != 1) + goto error; + + CloseTransientFile(fd); + + /* Mark our write complete */ + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->mutex); + s->n_writers--; + SpinLockRelease(&s->mutex); + } + + /* + * Set offset once write was succesful + */ + *query_offset = off; + + return true; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + PG_TRACING_TEXT_FILE))); + + if (fd >= 0) + CloseTransientFile(fd); + + /* Mark our write complete */ + { + volatile pgTracingSharedState *s = (volatile pgTracingSharedState *) pg_tracing; + + SpinLockAcquire(&s->mutex); + s->n_writers--; + SpinLockRelease(&s->mutex); + } + + return false; +} + +/* + * Read the external query text file into a malloc'd buffer. + * + * Returns NULL (without throwing an error) if unable to read, eg file not + * there or insufficient memory. + * + * On success, the buffer size is also returned into *buffer_size. + * + * This can be called without any lock on pgss->lock, but in that case the + * caller is responsible for verifying that the result is sane. 
+ */ +const char * +qtext_load_file(Size *buffer_size) +{ + char *buf; + int fd; + struct stat stat; + Size nread; + + fd = OpenTransientFile(PG_TRACING_TEXT_FILE, O_RDONLY | PG_BINARY); + if (fd < 0) + { + if (errno != ENOENT) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + PG_TRACING_TEXT_FILE))); + return NULL; + } + + /* Get file length */ + if (fstat(fd, &stat)) + { + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + PG_TRACING_TEXT_FILE))); + CloseTransientFile(fd); + return NULL; + } + + /* Allocate buffer; beware that off_t might be wider than size_t */ + if (stat.st_size <= MaxAllocHugeSize) + buf = (char *) malloc(stat.st_size); + else + buf = NULL; + if (buf == NULL) + { + ereport(LOG, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Could not allocate enough memory to read file \"%s\".", + PG_TRACING_TEXT_FILE))); + CloseTransientFile(fd); + return NULL; + } + + /* + * OK, slurp in the file. Windows fails if we try to read more than + * INT_MAX bytes at once, and other platforms might not like that either, + * so read a very large file in 1GB segments. + */ + nread = 0; + while (nread < stat.st_size) + { + int toread = Min(1024 * 1024 * 1024, stat.st_size - nread); + + /* + * If we get a short read and errno doesn't get set, the reason is + * probably that garbage collection truncated the file since we did + * the fstat(), so we don't log a complaint --- but we don't return + * the data, either, since it's most likely corrupt due to concurrent + * writes from garbage collection. 
+ */ + errno = 0; + if (read(fd, buf + nread, toread) != toread) + { + if (errno) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + PG_TRACING_TEXT_FILE))); + free(buf); + CloseTransientFile(fd); + return NULL; + } + nread += toread; + } + + if (CloseTransientFile(fd) != 0) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", PG_TRACING_TEXT_FILE))); + + *buffer_size = nread; + return buf; +} diff --git a/contrib/pg_tracing/query_process.h b/contrib/pg_tracing/query_process.h new file mode 100644 index 0000000000..0d62891e4e --- /dev/null +++ b/contrib/pg_tracing/query_process.h @@ -0,0 +1,32 @@ +#ifndef _QUERY_PROCESS_H_ +#define _QUERY_PROCESS_H_ + +#include "pg_tracing.h" +#include "nodes/queryjumble.h" +#include "parser/parse_node.h" + +typedef struct pgTracingTraceparentParameter +{ + uint64 trace_id; + uint64 parent_id; + int sampled; +} pgTracingTraceparentParameter; + +/* + * Normalise query: - Comments are removed - Constants are replaced by $x - + * All tokens are separated by a single space + */ +const char *normalise_query_parameters(const JumbleState *jstate, const char *query, + int query_loc, int *query_len_p, char **paramStr); + +pgTracingTraceparentParameter extract_traceparent(const char *query_str, bool is_parameter); + +/* + * Normalise simple query + */ +const char *normalise_query(const char *query, int *query_len_p); +bool text_store(pgTracingSharedState * pg_tracing, const char *query, + int query_len, Size *query_offset); +const char *qtext_load_file(Size *buffer_size); + +#endif diff --git a/contrib/pg_tracing/span.c b/contrib/pg_tracing/span.c new file mode 100644 index 0000000000..403785ac27 --- /dev/null +++ b/contrib/pg_tracing/span.c @@ -0,0 +1,249 @@ +#include "span.h" + +#include "nodes/extensible.h" +#include "common/pg_prng.h" + + +SpanType +command_type_to_span_type(CmdType cmd_type) +{ + switch (cmd_type) + { + case CMD_SELECT: + return SPAN_NODE_SELECT; 
+ case CMD_INSERT: + return SPAN_NODE_INSERT; + case CMD_UPDATE: + return SPAN_NODE_UPDATE; + case CMD_DELETE: + return SPAN_NODE_DELETE; + case CMD_MERGE: + return SPAN_NODE_MERGE; + case CMD_UTILITY: + return SPAN_NODE_UTILITY; + case CMD_NOTHING: + return SPAN_NODE_UTILITY; + default: + return SPAN_NODE_UNKNOWN; + } +} + +void +initialize_span_fields(Span * span, SpanType type, uint64 trace_id, uint64 parent_id) +{ + span->trace_id = trace_id; + span->type = type; + span->parent_id = parent_id; + span->span_id = pg_prng_uint64(&pg_global_prng_state); + span->name_offset = -1; + span->operation_name_offset = -1; + span->parameter_offset = -1; + span->sql_error_code = 0; + span->startup = 0; + span->be_pid = MyProcPid; + INSTR_TIME_SET_ZERO(span->duration); + memset(&span->node_counters, 0, sizeof(NodeCounters)); + memset(&span->plan_counters, 0, sizeof(PlanCounters)); + if (type == SPAN_PLANNER || span->type == SPAN_PROCESS_UTILITY) + { + /* + * Store the starting buffer and wal usage for planner and process + * utility spans + */ + span->node_counters.buffer_usage = pgBufferUsage; + span->node_counters.wal_usage = pgWalUsage; + } +} + +static +void +initialize_span_time(Span * span, + TimestampTz start_query_ts, + instr_time start_query_instr_time, + const instr_time *start_span_time) +{ + instr_time delta_start; + + /* + * If no start span is provided, get the current one + */ + if (start_span_time == NULL) + INSTR_TIME_SET_CURRENT(delta_start); + else + delta_start = *start_span_time; + + /* + * We use duration to temporarily store instr_time span start + */ + span->duration = delta_start; + + /* + * Compute the span timestamp start by using delta instr_time between + * start query and start span. + */ + INSTR_TIME_SUBTRACT(delta_start, start_query_instr_time); + span->start = start_query_ts + INSTR_TIME_GET_MICROSEC(delta_start); +} + +/* + * Initialize span and set span starting time. + * + * A span needs a start timestamp and a duration. 
+ * Given that spans can have a very short duration (less than 1ms), we + * need to rely on monotonic clock as much as possible to have the best precision. + * + * For that, after we established that a query was sampled, we get both the start + * timestamp and instr_time. All subsequent times will be taken instr_time and we + * deduct the starting time from the delta between the current instr_time and the + * start_instr_time. + * + * We provide multiple times: + * - start_query_ts is the earliest timestamp we've captured for the current trace. + * - start_query_instr_time is the instr_time taken at the same moment as the start_query_ts. + * - start_span_time is an optional start instr_time of the provided span. It will be used + * to get the start timestamp of the span. If nothing is provided, we use the current instr_time. + * This parameter is mostly used when generating spans from planstate as we need to rely on the + * query instrumentation to find the node start. + */ +void +begin_span(Span * span, SpanType type, uint64 trace_id, uint64 parent_id, + TimestampTz start_query_ts, + instr_time start_query_instr_time, + const instr_time *start_span_time) +{ + initialize_span_fields(span, type, trace_id, parent_id); + initialize_span_time(span, start_query_ts, start_query_instr_time, start_span_time); +} + +/* + * Set span duration and accumulated buffers + * end_time is optional, if NULL is passed, we use + * the current time + */ +void +set_span_duration_and_counters(Span * span, const instr_time *end_time) +{ + BufferUsage buffer_usage; + WalUsage wal_usage; + + /* + * We used span->duration as a temporary storage for the instr_time at the + * begining of the span, fetch it + */ + instr_time start_span = span->duration; + + Assert(span->trace_id > 0); + + /* + * Set span duration with the end time before substrating the start + */ + if (end_time == NULL) + { + INSTR_TIME_SET_CURRENT(span->duration); + } + else + { + span->duration = *end_time; + } + 
INSTR_TIME_SUBTRACT(span->duration, start_span);
+
+	if (span->type == SPAN_PLANNER || span->type == SPAN_PROCESS_UTILITY)
+	{
+		/*
+		 * node_counters currently holds the buffer/WAL snapshots taken when
+		 * the span began (see initialize_span_fields); replace them with the
+		 * deltas accumulated while the span was running.  The diffs were
+		 * previously computed into locals and then discarded, so the span
+		 * kept the start snapshots instead of the accumulated usage.
+		 */
+		/* calc differences of buffer counters. */
+		memset(&buffer_usage, 0, sizeof(BufferUsage));
+		BufferUsageAccumDiff(&buffer_usage, &pgBufferUsage,
+							 &span->node_counters.buffer_usage);
+		span->node_counters.buffer_usage = buffer_usage;
+		/* calc differences of WAL counters. */
+		memset(&wal_usage, 0, sizeof(WalUsage));
+		WalUsageAccumDiff(&wal_usage, &pgWalUsage,
+						  &span->node_counters.wal_usage);
+		span->node_counters.wal_usage = wal_usage;
+	}
+
+}
+
+/*
+ * Return the displayed name of a span.  For SPAN_NODE spans the name lives
+ * in the external text buffer qbuffer.
+ */
+const char *
+get_span_name(const Span * span, const char *qbuffer)
+{
+	switch (span->type)
+	{
+		case SPAN_PARSE:
+			return "Parse";
+		case SPAN_PLANNER:
+			return "Planner";
+		case SPAN_FUNCTION:
+			return "Function";
+		case SPAN_PROCESS_UTILITY:
+			return "ProcessUtility";
+		case SPAN_EXECUTOR_START:
+			return "Executor";
+		case SPAN_EXECUTOR_RUN:
+			return "Executor";
+		case SPAN_EXECUTOR_END:
+			return "Executor";
+		case SPAN_EXECUTOR_FINISH:
+			return "Executor";
+
+		case SPAN_NODE_SELECT:
+			return "Select";
+		case SPAN_NODE_INSERT:
+			return "Insert";
+		case SPAN_NODE_UPDATE:
+			return "Update";
+		case SPAN_NODE_DELETE:
+			return "Delete";
+		case SPAN_NODE_MERGE:
+			return "Merge";
+		case SPAN_NODE_UTILITY:
+			return "Utility";
+		case SPAN_NODE_NOTHING:
+			return "Nothing";
+		case SPAN_NODE_UNKNOWN:
+			return "Unknown";
+
+		case SPAN_NODE:
+			if (span->name_offset == -1)
+				return "Node";
+			return qbuffer + span->name_offset;
+		default:
+			elog(ERROR, "unexpected span->type");
+	}
+}
+
+/*
+ * Return the operation of a span ("Run", "Start"...).  For node spans the
+ * operation name lives in the external text buffer qbuffer.
+ */
+const char *
+get_operation_name(const Span * span, const char *qbuffer)
+{
+	switch (span->type)
+	{
+		case SPAN_PARSE:
+			return "Parse";
+		case SPAN_PLANNER:
+			return "Planner";
+		case SPAN_FUNCTION:
+			return "Function";
+		case SPAN_PROCESS_UTILITY:
+			return "ProcessUtility";
+		case SPAN_EXECUTOR_START:
+			return "Start";
+		case SPAN_EXECUTOR_RUN:
+			return "Run";
+		case SPAN_EXECUTOR_END:
+			return "End";
+		case SPAN_EXECUTOR_FINISH:
+			return "Finish";
+
+		case SPAN_NODE_SELECT:
+		case SPAN_NODE_INSERT:
+		case 
SPAN_NODE_UPDATE: + case SPAN_NODE_DELETE: + case SPAN_NODE_MERGE: + case SPAN_NODE_UTILITY: + case SPAN_NODE_NOTHING: + case SPAN_NODE_UNKNOWN: + case SPAN_NODE: + if (span->operation_name_offset == -1) + return "Node"; + return qbuffer + span->operation_name_offset; + default: + elog(ERROR, "unexpected span->type"); + } +} diff --git a/contrib/pg_tracing/span.h b/contrib/pg_tracing/span.h new file mode 100644 index 0000000000..f966c8d1df --- /dev/null +++ b/contrib/pg_tracing/span.h @@ -0,0 +1,131 @@ +/*------------------------------------------------------------------------- + * contrib/pg_tracing/span.h + * + * Header for span. + * + * IDENTIFICATION + * contrib/pg_tracing/span.h + * + *------------------------------------------------------------------------- + */ +#ifndef _SPAN_H_ +#define _SPAN_H_ + +#include "postgres.h" + +#include "jit/jit.h" +#include "pgstat.h" +#include "access/transam.h" + +/* + * SpanType: Type of the span + */ +typedef enum +{ + SPAN_PARSE, /* Wraps query parsing */ + SPAN_PLANNER, /* Wraps planner execution in planner hook */ + SPAN_FUNCTION, /* Wraps function in fmgr hook */ + SPAN_PROCESS_UTILITY, /* Wraps ProcessUtility execution */ + + SPAN_EXECUTOR_START, /* Executor Spans wrapping the matching hooks */ + SPAN_EXECUTOR_RUN, + SPAN_EXECUTOR_END, + SPAN_EXECUTOR_FINISH, + + SPAN_NODE, /* Represents a node execution, generated from + * planstate */ + + SPAN_NODE_SELECT, /* Query Span types. 
They are created from the + * query cmdType */ + SPAN_NODE_INSERT, + SPAN_NODE_UPDATE, + SPAN_NODE_DELETE, + SPAN_NODE_MERGE, + SPAN_NODE_UTILITY, + SPAN_NODE_NOTHING, + SPAN_NODE_UNKNOWN, +} SpanType; + + +/* + * Counters extracted from query instrumentation + */ +typedef struct NodeCounters +{ + int64 rows; /* # of tuples processed */ + int64 nloops; /* # of cycles for this node */ + + BufferUsage buffer_usage; /* total buffer usage for this node */ + WalUsage wal_usage; /* total WAL usage for this node */ + JitInstrumentation jit_usage; /* total JIT usage for this node */ +} NodeCounters; + +/* + * Counters extracted from query's plan + */ +typedef struct PlanCounters +{ + /* + * estimated execution costs for plan (see costsize.c for more info) + */ + Cost startup_cost; /* cost expended before fetching any tuples */ + Cost total_cost; /* total cost (assuming all tuples fetched) */ + + /* + * planner's estimate of result size of this plan step + */ + Cardinality plan_rows; /* number of rows plan is expected to emit */ + int plan_width; /* average row width in bytes */ +} PlanCounters; + +/* + * The Span data structure represents an operation with a start, a duration. + * It contains the minimum needed to represent a span and serves as a base + * for the SpanNode. It is used for simple operations that don't need + * significant metadata like Executor spans. + */ +typedef struct Span +{ + uint64 trace_id; /* Trace id extracted from the SQLCommenter's + * traceparent */ + uint64 span_id; /* Span Identifier generated from a random + * uint64 */ + uint64 parent_id; /* Span's parent id. For the top span, it's + * extracted from SQLCommenter's traceparent. + * For other spans, we pass the parent's span. */ + + TimestampTz start; /* Start of the span. Except for the */ + instr_time duration; /* Duration of the span in nanoseconds. */ + SpanType type; /* Type of the span. Used to generate the + * span's name for all spans except SPAN_NODE. 
*/ + int be_pid; /* Pid of the backend process */ + + /* + * We store variable size metadata in an external file. Those represent + * the position of NULL terminated strings in the file. Set to -1 if + * unused. + */ + Size name_offset; /* span name offset in external file */ + Size operation_name_offset; /* operation name offset in external + * file */ + Size parameter_offset; /* parameters offset in external file */ + + PlanCounters plan_counters; /* Counters with plan costs */ + NodeCounters node_counters; /* Counters with node costs (jit, wal, + * buffers) */ + int64 startup; /* Time to the first tuple */ + int sql_error_code; /* query error code extracted from ErrorData, + * 0 if query was successful */ +} Span; + +void begin_span(Span * span, SpanType type, uint64 trace_id, uint64 parent_id, + TimestampTz start_query_ts, instr_time start_query_instr_time, + const instr_time *start_time); +void set_span_duration_and_counters(Span * span, const instr_time *end_time); +void initialize_span_fields(Span * span, SpanType type, uint64 trace_id, uint64 parent_id); + +SpanType command_type_to_span_type(CmdType cmd_type); +const char *get_span_name(const Span * span, const char *qbuffer); +const char *get_operation_name(const Span * span, const char *qbuffer); + +#endif diff --git a/contrib/pg_tracing/sql/reset.sql b/contrib/pg_tracing/sql/reset.sql new file mode 100644 index 0000000000..531db2172d --- /dev/null +++ b/contrib/pg_tracing/sql/reset.sql @@ -0,0 +1,3 @@ +-- Check reset is working +SELECT pg_tracing_reset(); +SELECT traces from pg_tracing_info; diff --git a/contrib/pg_tracing/sql/select.sql b/contrib/pg_tracing/sql/select.sql new file mode 100644 index 0000000000..48311eec40 --- /dev/null +++ b/contrib/pg_tracing/sql/select.sql @@ -0,0 +1,105 @@ +CREATE EXTENSION pg_tracing; + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000001-0000000000000001-01'*/ SELECT 1; + +-- Get top span id +SELECT span_id AS top_span_id from 
pg_tracing_spans(false) where parent_id=1 and name!='Parse' \gset + +-- Check parameters +SELECT parameters from pg_tracing_spans(false) where span_id=:top_span_id; + +-- Check the number of children +SELECT count(*) from pg_tracing_spans(false) where parent_id=:'top_span_id'; + +-- Check resource +SELECT resource from pg_tracing_spans(false) where trace_id=1 order by span_start; + +-- Check reported number of trace +SELECT traces from pg_tracing_info; + + +CREATE OR REPLACE FUNCTION test_function(a int) RETURNS SETOF oid AS +$BODY$ +BEGIN + RETURN QUERY SELECT oid from pg_class where oid = a; +END; +$BODY$ +LANGUAGE plpgsql; + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000002-0000000000000002-01'*/ select test_function(1); + +SELECT span_id AS top_span, + extract(epoch from span_start) as top_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_end + from pg_tracing_spans(false) where parent_id=2 and name!='Parse' \gset +SELECT span_id AS top_run_span, + extract(epoch from span_start) as top_run_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_run_end + from pg_tracing_spans(false) where parent_id=:top_span and name='Executor' and resource='Run' \gset +SELECT span_id AS top_project, + extract(epoch from span_start) as top_project_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_project_end + from pg_tracing_spans(false) where parent_id=:top_run_span and name='ProjectSet' \gset +SELECT span_id AS top_result, + extract(epoch from span_start) as top_result_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as top_result_end + from pg_tracing_spans(false) where parent_id=:top_project and name='Result' \gset +SELECT span_id AS nested_select, + extract(epoch from span_start) as select_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as select_end + from pg_tracing_spans(false) where parent_id=:top_result and 
name='Select' \gset +SELECT span_id AS nested_run, + extract(epoch from span_start) as run_start, + round(extract(epoch from span_start) + duration / 1000000000.0) as run_end + from pg_tracing_spans(false) where parent_id=:nested_select and resource='Run' \gset + +SELECT :top_start < :top_run_start, + :top_end >= :top_run_end, + + :top_run_start <= :top_project_start, + + :top_run_end >= :top_project_end, + :top_run_end >= :select_end, + :top_run_end >= :run_end, + + :run_end >= :select_end; + + +SELECT resource from pg_tracing_spans(false) where parent_id=:nested_run order by resource; + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000003-0000000000000003-01'*/ SELECT * from current_database(); +SELECT resource from pg_tracing_spans(false) where trace_id=3 order by resource; + +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000004-0000000000000004-01'*/ SELECT s.relation_size + s.index_size +FROM (SELECT + pg_relation_size(C.oid) as relation_size, + pg_indexes_size(C.oid) as index_size + FROM pg_class C) as s limit 1; +SELECT resource from pg_tracing_spans(false) where trace_id=4 order by resource; + +-- Check tracking option +set pg_tracing.track = 'top'; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000005-0000000000000005-01'*/ select test_function(1); +SELECT count(*) from pg_tracing_spans where trace_id=5; +set pg_tracing.track = 'none'; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000006-0000000000000006-01'*/ select test_function(1); +SELECT count(*) from pg_tracing_spans where trace_id=6; +set pg_tracing.track = 'all'; + +-- Check that we're in a correct state after a timeout +set statement_timeout=200; +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000007-0000000000000007-01'*/ select * from pg_sleep(10); +/*dddbs='postgres.db',traceparent='00-00000000000000000000000000000008-0000000000000008-01'*/ select 1; +SELECT trace_id, resource, sql_error_code from 
pg_tracing_spans order by span_start; + +-- Test prepared statement +PREPARE test_prepared (text, integer) AS /*$1*/ SELECT 1; +EXECUTE test_prepared('dddbs=''postgres.db'',traceparent=''00-00000000000000000000000000000009-0000000000000009-01''', 1); +SELECT trace_id, resource from pg_tracing_spans order by span_start; + +-- Test prepared statement with generic plan +SET plan_cache_mode='force_generic_plan'; +EXECUTE test_prepared('dddbs=''postgres.db'',traceparent=''00-00000000000000000000000000000010-0000000000000010-01''', 1); +SELECT trace_id, resource from pg_tracing_spans order by span_start; + +DROP function test_function; diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index ee78a5749d..1174e2450f 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -70,6 +70,7 @@ InstrStartNode(Instrumentation *instr) if (instr->need_timer && !INSTR_TIME_SET_CURRENT_LAZY(instr->starttime)) elog(ERROR, "InstrStartNode called twice in a row"); + INSTR_TIME_SET_CURRENT_LAZY(instr->firsttime); /* save buffer usage totals at node entry, if needed */ if (instr->need_bufusage) diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 87e5e2183b..13840be1d7 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -72,6 +72,7 @@ typedef struct Instrumentation bool async_mode; /* true if node is in async mode */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ + instr_time firsttime; /* start time of this node */ instr_time starttime; /* start time of current iteration of node */ instr_time counter; /* accumulated runtime for this node */ double firsttuple; /* time for first tuple of this cycle */ -- 2.41.0