F.48. pgpro_queue — message queueing management #

pgpro_queue is a built-in Postgres Pro Enterprise extension for message queueing management. It enables reliable communication between applications using database queues.

pgpro_queue allows managing message queues directly within the database. With pgpro_queue, you can create queues, add messages to them, and process messages efficiently while ensuring data integrity and reliability. A message from the queue is processed by one of the consumers. Once a message is retrieved from the queue, it is no longer available to other consumers.

Because pgpro_queue provides database-integrated message queueing, standard database features such as transaction recovery, crash restart, and standby synchronization are fully supported.

Messages in the queue are processed based on assigned priorities, allowing higher-priority messages to be handled ahead of others, even if they arrive later.

The diagram below shows a practical example of requesting the creation of a PDF file. Rather than being processed directly, such requests can be sent to a queue. This allows efficient handling of high request volumes, improves user experience through asynchronous processing, and provides greater flexibility through a decoupled architecture.

Figure F.1. PDF File Creation Example


F.48.1. Installation and Uninstallation #

The pgpro_queue extension is included in Postgres Pro Enterprise. Once you have Postgres Pro Enterprise installed, complete the following steps to enable pgpro_queue:

  1. Add pgpro_queue to the shared_preload_libraries parameter in the postgresql.conf file:

    shared_preload_libraries = 'pgpro_queue'
    
  2. Create the pgpro_queue extension using the following query:

    CREATE EXTENSION pgpro_queue;
    

    A set of API functions will be created in the current schema.

  3. Initialize the extension:

    SELECT pgpro_queue_initialize();
    

    This function creates a dedicated schema called pgpro_queue_data where all service objects, such as metadata tables, views, and queue-specific tables, are stored. The initialization decouples queue objects from the extension itself, ensuring proper replication and database dumps.
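The result of the steps above can be checked with standard catalog queries. The following is a minimal sketch that uses only the core system catalogs pg_extension and pg_namespace; it makes no assumptions about the internal layout of the pgpro_queue_data schema.

```sql
-- Confirm that the library is preloaded
-- (changing shared_preload_libraries requires a server restart).
SHOW shared_preload_libraries;

-- Confirm that the extension is registered in the current database.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'pgpro_queue';

-- Confirm that pgpro_queue_initialize() created the service schema.
SELECT nspname
FROM pg_namespace
WHERE nspname = 'pgpro_queue_data';
```

If the last query returns no rows, the extension has most likely not been initialized in this database.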

To remove all objects created by the extension, execute the following command (available only to superusers):

SELECT pgpro_queue_deinitialize();

This function performs a cascaded delete of the pgpro_queue_data schema.

F.48.2. Reference #

F.48.2.1. Configuration Parameters #

pgpro_queue.launcher_database (text) #

Specifies the name of the database the launcher process connects to. Specifying a non-existent database or an empty string is not allowed. The default value is postgres. This parameter can only be set at server start.

pgpro_queue.database_with_managed_retry (text) #

Specifies a comma-separated list of database names for which retry-management logic is enabled. If a database is not in the list, or if its name is misspelled or separated by an invalid delimiter, any message whose processing fails within a transaction (that is, a message sent to retry on rollback) is deleted, regardless of the queue or message parameters. The default value is postgres. This parameter can be set either at server start or when the configuration is reloaded.

pgpro_queue.shared_retry_list_size (integer) #

Specifies the capacity of the hash table in shared memory for the Shared Retry Pending List. This parameter essentially determines the maximum number of retry requests that user backend processes can simultaneously queue for processing by the launcher. The default value is 10000.
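As an illustration, the parameters above could be set from SQL with ALTER SYSTEM instead of editing postgresql.conf by hand. This is a sketch under assumptions: appdb is a hypothetical database name, and the list is written without spaces because the exact delimiter rules are not described here.

```sql
-- 'appdb' is a hypothetical database name used only for illustration.
-- ALTER SYSTEM requires superuser privileges and cannot run inside a transaction block.
ALTER SYSTEM SET pgpro_queue.database_with_managed_retry = 'postgres,appdb';

-- This parameter can be applied on configuration reload.
SELECT pg_reload_conf();

-- Check the active value (the reload is asynchronous, so it may take a moment to appear).
SHOW pgpro_queue.database_with_managed_retry;

-- launcher_database can only be set at server start,
-- so this change takes effect only after a restart.
ALTER SYSTEM SET pgpro_queue.launcher_database = 'appdb';
```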

F.48.2.2. Functions #

F.48.2.2.1. Queue Management Procedures #

pgpro_queue provides the following procedures for queue management.

CREATE PROCEDURE CREATE_QUEUE( IN q_name name, IN q_type char DEFAULT 'N'::char, IN q_dlq name DEFAULT null, IN q_retries int DEFAULT 10, IN q_retrydelay int DEFAULT 30 ) #

Creates a queue in an existing queue table. The queue owner can restrict other users from accessing a queue by denying them access to the queue table at the ACL level. If any parameter is not specified, its default value is applied for all new messages.

  • q_name: The name of the queue, which can only contain letters, digits, and underscores.

  • q_type: The type of the new queue. The valid values are N for a normal queue (the default) and D for a dead-letter queue. (The dead-letter queue is not implemented yet; this value is reserved for use in future releases.)

  • q_dlq: The name of an existing dead-letter queue. (The dead-letter queue is not implemented yet; this parameter is reserved for use in future releases.)

  • q_retries: The maximum number of retries. The default value is 10.

  • q_retrydelay: The number of seconds until a message is scheduled for reprocessing after a ROLLBACK. Specify 0 to retry the message immediately. The default value is 30.
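A minimal sketch of calling CREATE_QUEUE follows. The queue names pdf_requests and pdf_requests_fast are hypothetical, and the calls assume that the API procedures are reachable through the current search_path (they are created in the schema that was current when CREATE EXTENSION ran).

```sql
-- Create a normal queue with all defaults:
-- q_type = 'N', q_retries = 10, q_retrydelay = 30.
CALL create_queue('pdf_requests');

-- Create a queue that allows at most 3 retries and retries immediately after a ROLLBACK.
-- Named notation lets you skip q_type and q_dlq.
CALL create_queue('pdf_requests_fast', q_retries => 3, q_retrydelay => 0);
```

Named notation is convenient here because the reserved dead-letter parameters sit between the queue name and the retry settings.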

CREATE PROCEDURE ALTER_QUEUE( IN q_name name, IN new_type char DEFAULT null, IN new_dlq name DEFAULT null, IN new_retries int DEFAULT null, IN new_retrydelay int DEFAULT null ) #

Modifies the parameters of an existing queue. If any parameter is not specified, its default value is applied for all new messages. The change does not affect existing messages. Only the queue owner can change the queue parameters.

  • q_name: The name of the queue, which can only contain letters, digits, and underscores.

  • new_type: The new type of the queue. The valid values are N for a normal queue and D for a dead-letter queue. (The dead-letter queue is not implemented yet; this value is reserved for use in future releases.)

  • new_dlq: The name of an existing dead-letter queue. (The dead-letter queue is not implemented yet; this parameter is reserved for use in future releases.)

  • new_retries: The maximum number of retries.

  • new_retrydelay: The number of seconds until a message is scheduled for reprocessing after a ROLLBACK. Specify 0 to retry the message immediately.
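For illustration, the retry settings of an existing queue might be changed as sketched below. The queue name pdf_requests is hypothetical, and the call must be made by the queue owner.

```sql
-- Allow up to 5 retries and wait 60 seconds before each retry.
-- According to the description above, only messages inserted after this call are affected.
CALL alter_queue('pdf_requests', new_retries => 5, new_retrydelay => 60);
```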

CREATE PROCEDURE DROP_QUEUE(IN q_name name) #

Deletes a queue with the specified name.

CREATE FUNCTION GET_QUEUE_TABLE(IN q_name name) #

Returns the OID of the queue table used for the queue.
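The sketch below shows one way GET_QUEUE_TABLE and DROP_QUEUE might be used, including the ACL-level restriction mentioned for CREATE_QUEUE. It rests on several assumptions: the returned value is of type oid (or can be cast to regclass), pdf_requests is a hypothetical queue, and reporting_user is a hypothetical role. Because a queue is created in an existing queue table, the table may hold more than one queue, so revoking privileges on it could affect other queues as well.

```sql
-- Show the queue table as a readable name instead of a numeric OID.
SELECT get_queue_table('pdf_requests')::regclass AS queue_table;

-- Restrict access at the ACL level. 'reporting_user' is a hypothetical role.
DO $$
DECLARE
    queue_table regclass := get_queue_table('pdf_requests');
BEGIN
    -- Formatting a regclass value with %s yields a correctly quoted table name,
    -- schema-qualified when necessary.
    EXECUTE format('REVOKE ALL ON TABLE %s FROM reporting_user', queue_table);
END
$$;

-- Delete the queue once it is no longer needed.
CALL drop_queue('pdf_requests');
```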

F.48.2.2.2. Procedures for Inserting Messages #

pgpro_queue provides the following procedures for sending messages to the queue. It is not recommended to mix XML and JSONB payloads within the same queue.

CREATE PROCEDURE INSERT_MESSAGE( IN q_name name, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE INSERT_MESSAGE_XML( IN q_name name, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null ) #

Inserts a message with a JSONB payload (INSERT_MESSAGE) or an XML payload (INSERT_MESSAGE_XML) into the queue.

  • q_name: The name of the queue.

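  • q_msg_body: The message payload (JSONB or XML).

  • q_msg_priority: The message priority. The default value is 0.

  • q_msg_properties: The message properties as a JSONB object. By default, no properties are set (an empty object).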

  • q_msg_retries: The maximum number of retries. If not specified, the default value from the queue is used.

  • q_msg_retrydelay: The number of seconds until a message is scheduled for reprocessing after a ROLLBACK. If not specified, the default value from the queue is used.

  • q_msg_enable_time: The time until which the message is delayed. The default value is null.
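A minimal sketch of inserting messages is shown below. The queues pdf_requests and pdf_requests_xml are hypothetical and assumed to exist already; the payload and property keys are invented for illustration. How priority values are ordered is not specified here, so the value 5 is only a placeholder. A separate queue is used for XML because mixing payload types in one queue is not recommended.

```sql
-- Insert a JSONB message with default priority, properties, and retry settings.
CALL insert_message('pdf_requests', '{"document_id": 42, "format": "A4"}'::jsonb);

-- Insert a JSONB message with explicit settings, using named notation.
CALL insert_message(
    'pdf_requests',
    '{"document_id": 43}'::jsonb,
    q_msg_priority    => 5,
    q_msg_properties  => '{"tenant": "acme"}'::jsonb,
    q_msg_retries     => 3,
    q_msg_retrydelay  => 0,
    q_msg_enable_time => now() + interval '10 minutes'
);

-- Insert an XML message into a separate queue.
CALL insert_message_xml(
    'pdf_requests_xml',
    '<request><document_id>44</document_id></request>'::xml
);
```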

F.48.2.2.3. Functions for Reading Messages #

pgpro_queue provides the following functions for reading messages from the queue.

CREATE FUNCTION READ_MESSAGE( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_XML( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_ANY( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null, OUT q_jsonb jsonb, OUT q_xml xml ) RETURNS SETOF RECORD #

Retrieves a message from the queue without waiting (a non-blocking read). Returns the message payload, or null if the queue is empty. On a successful read, the message is removed from the queue table.

  • q_name: The name of the queue.

  • q_msg_hfilter: A header filter, applied in addition to a standard filter.

  • q_msg_pfilter: A properties filter, applied in addition to a standard filter.

  • q_jsonb: Returned JSON body value.

  • q_xml: Returned XML body value.
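The following sketch shows how a consumer might read a message inside a transaction. The queue name pdf_requests is hypothetical, and the processing step is application-specific. The comments about COMMIT and ROLLBACK follow the retry description given for the configuration and queue parameters and are an interpretation, not a statement about the implementation.

```sql
BEGIN;

-- Non-blocking read: returns the payload, or null if the queue is empty.
SELECT read_message('pdf_requests') AS payload;

-- ... process the payload here (application-specific) ...

-- COMMIT confirms the read. Issuing ROLLBACK instead would send the message
-- to retry, subject to the retry settings of the queue and the message.
COMMIT;

-- Read from a queue regardless of payload type.
-- The OUT parameters q_jsonb and q_xml become the result columns.
SELECT q_jsonb, q_xml
FROM read_message_any('pdf_requests');
```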

CREATE FUNCTION READ_MESSAGE_BY_ID( IN q_name name, IN msgid bigint ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML( IN q_name name, IN msgid bigint ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY( IN q_name name, IN msgid bigint, OUT body jsonb, OUT body_xml xml ) RETURNS SETOF RECORD #

Retrieves a specific message from the queue by its ID without waiting (a non-blocking read). Returns the message payload, or null if the queue is empty. On a successful read, the message is removed from the queue table.

  • q_name: The name of the queue.

  • msgid: The message ID.

  • body: Returned JSON body value.

  • body_xml: Returned XML body value.
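As a sketch, a message with a known ID could be read as shown below. The queue name pdf_requests and the ID 1001 are hypothetical; how an application learns a message ID is not covered in this section.

```sql
-- Read a specific JSONB message by its ID.
SELECT read_message_by_id('pdf_requests', 1001) AS payload;

-- Read a message by ID regardless of payload type.
-- The OUT parameters body and body_xml become the result columns.
SELECT body, body_xml
FROM read_message_by_id_any('pdf_requests', 1001);
```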

F.48.3. Authors #

Postgres Professional, Moscow, Russia