Hot Standby status - Mailing list pgsql-hackers

From Heikki Linnakangas
Subject Hot Standby status
Date
Msg-id 4AF9B2C8.6060108@enterprisedb.com
Whole thread Raw
Responses Re: Hot Standby status
Re: Hot Standby status
List pgsql-hackers
Attached is the latest and greatest patch against CVS head, taken from
the hs-riggs branch in my git repository.

These two TODO items I listed earlier have now been fixed:

> <done> allow connections after a shutdown checkpoint
>
> <done> don't clear locks belonging to prepared transactions at startup

And these still remain:

> - clarify default_transaction_read_only and transaction_read_only
>
http://archives.postgresql.org/message-id/4AB75A61.6040505@enterprisedb.com
>
> - rename references to "loggable locks" to "AccessExclusiveLocks in
> master" or similar
>
> - race condition in xact_redo_commit/abort
>
(http://archives.postgresql.org/message-id/4ABF539F.8050305@enterprisedb.com)
>
> - connection goes out of sync when an idle-in-transaction transaction is
> killed
>
(http://archives.postgresql.org/message-id/4ACF77A5.1070706@enterprisedb.com)

Those are pretty small things, shouldn't take long to fix. After all of
those TODO items have been addressed, the patch needs a round or two of
cleanup. Things have been moved around and changed so much that many of
the functions are probably not in a good place in code anymore.

As I said before:

> Let me know if I'm missing something. And please feel free to help, by
> testing, by reviewing and commenting on the patch, or by addressing any
> of the above issues. I will continue working on this, but this is a big
> patch so any help is much appreciated.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/doc/src/sgml/backup.sgml b/doc/src/sgml/backup.sgml
index b633e04..1902ed3 100644
--- a/doc/src/sgml/backup.sgml
+++ b/doc/src/sgml/backup.sgml
@@ -1883,6 +1883,667 @@ if (!triggered)
   </sect2>
  </sect1>

+ <sect1 id="hot-standby">
+  <title>Hot Standby</title>
+
+   <para>
+    Hot Standby is the term used to describe the ability to connect to
+    the server and run queries while the server is in archive recovery. This
+    is useful for both log shipping replication and for restoring a backup
+    to an exact state with great precision.
+    The term Hot Standby also refers to the ability of the server to move
+    from recovery through to normal running while users continue running
+    queries and/or continue their connections.
+   </para>
+
+   <para>
+    Running queries in recovery is in many ways the same as normal running
+    though there are a large number of usage and administrative points
+    to note.
+   </para>
+
+  <sect2 id="hot-standby-users">
+   <title>User's Overview</title>
+
+   <para>
+    Users can connect to the database while the server is in recovery
+    and perform read-only queries. Read-only access to catalogs and views
+    will also occur as normal.
+   </para>
+
+   <para>
+    The data on the standby takes some time to arrive from the primary server
+    so there will be a measurable delay between primary and standby.
+    Queries executed on the standby will be correct as of the data that had
+    been recovered at the start of the query (or start of first statement,
+    in the case of Serializable transactions). Running the same query nearly
+    simultaneously on both primary and standby might therefore return
+    differing results.     We say that data on the standby is eventually
+    consistent with the primary.
+   </para>
+
+   <para>
+    When a connection is made in recovery, the parameter
+    default_transaction_read_only will be forced to be true, whatever its
+    setting in postgresql.conf. As a result, all transactions started during
+    this time will be limited to read-only actions only. In all other ways,
+    connected sessions will appear identical to sessions initiated during
+    normal processing mode. There are no special commands required to
+    initiate a connection at this time, so all interfaces will work
+    normally without change.
+   </para>
+
+   <para>
+    Read-only here means "no writes to the permanent database tables".
+    There are no problems with queries that make use of temporary sort and
+    work files will be used.  Temporary tables cannot be created and
+    therefore cannot be used at all in recovery mode.
+   </para>
+
+   <para>
+    The following actions are allowed
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       Query access - SELECT, COPY TO including views and SELECT RULEs
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Cursor commands - DECLARE, FETCH, CLOSE,
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Parameters - SHOW, SET, RESET
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Transaction management commands
+        <itemizedlist>
+         <listitem>
+          <para>
+           BEGIN, END, ABORT, START TRANSACTION
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           SAVEPOINT, RELEASE, ROLLBACK TO SAVEPOINT
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           EXCEPTION blocks and other internal subtransactions
+          </para>
+         </listitem>
+        </itemizedlist>
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       LOCK TABLE, though only when explicitly IN ACCESS SHARE MODE
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Plans and resources - PREPARE, EXECUTE, DEALLOCATE, DISCARD
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Plugins and extensions - LOAD
+      </para>
+     </listitem>
+    </itemizedlist>
+   </para>
+
+   <para>
+    These actions produce error messages
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       DML - Insert, Update, Delete, COPY FROM, Truncate.
+       Note that there are no actions that result in a trigger
+       being executed during recovery.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       DDL - Create, Drop, Alter, Comment (even for temporary tables because
+       currently these cause writes to catalog tables)
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       SELECT ... FOR SHARE | UPDATE which cause row locks to be written
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       RULEs on SELECT statements that generate DML commands. RULEs on DML
+       commands that produce only SELECT statements are already disallowed
+       during read-only transactions.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       LOCK TABLE, in short default form, since it requests ACCESS EXCLUSIVE MODE.
+       LOCK TABLE that explicitly requests a lock other than ACCESS SHARE MODE.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Transaction management commands that explicitly set non-read only state
+        <itemizedlist>
+         <listitem>
+          <para>
+            BEGIN READ WRITE,
+            START TRANSACTION READ WRITE
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+            SET TRANSACTION READ WRITE,
+            SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           SET transaction_read_only = off; or
+           SET default_transaction_read_only = off;
+          </para>
+         </listitem>
+        </itemizedlist>
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Two-phase commit commands - PREPARE TRANSACTION, COMMIT PREPARED,
+       ROLLBACK PREPARED because even read-only transactions need to write
+       WAL in the prepare phase (the first phase of two phase commit).
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       sequence update - nextval()
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       LISTEN, UNLISTEN, NOTIFY since they currently write to system tables
+      </para>
+     </listitem>
+    </itemizedlist>
+   </para>
+
+   <para>
+    Note that current behaviour of read only transactions when not in
+    recovery is to allow the last two actions, so there are small and
+    subtle differences in behaviour between read-only transactions
+    run on standby and during normal running.
+    It is possible that the restrictions on LISTEN, UNLISTEN, NOTIFY and
+    temporary tables may be lifted in a future release, if their internal
+    implementation is altered to make this possible.
+   </para>
+
+   <para>
+    If failover or switchover occurs the database will switch to normal
+    processing mode. Sessions will remain connected while the server
+    changes mode. Current transactions will continue, though will remain
+    read-only. After this, it will be possible to initiate read-write
+    transactions, though users must explicitly reset their
+    default_transaction_read_only setting first, if they want that
+    behaviour.
+   </para>
+
+   <para>
+    Users will be able to tell whether their session is read-only by
+    issuing SHOW default_transaction_read_only.  In addition a set of
+    functions <xref linkend="functions-recovery-info-table"> allow users to
+    access information about Hot Standby. These allow you to write
+    functions that are aware of the current state of the database. These
+    can be used to monitor the progress of recovery, or to allow you to
+    write complex programs that restore the database to particular states.
+   </para>
+
+   <para>
+    In recovery, transactions will not be permitted to take any table lock
+    higher than AccessShareLock. In addition, transactions may never assign
+    a TransactionId and may never write WAL.
+    Any LOCK TABLE command that runs on the standby and requests a specific
+    lock type other than AccessShareLock will be rejected.
+   </para>
+
+   <para>
+    During recovery database changes are applied using full MVCC rules.
+    In general this means that queries will not experience lock conflicts
+    with writes, just like normal Postgres concurrency control (MVCC).
+   </para>
+  </sect2>
+
+  <sect2 id="hot-standby-conflict">
+   <title>Handling query conflicts</title>
+
+   <para>
+    There is some potential for conflict between standby queries
+    and WAL redo from the primary node. The user is provided with a number
+    of optional ways to handle these conflicts, though we must first
+    understand the possible reasons behind a conflict.
+
+      <itemizedlist>
+       <listitem>
+        <para>
+         Access Exclusive Locks from primary node, including both explicit
+         LOCK commands and various kinds of DDL action
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         Early cleanup of data still visible to the current query's snapshot
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         Dropping tablespaces on the primary while standby queries are using
+         those tablespace for temporary work files (work_mem overflow)
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         Dropping databases on the primary while that role is connected on standby.
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         Waiting to acquire buffer cleanup locks (for which there is no time out)
+        </para>
+       </listitem>
+      </itemizedlist>
+   </para>
+
+   <para>
+    Some WAL redo actions will be for DDL actions. These DDL actions are
+    repeating actions that have already committed on the primary node, so
+    they must not fail on the standby node. These DDL locks take priority
+    and will automatically *cancel* any read-only transactions that get in
+    their way, after a grace period. This is similar to the possibility of
+    being canceled by the deadlock detector, but in this case the standby
+    process always wins, since the replayed actions must not fail. This
+    also ensures that replication doesn't fall behind while we wait for a
+    query to complete. Again, we assume that the standby is there for high
+    availability purposes primarily.
+   </para>
+
+   <para>
+    An example of the above would be an Administrator on Primary server
+    runs a DROP TABLE command on a table that's currently being queried
+    in the standby server.
+   </para>
+
+   <para>
+    Clearly the query cannot continue if we let the DROP TABLE proceed. If
+    this situation occurred on the primary, the DROP TABLE would wait until
+    the query has finished. When the query is on the standby and the
+    DROP TABLE is on the primary, the primary doesn't have information about
+    what the standby is running and so does not wait on the primary. The
+    WAL change records come through to the standby while the query is still
+    running, causing a conflict.
+   </para>
+
+   <para>
+    The second reason for conflict between standby queries and WAL redo is
+    "early cleanup". Normally, PostgreSQL allows cleanup of old row versions
+    when there are no users who may need to see them to ensure correct
+    visibility of data (known as MVCC). If there is a standby query that has
+    been running for longer than any query on the primary then it is possible
+    for old row versions to be removed by either VACUUM or HOT. This will
+    then generate WAL records that, if applied, would remove data on the
+    standby that might *potentially* be required by the standby query.
+    In more technical language, the Primary's xmin horizon is later than
+    the Standby's xmin horizon, allowing dead rows to be removed.
+   </para>
+
+   <para>
+    We have a number of choices for resolving query conflicts.  The default
+    is that we wait and hope the query completes. The server will wait automatically until the lag between
+    primary and standby is at most max_standby_delay seconds. Once that grace
+    period expires, we take one of the following actions:
+
+      <itemizedlist>
+       <listitem>
+        <para>
+         If the conflict is caused by a lock, we cancel the standby transaction
+         immediately, even if it is idle-in-transaction.
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         If the conflict is caused by cleanup records we tell the standby query
+         that a conflict has occurred and that it must cancel itself to avoid the
+         risk that it silently fails to read relevant data because
+         that data has been removed. (This is very similar to the much feared
+         error message "snapshot too old").
+        </para>
+
+        <para>
+         Note also that this means that idle-in-transaction sessions are never
+         canceled except by locks. Users should be clear that tables that are
+         regularly and heavily updated on primary server will quickly cause
+         cancellation of any longer running queries in the standby.
+        </para>
+
+        <para>
+         If cancellation does occur, the query and/or transaction can always
+         be re-executed. The error is dynamic and will not necessarily occur
+         the same way if the query is executed again.
+        </para>
+       </listitem>
+      </itemizedlist>
+   </para>
+
+   <para>
+    Other remedial actions exist if the number of cancelations is unacceptable.
+    The first option is to connect to primary server and keep a query active
+    for as long as we need to run queries on the standby. This guarantees that
+    a WAL cleanup record is never generated and we don't ever get query
+    conflicts as described above. This could be done using contrib/dblink
+    and pg_sleep(), or via other mechanisms.
+   </para>
+
+   <para>
+    Note that max_standby_delay is set in recovery.conf. It applies to the
+    server as a whole, so once used it may not be available for other users.
+    They will have to wait for the server to catch up again before the grace
+    period is available again. So max_standby_delay is a configuration
+    parameter set by the administrator which controls the maximum acceptable
+    failover delay and is not a user-settable parameter to specify how long
+    their query needs to run in.
+   </para>
+
+   <para>
+    Waits for buffer cleanup locks do not currently result in query
+    cancelation. Long waits are uncommon, though can happen in some cases
+    with long running nested loop joins.
+   </para>
+
+   <para>
+    Dropping tablespaces or databases is discussed in the administrator's
+    section since they are not typical user situations.
+   </para>
+  </sect2>
+
+  <sect2 id="hot-standby-admin">
+   <title>Administrator's Overview</title>
+
+   <para>
+    If there is a recovery.conf file present the server will start in Hot Standby
+    mode by default, though this can be disabled by setting
+    "recovery_connections = off" in recovery.conf. The server may take some
+    time to enable recovery connections since the server must first complete
+    sufficient recovery to provide a consistent state against which queries
+    can run before enabling read only connections. Look for these messages
+    in the server logs
+
+<programlisting>
+LOG:  consistent recovery state reached
+LOG:  database system is ready to accept read only connections
+</programlisting>
+
+    If you are running file-based log shipping ("warm standby"), you may need
+    to wait until the next WAL file arrives, which could be as long as the
+    archive_timeout setting on the primary. This is because consistency
+    information is recorded once per checkpoint on the primary. The
+    consistent state can also be delayed in the presence of both transactions
+    that contain large numbers of subtransactions and long-lived transactions.
+   </para>
+
+   <para>
+    The setting of max_connections on the standby should be equal to or
+    greater than the setting of max_connections on the primary. This is to
+    ensure that standby has sufficient resources to manage incoming
+    transactions. max_prepared_transactions already has this restriction.
+   </para>
+
+   <para>
+    It is important that the administrator consider the appropriate setting
+    of "max_standby_delay", set in recovery,conf.  The default is 60 seconds,
+    though there is no optimal setting and it should be set according to
+    business priorities. For example if the server is primarily tasked as a
+    High Availability server, then you may wish to lower max_standby_delay
+    or even set it to zero. If the standby server is tasked as an additional
+    server for decision support queries then it may be acceptable to set this
+    to a value of many hours, e.g. max_standby_delay = 43200 (12 hours). It
+    is also possible to set max_standby_delay to -1 which means "always wait"
+    if there are conflicts, which will be useful when performing an archive
+    recovery from a backup.
+   </para>
+
+   <para>
+    Transaction status "hint bits" written on primary are not WAL-logged,
+    so data on standby will likely re-write the hints again on the standby.
+    Thus the main database blocks will produce write I/Os even though
+    all users are read-only; no changes have occurred to the data values
+    themselves.  Users will be able to write large sort temp files and
+    re-generate relcache info files, so there is no part of the database
+    that is truly read-only during hot standby mode. There is no restriction
+    on the use of set returning functions, or other users of tuplestore/tuplesort
+    code. Note also that writes to remote databases will still be possible,
+    even though the transaction is read-only locally.
+   </para>
+
+   <para>
+    The following types of administrator command are not accepted
+    during recovery mode
+
+      <itemizedlist>
+       <listitem>
+        <para>
+         Data Definition Language (DDL) - e.g. CREATE INDEX
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         Privilege and Ownership - GRANT, REVOKE, REASSIGN
+        </para>
+       </listitem>
+       <listitem>
+        <para>
+         Maintenance commands - ANALYZE, VACUUM, CLUSTER, REINDEX
+        </para>
+       </listitem>
+      </itemizedlist>
+   </para>
+
+   <para>
+    Note again that some of these commands are actually allowed during
+    "read only" mode transactions on the primary.
+   </para>
+
+   <para>
+    As a result, you cannot create additional indexes that exist solely
+    on the standby, nor can statistics that exist solely on the standby.
+   </para>
+
+   <para>
+    pg_cancel_backend() will work on user backends, but not the Startup
+    process, which performs recovery. pg_locks will show locks held by
+    backends as normal. pg_locks also shows a virtual transaction
+    managed by the Startup process that owns all AccessExclusiveLocks held
+    by transactions being replayed by recovery. pg_stat_activity does not
+    show an entry for the Startup process, nor do recovering transactions
+    show as active.
+   </para>
+
+   <para>
+    check_pgsql will work, but it is very simple. check_postgres will also
+    work, though many some actions could give different or confusing results.
+    e.g. last vacuum time will not be maintained for example, since no
+    vacuum occurs on the standby (though vacuums running on the primary do
+    send their changes to the standby).
+   </para>
+
+   <para>
+    WAL file control commands will not work during recovery
+    e.g. pg_start_backup(), pg_switch_xlog() etc..
+   </para>
+
+   <para>
+    Dynamically loadable modules work, including the pg_stat_statements.
+   </para>
+
+   <para>
+    Advisory locks work normally in recovery, including deadlock detection.
+    Note that advisory locks are never WAL logged, so it is not possible for
+    an advisory lock on either the primary or the standby to conflict with WAL
+    replay. Nor is it possible to acquire an advisory lock on the primary
+    and have it initiate a similar advisory lock on the standby. Advisory
+    locks relate only to a single server on which they are acquired.
+   </para>
+
+   <para>
+    Trigger-based replication systems (Slony, Londiste, Bucardo etc) won't
+    run on the standby at all, though they will run happily on the primary
+    server. WAL replay is not trigger-based so you cannot relay from the
+    standby to any system that requires additional database writes or
+    relies on the use of triggers.
+   </para>
+
+   <para>
+    New oids cannot be assigned, though some UUID generators may still
+    work as long as they do not rely on writing new status to the database.
+   </para>
+
+   <para>
+    Currently, creating temp tables is not allowed during read only
+    transactions, so in some cases existing scripts will not run correctly.
+    It is possible we may relax that restriction in a later release. This is
+    both a SQL Standard compliance issue and a technical issue, so will not
+    be resolved in this release.
+   </para>
+
+   <para>
+    DROP TABLESPACE can only succeed if the tablespace is empty. Some standby
+    users may be actively using the tablespace via their temp_tablespaces
+    parameter. If there are temp files in the tablespace we currently
+    cancel all active queries to ensure that temp files are removed, so
+    that we can remove the tablespace and continue with WAL replay.
+   </para>
+
+   <para>
+    Running DROP DATABASE, ALTER DATABASE SET TABLESPACE, or ALTER DATABASE
+    RENAME on primary will cause all users connected to that database on the
+    standby to be forcibly disconnected, once max_standby_delay has been
+    reached.
+   </para>
+
+   <para>
+    In normal running, if you issue DROP USER or DROP ROLE for a role with login
+    capability while that user is still connected then nothing happens to the
+    connected user - they remain connected. The user cannot reconnect however.
+    This behaviour applies in recovery also, so a DROP USER on the primary does
+    not disconnect that user on the standby.
+   </para>
+
+   <para>
+    Stats collector is active during recovery. All scans, reads, blocks,
+    index usage etc will all be recorded normally on the standby. Replayed
+    actions will not duplicate their effects on primary, so replaying an
+    insert will not increment the Inserts column of pg_stat_user_tables.
+    The stats file is deleted at start of recovery, so stats from primary
+    and standby will differ; this is considered a feature not a bug.
+   </para>
+
+   <para>
+    Autovacuum is not active during recovery, though will start normally
+    at the end of recovery.
+   </para>
+
+   <para>
+    Background writer is active during recovery and will perform
+    restartpoints (similar to checkpoints on primary) and normal block
+    cleaning activities. The CHECKPOINT command is accepted during recovery,
+    though performs a restartpoint rather than a new checkpoint.
+   </para>
+  </sect2>
+
+  <sect2 id="hot-standby-parameters">
+   <title>Hot Standby Parameter Reference</title>
+
+   <para>
+    The following additional parameters are supported/provided within the
+    <filename>recovery.conf</>.
+
+     <variablelist>
+
+     <varlistentry id="recovery-connections" xreflabel="recovery_connections">
+      <term><varname>recovery_connections</varname> (<type>boolean</type>)</term>
+      <listitem>
+       <para>
+        Specifies whether you would like to connect during recovery, or not.
+        The default is on, though you may wish to disable it to avoid
+        software problems, should they occur. Parameter can only be changed
+        be stopping and restarting the server.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="max-standby-delay" xreflabel="max_standby_delay">
+      <term><varname>max_standby_delay</varname> (<type>string</type>)</term>
+      <listitem>
+       <para>
+        This parameter allows the Administrator to set a wait policy for
+        queries that conflict with incoming data changes. Valid settings
+        are -1, meaning wait forever, or a wait time of 0 or more seconds.
+        If a conflict should occur the server will delay up to this
+        amount before it begins trying to resolve things less amicably, as
+        described in <xref linkend="hot-standby-conflict">. Typically,
+        this parameter makes sense only during replication, so when
+        performing an archive recovery to recover from data loss a
+        parameter setting of 0 is recommended.
+        Parameter can only be changed be stopping and restarting the
+        server.
+       </para>
+      </listitem>
+     </varlistentry>
+
+   </variablelist>
+   </para>
+  </sect2>
+
+  <sect2 id="hot-standby-caveats">
+   <title>Caveats</title>
+
+   <para>
+    At this writing, there are several limitations of Hot Standby.
+    These can and probably will be fixed in future releases:
+
+  <itemizedlist>
+   <listitem>
+    <para>
+     Operations on hash indexes are not presently WAL-logged, so
+     replay will not update these indexes.  Hash indexes will not be
+     available for use when running queries during recovery.
+    </para>
+   </listitem>
+   <listitem>
+    <para>
+     Full knowledge of running transactions is required before snapshots
+     may be taken. Transactions that take use large numbers of subtransactions
+     (currently greater than 64) will delay the start of read only
+     connections until the completion of the longest running write transaction.
+     If this situation occurs explanatory messages will be sent to server log.
+    </para>
+   </listitem>
+  </itemizedlist>
+
+   </para>
+  </sect2>
+
+ </sect1>
+
  <sect1 id="migration">
   <title>Migration Between Releases</title>

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index eb2071f..741f2d0 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -376,6 +376,12 @@ SET ENABLE_SEQSCAN TO OFF;
         allows. See <xref linkend="sysvipc"> for information on how to
         adjust those parameters, if necessary.
        </para>
+
+       <para>
+        When running a standby server, you must set this parameter to the
+        same or higher value than on the master server. Otherwise, queries
+        will not be allowed in the standby server.
+       </para>
       </listitem>
      </varlistentry>

@@ -5476,6 +5482,32 @@ plruby.use_strict = true        # generates error: unknown class name
       </listitem>
      </varlistentry>

+     <varlistentry id="guc-trace-recovery-messages" xreflabel="trace_recovery_messages">
+      <term><varname>trace_recovery_messages</varname> (<type>string</type>)</term>
+      <indexterm>
+       <primary><varname>trace_recovery_messages</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Controls which message levels are written to the server log
+        for system modules needed for recovery processing. This allows
+        the user to override the normal setting of log_min_messages,
+        but only for specific messages. This is intended for use in
+        debugging Hot Standby.
+        Valid values are <literal>DEBUG5</>, <literal>DEBUG4</>,
+        <literal>DEBUG3</>, <literal>DEBUG2</>, <literal>DEBUG1</>,
+        <literal>INFO</>, <literal>NOTICE</>, <literal>WARNING</>,
+        <literal>ERROR</>, <literal>LOG</>, <literal>FATAL</>, and
+        <literal>PANIC</>.  Each level includes all the levels that
+        follow it.  The later the level, the fewer messages are sent
+        to the log.  The default is <literal>WARNING</>.  Note that
+        <literal>LOG</> has a different rank here than in
+        <varname>client_min_messages</>.
+        Parameter should be set in the postgresql.conf only.
+       </para>
+      </listitem>
+     </varlistentry>
+
     <varlistentry id="guc-zero-damaged-pages" xreflabel="zero_damaged_pages">
       <term><varname>zero_damaged_pages</varname> (<type>boolean</type>)</term>
       <indexterm>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 50ba094..aa1a167 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -13121,6 +13121,39 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
     <xref linkend="continuous-archiving">.
    </para>

+   <indexterm>
+    <primary>pg_is_in_recovery</primary>
+   </indexterm>
+
+   <para>
+    The functions shown in <xref
+    linkend="functions-recovery-info-table"> provide information
+    about the current status of Hot Standby.
+    These functions may be executed during both recovery and in normal running.
+   </para>
+
+   <table id="functions-recovery-info-table">
+    <title>Recovery Information Functions</title>
+    <tgroup cols="3">
+     <thead>
+      <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry>
+        <literal><function>pg_is_in_recovery</function>()</literal>
+        </entry>
+       <entry><type>bool</type></entry>
+       <entry>True if recovery is still in progress. If you wish to
+        know more detailed status information use <function>pg_current_recovery_target</>.
+       </entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
    <para>
     The functions shown in <xref linkend="functions-admin-dbsize"> calculate
     the disk space usage of database objects.
diff --git a/doc/src/sgml/ref/checkpoint.sgml b/doc/src/sgml/ref/checkpoint.sgml
index d2992e4..ee4a09f 100644
--- a/doc/src/sgml/ref/checkpoint.sgml
+++ b/doc/src/sgml/ref/checkpoint.sgml
@@ -43,6 +43,11 @@ CHECKPOINT
   </para>

   <para>
+   If executed during recovery, the <command>CHECKPOINT</command> command
+   will force a restartpoint rather than writing a new checkpoint.
+  </para>
+
+  <para>
    Only superusers can call <command>CHECKPOINT</command>.  The command is
    not intended for use during normal operation.
   </para>
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index 1ebd15e..b750f57 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -621,6 +621,10 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
 {
     uint8        info = record->xl_info & ~XLR_INFO_MASK;

+    /*
+     * GIN indexes do not require any conflict processing.
+     */
+
     RestoreBkpBlocks(lsn, record, false);

     topCtx = MemoryContextSwitchTo(opCtx);
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 4a20d90..adf27ff 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -396,6 +396,12 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
     uint8        info = record->xl_info & ~XLR_INFO_MASK;
     MemoryContext oldCxt;

+    /*
+     * GIST indexes do not require any conflict processing. NB: If we ever
+     * implement a similar optimization we have in b-tree, and remove killed
+     * tuples outside VACUUM, we'll need to handle that here.
+     */
+
     RestoreBkpBlocks(lsn, record, false);

     oldCxt = MemoryContextSwitchTo(opCtx);
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index e2aed44..33dbcae 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -371,6 +371,17 @@ hashbeginscan(PG_FUNCTION_ARGS)
     IndexScanDesc scan;
     HashScanOpaque so;

+    /*
+     * Hash indexes are not recoverable, so cannot ever be used
+     * during recovery mode. We try to avoid this by tweaking the
+     * cost of hash index scans during recovery (see selfuncs.c),
+     * but we may still get called, so specifically prevent scans here.
+     * XXX We expect at some point to be able to exclude index scans on
+     * non-recoverable index types at the index AM level.
+     */
+    if (RecoveryInProgress())
+        elog(ERROR, "Cannot use hash indexes during recovery");
+
     scan = RelationGetIndexScan(rel, keysz, scankey);
     so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
     so->hashso_bucket_valid = false;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b0a911e..c14a179 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3770,19 +3770,77 @@ heap_restrpos(HeapScanDesc scan)
 }

 /*
+ * If 'tuple' contains any XID greater than latestRemovedXid, update
+ * latestRemovedXid to the greatest one found.
+ */
+void
+HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
+                                       TransactionId *latestRemovedXid)
+{
+    TransactionId xmin = HeapTupleHeaderGetXmin(tuple);
+    TransactionId xmax = HeapTupleHeaderGetXmax(tuple);
+    TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
+
+    if (tuple->t_infomask & HEAP_MOVED_OFF ||
+        tuple->t_infomask & HEAP_MOVED_IN)
+    {
+        if (TransactionIdPrecedes(*latestRemovedXid, xvac))
+            *latestRemovedXid = xvac;
+    }
+
+    if (TransactionIdPrecedes(*latestRemovedXid, xmax))
+        *latestRemovedXid = xmax;
+
+    if (TransactionIdPrecedes(*latestRemovedXid, xmin))
+        *latestRemovedXid = xmin;
+
+    Assert(TransactionIdIsValid(*latestRemovedXid));
+}
+
+/*
+ * Perform XLogInsert to register a heap cleanup info message. These
+ * messages are sent once per VACUUM and are required because
+ * of the phasing of removal operations during a lazy VACUUM.
+ * see comments for vacuum_log_cleanup_info().
+ */
+XLogRecPtr
+log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+{
+    xl_heap_cleanup_info xlrec;
+    XLogRecPtr    recptr;
+    XLogRecData rdata;
+
+    xlrec.node = rnode;
+    xlrec.latestRemovedXid = latestRemovedXid;
+
+    rdata.data = (char *) &xlrec;
+    rdata.len = SizeOfHeapCleanupInfo;
+    rdata.buffer = InvalidBuffer;
+    rdata.next = NULL;
+
+    recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO, &rdata);
+
+    return recptr;
+}
+
+/*
  * Perform XLogInsert for a heap-clean operation.  Caller must already
  * have modified the buffer and marked it dirty.
  *
  * Note: prior to Postgres 8.3, the entries in the nowunused[] array were
  * zero-based tuple indexes.  Now they are one-based like other uses
  * of OffsetNumber.
+ *
+ * We also include latestRemovedXid, which is the greatest XID present in
+ * the removed tuples. That allows recovery processing to cancel or wait
+ * for long standby queries that can still see these tuples.
  */
 XLogRecPtr
 log_heap_clean(Relation reln, Buffer buffer,
                OffsetNumber *redirected, int nredirected,
                OffsetNumber *nowdead, int ndead,
                OffsetNumber *nowunused, int nunused,
-               bool redirect_move)
+               TransactionId latestRemovedXid, bool redirect_move)
 {
     xl_heap_clean xlrec;
     uint8        info;
@@ -3794,6 +3852,7 @@ log_heap_clean(Relation reln, Buffer buffer,

     xlrec.node = reln->rd_node;
     xlrec.block = BufferGetBlockNumber(buffer);
+    xlrec.latestRemovedXid = latestRemovedXid;
     xlrec.nredirected = nredirected;
     xlrec.ndead = ndead;

@@ -4068,6 +4127,34 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
 }

 /*
+ * Handles CLEANUP_INFO
+ */
+static void
+heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
+
+    if (InHotStandby)
+    {
+        VirtualTransactionId *backends;
+
+        backends = GetConflictingVirtualXIDs(xlrec->latestRemovedXid,
+                                                    InvalidOid,
+                                                    true);
+        ResolveRecoveryConflictWithVirtualXIDs(backends,
+                                                "VACUUM index cleanup",
+                                                CONFLICT_MODE_ERROR_DEFERRABLE,
+                                                lsn);
+    }
+
+    /*
+     * Actual operation is a no-op. Record type exists to provide a means
+     * for conflict processing to occur before we begin index vacuum actions.
+     * see vacuumlazy.c
+     */
+}
+
+/*
  * Handles CLEAN and CLEAN_MOVE record types
  */
 static void
@@ -4085,12 +4172,32 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
     int            nunused;
     Size        freespace;

+    /*
+     * We're about to remove tuples. In Hot Standby mode, ensure that there's
+     * no queries running for which the removed tuples are still visible.
+     */
+    if (InHotStandby)
+    {
+        VirtualTransactionId *backends;
+
+        backends = GetConflictingVirtualXIDs(xlrec->latestRemovedXid,
+                                                    InvalidOid,
+                                                    true);
+        ResolveRecoveryConflictWithVirtualXIDs(backends,
+                                                "VACUUM heap cleanup",
+                                                CONFLICT_MODE_ERROR_DEFERRABLE,
+                                                lsn);
+    }
+
+    RestoreBkpBlocks(lsn, record, true);
+
     if (record->xl_info & XLR_BKP_BLOCK_1)
         return;

-    buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
+    buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
     if (!BufferIsValid(buffer))
         return;
+    LockBufferForCleanup(buffer);
     page = (Page) BufferGetPage(buffer);

     if (XLByteLE(lsn, PageGetLSN(page)))
@@ -4145,12 +4252,19 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
     Buffer        buffer;
     Page        page;

+    /*
+     * Freezing tuples does not require conflict processing
+     */
+
+    RestoreBkpBlocks(lsn, record, false);
+
     if (record->xl_info & XLR_BKP_BLOCK_1)
         return;

-    buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
+    buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
     if (!BufferIsValid(buffer))
         return;
+    LockBufferForCleanup(buffer);
     page = (Page) BufferGetPage(buffer);

     if (XLByteLE(lsn, PageGetLSN(page)))
@@ -4740,6 +4854,11 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
 {
     uint8        info = record->xl_info & ~XLR_INFO_MASK;

+    /*
+     * These operations don't overwrite MVCC data so no conflict
+     * processing is required. The ones in heap2 rmgr do.
+     */
+
     RestoreBkpBlocks(lsn, record, false);

     switch (info & XLOG_HEAP_OPMASK)
@@ -4781,17 +4900,17 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
     switch (info & XLOG_HEAP_OPMASK)
     {
         case XLOG_HEAP2_FREEZE:
-            RestoreBkpBlocks(lsn, record, false);
             heap_xlog_freeze(lsn, record);
             break;
         case XLOG_HEAP2_CLEAN:
-            RestoreBkpBlocks(lsn, record, true);
             heap_xlog_clean(lsn, record, false);
             break;
         case XLOG_HEAP2_CLEAN_MOVE:
-            RestoreBkpBlocks(lsn, record, true);
             heap_xlog_clean(lsn, record, true);
             break;
+        case XLOG_HEAP2_CLEANUP_INFO:
+            heap_xlog_cleanup_info(lsn, record);
+            break;
         default:
             elog(PANIC, "heap2_redo: unknown op code %u", info);
     }
@@ -4921,17 +5040,26 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
     {
         xl_heap_clean *xlrec = (xl_heap_clean *) rec;

-        appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u",
+        appendStringInfo(buf, "clean: rel %u/%u/%u; blk %u remxid %u",
                          xlrec->node.spcNode, xlrec->node.dbNode,
-                         xlrec->node.relNode, xlrec->block);
+                         xlrec->node.relNode, xlrec->block,
+                         xlrec->latestRemovedXid);
     }
     else if (info == XLOG_HEAP2_CLEAN_MOVE)
     {
         xl_heap_clean *xlrec = (xl_heap_clean *) rec;

-        appendStringInfo(buf, "clean_move: rel %u/%u/%u; blk %u",
+        appendStringInfo(buf, "clean_move: rel %u/%u/%u; blk %u remxid %u",
                          xlrec->node.spcNode, xlrec->node.dbNode,
-                         xlrec->node.relNode, xlrec->block);
+                         xlrec->node.relNode, xlrec->block,
+                         xlrec->latestRemovedXid);
+    }
+    else if (info == XLOG_HEAP2_CLEANUP_INFO)
+    {
+        xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) rec;
+
+        appendStringInfo(buf, "cleanup info: remxid %u",
+                         xlrec->latestRemovedXid);
     }
     else
         appendStringInfo(buf, "UNKNOWN");
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 0d5974f..4793929 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -30,7 +30,8 @@
 typedef struct
 {
     TransactionId new_prune_xid;    /* new prune hint value for page */
-    int            nredirected;    /* numbers of entries in arrays below */
+    TransactionId latestRemovedXid; /* latest xid to be removed by this prune */
+    int            nredirected;        /* numbers of entries in arrays below */
     int            ndead;
     int            nunused;
     /* arrays that accumulate indexes of items to be changed */
@@ -85,6 +86,14 @@ heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
         return;

     /*
+     * We can't write WAL in recovery mode, so there's no point trying to
+     * clean the page. The master will likely issue a cleaning WAL record
+     * soon anyway, so this is no particular loss.
+     */
+    if (RecoveryInProgress())
+        return;
+
+    /*
      * We prune when a previous UPDATE failed to find enough space on the page
      * for a new tuple version, or when free space falls below the relation's
      * fill-factor target (but not less than 10%).
@@ -176,6 +185,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
      * of our working state.
      */
     prstate.new_prune_xid = InvalidTransactionId;
+    prstate.latestRemovedXid = InvalidTransactionId;
     prstate.nredirected = prstate.ndead = prstate.nunused = 0;
     memset(prstate.marked, 0, sizeof(prstate.marked));

@@ -257,7 +267,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                                     prstate.redirected, prstate.nredirected,
                                     prstate.nowdead, prstate.ndead,
                                     prstate.nowunused, prstate.nunused,
-                                    redirect_move);
+                                    prstate.latestRemovedXid, redirect_move);

             PageSetLSN(BufferGetPage(buffer), recptr);
             PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
@@ -395,6 +405,8 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
                 == HEAPTUPLE_DEAD && !HeapTupleHeaderIsHotUpdated(htup))
             {
                 heap_prune_record_unused(prstate, rootoffnum);
+                HeapTupleHeaderAdvanceLatestRemovedXid(htup,
+                                                       &prstate->latestRemovedXid);
                 ndeleted++;
             }

@@ -520,7 +532,11 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
          * find another DEAD tuple is a fairly unusual corner case.)
          */
         if (tupdead)
+        {
             latestdead = offnum;
+            HeapTupleHeaderAdvanceLatestRemovedXid(htup,
+                                                   &prstate->latestRemovedXid);
+        }
         else if (!recent_dead)
             break;

diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 1c1cd34..ec26f9b 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -91,8 +91,19 @@ RelationGetIndexScan(Relation indexRelation,
     else
         scan->keyData = NULL;

+    /*
+     * During recovery we ignore killed tuples and don't bother to kill them
+     * either. We do this because the xmin on the primary node could easily
+     * be later than the xmin on the standby node, so that what the primary
+     * thinks is killed is supposed to be visible on standby. So for correct
+     * MVCC for queries during recovery we must ignore these hints and check
+     * all tuples. Do *not* set ignore_killed_tuples to true when running
+     * in a transaction that was started during recovery. AMs can set it to
+     * false at any time. xactStartedInRecovery should not be touched by AMs.
+     */
     scan->kill_prior_tuple = false;
-    scan->ignore_killed_tuples = true;    /* default setting */
+    scan->xactStartedInRecovery = TransactionStartedDuringRecovery();
+    scan->ignore_killed_tuples = !scan->xactStartedInRecovery;

     scan->opaque = NULL;

diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index c86cd52..69c9473 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -455,9 +455,12 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)

             /*
              * If we scanned a whole HOT chain and found only dead tuples,
-             * tell index AM to kill its entry for that TID.
+             * tell index AM to kill its entry for that TID. We do not do
+             * this when in recovery because it may violate MVCC to do so.
+             * see comments in RelationGetIndexScan().
              */
-            scan->kill_prior_tuple = scan->xs_hot_dead;
+            if (!scan->xactStartedInRecovery)
+                scan->kill_prior_tuple = scan->xs_hot_dead;

             /*
              * The AM's gettuple proc finds the next index entry matching the
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 81d56b3..8739d9c 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -401,6 +401,27 @@ of the WAL entry.)  If the parent page becomes half-dead but is not
 immediately deleted due to a subsequent crash, there is no loss of
 consistency, and the empty page will be picked up by the next VACUUM.

+Scans during Recovery
+---------------------
+
+The btree index type can be safely used during recovery. During recovery
+we have at most one writer and potentially many readers. In that
+situation the locking requirements can be relaxed and we do not need
+double locking during block splits. Each WAL record makes changes to a
+single level of the btree using the correct locking sequence and so
+is safe for concurrent readers. Some readers may observe a block split
+in progress as they descend the tree, but they will simply move right
+onto the correct page.
+
+During recovery all index scans start with ignore_killed_tuples = false
+and we never set kill_prior_tuple. We do this because the oldest xmin
+on the standby server can be older than the oldest xmin on the master
+server, which means tuples can be marked as killed even when they are
+still visible on the standby. We don't WAL log tuple killed bits, but
+they can still appear in the standby because of full page writes. So
+we must always ignore them in standby, and that means it's not worth
+setting them either.
+
 Other Things That Are Handy to Know
 -----------------------------------

diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index a983571..841ffba 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -2025,7 +2025,7 @@ _bt_vacuum_one_page(Relation rel, Buffer buffer)
     }

     if (ndeletable > 0)
-        _bt_delitems(rel, buffer, deletable, ndeletable);
+        _bt_delitems(rel, buffer, deletable, ndeletable, false, 0);

     /*
      * Note: if we didn't find any LP_DEAD items, then the page's
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 901b2b5..7871524 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -653,14 +653,24 @@ _bt_page_recyclable(Page page)
  *
  * This routine assumes that the caller has pinned and locked the buffer.
  * Also, the given itemnos *must* appear in increasing order in the array.
+ *
+ * We record VACUUMs and b-tree deletes differently in WAL. InHotStandby
+ * we need to be able to pin all of the blocks in the btree in physical
+ * order when replaying the effects of a VACUUM, just as we do for the
+ * original VACUUM itself. lastBlockVacuumed allows us to tell whether an
+ * intermediate range of blocks has had no changes at all by VACUUM,
+ * and so must be scanned anyway during replay.
  */
 void
 _bt_delitems(Relation rel, Buffer buf,
-             OffsetNumber *itemnos, int nitems)
+             OffsetNumber *itemnos, int nitems, bool isVacuum,
+             BlockNumber lastBlockVacuumed)
 {
     Page        page = BufferGetPage(buf);
     BTPageOpaque opaque;

+    Assert(isVacuum || lastBlockVacuumed == 0);
+
     /* No ereport(ERROR) until changes are logged */
     START_CRIT_SECTION();

@@ -688,15 +698,36 @@ _bt_delitems(Relation rel, Buffer buf,
     /* XLOG stuff */
     if (!rel->rd_istemp)
     {
-        xl_btree_delete xlrec;
         XLogRecPtr    recptr;
         XLogRecData rdata[2];

-        xlrec.node = rel->rd_node;
-        xlrec.block = BufferGetBlockNumber(buf);
+        if (isVacuum)
+        {
+            xl_btree_vacuum xlrec_vacuum;
+            xlrec_vacuum.node = rel->rd_node;
+            xlrec_vacuum.block = BufferGetBlockNumber(buf);
+
+            xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
+            rdata[0].data = (char *) &xlrec_vacuum;
+            rdata[0].len = SizeOfBtreeVacuum;
+        }
+        else
+        {
+            xl_btree_delete xlrec_delete;
+            xlrec_delete.node = rel->rd_node;
+            xlrec_delete.block = BufferGetBlockNumber(buf);
+
+            /*
+             * We would like to set an accurate latestRemovedXid, but there
+             * is no easy way of obtaining a useful value. So we use the
+             * probably far too conservative value of RecentGlobalXmin instead.
+             * XXX: this comment is bogus? We don't use RecentGlobalXmin
+             */
+            xlrec_delete.latestRemovedXid = InvalidTransactionId;
+            rdata[0].data = (char *) &xlrec_delete;
+            rdata[0].len = SizeOfBtreeDelete;
+        }

-        rdata[0].data = (char *) &xlrec;
-        rdata[0].len = SizeOfBtreeDelete;
         rdata[0].buffer = InvalidBuffer;
         rdata[0].next = &(rdata[1]);

@@ -719,7 +750,10 @@ _bt_delitems(Relation rel, Buffer buf,
         rdata[1].buffer_std = true;
         rdata[1].next = NULL;

-        recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);
+        if (isVacuum)
+            recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_VACUUM, rdata);
+        else
+            recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, rdata);

         PageSetLSN(page, recptr);
         PageSetTLI(page, ThisTimeLineID);
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 79d4c66..525d838 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -57,7 +57,8 @@ typedef struct
     IndexBulkDeleteCallback callback;
     void       *callback_state;
     BTCycleId    cycleid;
-    BlockNumber lastUsedPage;
+    BlockNumber lastBlockVacuumed;     /* last blkno reached by Vacuum scan */
+    BlockNumber lastUsedPage;        /* blkno of last non-recyclable page */
     BlockNumber totFreePages;    /* true total # of free pages */
     MemoryContext pagedelcontext;
 } BTVacState;
@@ -629,6 +630,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
     vstate.callback = callback;
     vstate.callback_state = callback_state;
     vstate.cycleid = cycleid;
+    vstate.lastBlockVacuumed = BTREE_METAPAGE; /* Initialise at first block */
     vstate.lastUsedPage = BTREE_METAPAGE;
     vstate.totFreePages = 0;

@@ -858,7 +860,19 @@ restart:
          */
         if (ndeletable > 0)
         {
-            _bt_delitems(rel, buf, deletable, ndeletable);
+            BlockNumber    lastBlockVacuumed = BufferGetBlockNumber(buf);
+
+            _bt_delitems(rel, buf, deletable, ndeletable, true, vstate->lastBlockVacuumed);
+
+            /*
+             * Keep track of the block number of the lastBlockVacuumed, so
+             * we can scan those blocks as well during WAL replay. This then
+             * provides concurrency protection and allows btrees to be used
+             * while in recovery.
+             */
+            if (lastBlockVacuumed > vstate->lastBlockVacuumed)
+                vstate->lastBlockVacuumed = lastBlockVacuumed;
+
             stats->tuples_removed += ndeletable;
             /* must recompute maxoff */
             maxoff = PageGetMaxOffsetNumber(page);
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 895d641..b982cdf 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -16,7 +16,11 @@

 #include "access/nbtree.h"
 #include "access/transam.h"
+#include "access/xact.h"
 #include "storage/bufmgr.h"
+#include "storage/procarray.h"
+#include "utils/inval.h"
+#include "miscadmin.h"

 /*
  * We must keep track of expected insertions due to page splits, and apply
@@ -459,6 +463,96 @@ btree_xlog_split(bool onleft, bool isroot,
 }

 static void
+btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_btree_vacuum *xlrec;
+    Buffer        buffer;
+    Page        page;
+    BTPageOpaque opaque;
+
+    xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
+
+    /*
+     * We need to ensure every block is unpinned between the
+     * lastBlockVacuumed and the current block, if there are any.
+     * This ensures that every block in the index is touched during
+     * VACUUM as required to ensure scans work correctly.
+     */
+    if (standbyState == STANDBY_READY &&
+        (xlrec->lastBlockVacuumed + 1) != xlrec->block)
+    {
+        BlockNumber blkno = xlrec->lastBlockVacuumed + 1;
+
+        for (; blkno < xlrec->block; blkno++)
+        {
+            /*
+             * XXX we don't actually need to read the block, we
+             * just need to confirm it is unpinned. If we had a special call
+             * into the buffer manager we could optimise this so that
+             * if the block is not in shared_buffers we confirm it as unpinned.
+             *
+             * Another simple optimization would be to check if there's any
+             * backends running; if not, we could just skip this.
+             */
+            buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno, RBM_NORMAL);
+            if (BufferIsValid(buffer))
+            {
+                LockBufferForCleanup(buffer);
+                UnlockReleaseBuffer(buffer);
+            }
+        }
+    }
+
+    /*
+     * If the block was restored from a full page image, nothing more to do.
+     * The RestoreBkpBlocks() call already pinned and took cleanup lock on
+     * it. XXX: Perhaps we should call RestoreBkpBlocks() *after* the loop
+     * above, to make the disk access more sequential.
+     */
+    if (record->xl_info & XLR_BKP_BLOCK_1)
+        return;
+
+    /*
+     * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
+     * page. See nbtree/README for details.
+     */
+    buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
+    if (!BufferIsValid(buffer))
+        return;
+    LockBufferForCleanup(buffer);
+    page = (Page) BufferGetPage(buffer);
+
+    if (XLByteLE(lsn, PageGetLSN(page)))
+    {
+        UnlockReleaseBuffer(buffer);
+        return;
+    }
+
+    if (record->xl_len > SizeOfBtreeVacuum)
+    {
+        OffsetNumber *unused;
+        OffsetNumber *unend;
+
+        unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
+        unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
+
+        PageIndexMultiDelete(page, unused, unend - unused);
+    }
+
+    /*
+     * Mark the page as not containing any LP_DEAD items --- see comments in
+     * _bt_delitems().
+     */
+    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+    opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
+
+    PageSetLSN(page, lsn);
+    PageSetTLI(page, ThisTimeLineID);
+    MarkBufferDirty(buffer);
+    UnlockReleaseBuffer(buffer);
+}
+
+static void
 btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 {
     xl_btree_delete *xlrec;
@@ -470,6 +564,11 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
         return;

     xlrec = (xl_btree_delete *) XLogRecGetData(record);
+
+    /*
+     * We don't need to take a cleanup lock to apply these changes.
+     * See nbtree/README for details.
+     */
     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
     if (!BufferIsValid(buffer))
         return;
@@ -714,7 +813,44 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
 {
     uint8        info = record->xl_info & ~XLR_INFO_MASK;

-    RestoreBkpBlocks(lsn, record, false);
+    /*
+     * Btree delete records can conflict with standby queries. You might
+     * think that vacuum records would conflict as well, but we've handled
+     * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
+     * cleaned by the vacuum of the heap and so we can resolve any conflicts
+     * just once when that arrives. After that any we know that no conflicts
+     * exist from individual btree vacuum records on that index.
+     */
+    if (InHotStandby)
+    {
+        if (info == XLOG_BTREE_DELETE)
+        {
+            xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
+            VirtualTransactionId *backends;
+
+            /*
+             * XXX Currently we put everybody on death row, because
+             * currently _bt_delitems() supplies InvalidTransactionId.
+             * This can be fairly painful, so providing a better value
+             * here is worth some thought and possibly some effort to
+             * improve.
+             */
+            backends = GetConflictingVirtualXIDs(xlrec->latestRemovedXid,
+                                    InvalidOid,
+                                    true);
+
+            ResolveRecoveryConflictWithVirtualXIDs(backends,
+                                    "b-tree delete",
+                                    CONFLICT_MODE_ERROR_DEFERRABLE,
+                                    lsn);
+        }
+    }
+
+    /*
+     * Vacuum needs to pin and take cleanup lock on every leaf page,
+     * a regular exclusive lock is enough for all other purposes.
+     */
+    RestoreBkpBlocks(lsn, record, (info == XLOG_BTREE_VACUUM));

     switch (info)
     {
@@ -739,6 +875,9 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
         case XLOG_BTREE_SPLIT_R_ROOT:
             btree_xlog_split(false, true, lsn, record);
             break;
+        case XLOG_BTREE_VACUUM:
+            btree_xlog_vacuum(lsn, record);
+            break;
         case XLOG_BTREE_DELETE:
             btree_xlog_delete(lsn, record);
             break;
@@ -843,13 +982,24 @@ btree_desc(StringInfo buf, uint8 xl_info, char *rec)
                                  xlrec->level, xlrec->firstright);
                 break;
             }
+        case XLOG_BTREE_VACUUM:
+            {
+                xl_btree_vacuum *xlrec = (xl_btree_vacuum *) rec;
+
+                appendStringInfo(buf, "vacuum: rel %u/%u/%u; blk %u, lastBlockVacuumed %u",
+                                 xlrec->node.spcNode, xlrec->node.dbNode,
+                                 xlrec->node.relNode, xlrec->block,
+                                 xlrec->lastBlockVacuumed);
+                break;
+            }
         case XLOG_BTREE_DELETE:
             {
                 xl_btree_delete *xlrec = (xl_btree_delete *) rec;

-                appendStringInfo(buf, "delete: rel %u/%u/%u; blk %u",
+                appendStringInfo(buf, "delete: rel %u/%u/%u; blk %u, latestRemovedXid %u",
                                  xlrec->node.spcNode, xlrec->node.dbNode,
-                                 xlrec->node.relNode, xlrec->block);
+                                 xlrec->node.relNode, xlrec->block,
+                                 xlrec->latestRemovedXid);
                 break;
             }
         case XLOG_BTREE_DELETE_PAGE:
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index a88563e..46b48f0 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -649,3 +649,34 @@ fsync it down to disk without any sort of interlock, as soon as it finishes
 the bulk update.  However, all these paths are designed to write data that
 no other transaction can see until after T1 commits.  The situation is thus
 not different from ordinary WAL-logged updates.
+
+Transaction Emulation during Recovery
+-------------------------------------
+
+During Recovery we replay transaction changes in the order they occurred.
+As part of this replay we emulate some transactional behaviour, so that
+read only backends can take MVCC snapshots. We do this by maintaining a
+list of XIDs belonging to transactions that are being replayed, so that
+each transaction that has recorded WAL records for database writes exist
+in the array until it commits. Further details are given in comments in
+procarray.c.
+
+Many actions write no WAL records at all, for example read only transactions.
+These have no effect on MVCC in recovery and we can pretend they never
+occurred at all. Subtransaction commit does not write a WAL record either
+and has very little effect, since lock waiters need to wait for the
+parent transaction to complete.
+
+Not all transactional behaviour is emulated, for example we do not insert
+a transaction entry into the lock table, nor do we maintain the transaction
+stack in memory. Clog entries are made normally. Multitrans is not maintained
+because its purpose is to record tuple level locks that an application has
+requested to prevent write locks. Since write locks cannot be obtained at all,
+there is never any conflict and so there is no reason to update multitrans.
+Subtrans is maintained during recovery but the details of the transaction
+tree are ignored and all subtransactions reference the top-level TransactionId
+directly. Since commit is atomic this provides correct lock wait behaviour
+yet simplifies emulation of subtransactions considerably.
+
+Further details on locking mechanics in recovery are given in comments
+with the Lock rmgr code.
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 8c1ccb6..435276b 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -574,7 +574,7 @@ ExtendCLOG(TransactionId newestXact)
     LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);

     /* Zero the page and make an XLOG entry about it */
-    ZeroCLOGPage(pageno, true);
+    ZeroCLOGPage(pageno, !InRecovery);

     LWLockRelease(CLogControlLock);
 }
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index 6f86961..fc8ef67 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -57,6 +57,7 @@
 #include "storage/backendid.h"
 #include "storage/lmgr.h"
 #include "storage/procarray.h"
+#include "utils/builtins.h"
 #include "utils/memutils.h"


@@ -209,7 +210,6 @@ static MultiXactId GetNewMultiXactId(int nxids, MultiXactOffset *offset);
 static MultiXactId mXactCacheGetBySet(int nxids, TransactionId *xids);
 static int    mXactCacheGetById(MultiXactId multi, TransactionId **xids);
 static void mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids);
-static int    xidComparator(const void *arg1, const void *arg2);

 #ifdef MULTIXACT_DEBUG
 static char *mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids);
@@ -1210,27 +1210,6 @@ mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids)
     MXactCache = entry;
 }

-/*
- * xidComparator
- *        qsort comparison function for XIDs
- *
- * We don't need to use wraparound comparison for XIDs, and indeed must
- * not do so since that does not respect the triangle inequality!  Any
- * old sort order will do.
- */
-static int
-xidComparator(const void *arg1, const void *arg2)
-{
-    TransactionId xid1 = *(const TransactionId *) arg1;
-    TransactionId xid2 = *(const TransactionId *) arg2;
-
-    if (xid1 > xid2)
-        return 1;
-    if (xid1 < xid2)
-        return -1;
-    return 0;
-}
-
 #ifdef MULTIXACT_DEBUG
 static char *
 mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids)
@@ -1927,11 +1906,18 @@ multixact_redo(XLogRecPtr lsn, XLogRecord *record)
             if (TransactionIdPrecedes(max_xid, xids[i]))
                 max_xid = xids[i];
         }
+
+        /* We don't expect anyone else to modify nextXid, hence we
+         * don't need to hold a lock while checking this. We still acquire
+         * the lock to modify it, though.
+         */
         if (TransactionIdFollowsOrEquals(max_xid,
                                          ShmemVariableCache->nextXid))
         {
+            LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
             ShmemVariableCache->nextXid = max_xid;
             TransactionIdAdvance(ShmemVariableCache->nextXid);
+            LWLockRelease(XidGenLock);
         }
     }
     else
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 0273b0e..252f4ee 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -20,6 +20,7 @@
 #include "commands/dbcommands.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "storage/sinval.h"
 #include "storage/freespace.h"


@@ -32,7 +33,7 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
     {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
     {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
     {"Reserved 7", NULL, NULL, NULL, NULL, NULL},
-    {"Reserved 8", NULL, NULL, NULL, NULL, NULL},
+    {"Relation", relation_redo, relation_desc, NULL, NULL, NULL},
     {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
     {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
     {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c
index 2a1eab4..6fb2d3f 100644
--- a/src/backend/access/transam/transam.c
+++ b/src/backend/access/transam/transam.c
@@ -35,9 +35,6 @@ static TransactionId cachedFetchXid = InvalidTransactionId;
 static XidStatus cachedFetchXidStatus;
 static XLogRecPtr cachedCommitLSN;

-/* Handy constant for an invalid xlog recptr */
-static const XLogRecPtr InvalidXLogRecPtr = {0, 0};
-
 /* Local functions */
 static XidStatus TransactionLogFetch(TransactionId transactionId);

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 195c90c..a228fd1 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -143,7 +143,10 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
                                 int nchildren,
                                 TransactionId *children,
                                 int nrels,
-                                RelFileNode *rels);
+                                RelFileNode *rels,
+                                int ninvalmsgs,
+                                SharedInvalidationMessage *invalmsgs,
+                                bool initfileinval);
 static void RecordTransactionAbortPrepared(TransactionId xid,
                                int nchildren,
                                TransactionId *children,
@@ -705,10 +708,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
  *    2. TransactionId[] (subtransactions)
  *    3. RelFileNode[] (files to be deleted at commit)
  *    4. RelFileNode[] (files to be deleted at abort)
- *    5. TwoPhaseRecordOnDisk
- *    6. ...
- *    7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
- *    8. CRC32
+ *    5. SharedInvalidationMessage[] (inval messages to be sent at commit)
+ *    6. TwoPhaseRecordOnDisk
+ *    7. ...
+ *    8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ *    9. CRC32
  *
  * Each segment except the final CRC32 is MAXALIGN'd.
  */
@@ -729,6 +733,8 @@ typedef struct TwoPhaseFileHeader
     int32        nsubxacts;        /* number of following subxact XIDs */
     int32        ncommitrels;    /* number of delete-on-commit rels */
     int32        nabortrels;        /* number of delete-on-abort rels */
+    int32        ninvalmsgs;        /* number of cache invalidation messages */
+    bool        initfileinval;    /* does relcache init file need invalidation? */
     char        gid[GIDSIZE];    /* GID for transaction */
 } TwoPhaseFileHeader;

@@ -804,6 +810,7 @@ StartPrepare(GlobalTransaction gxact)
     TransactionId *children;
     RelFileNode *commitrels;
     RelFileNode *abortrels;
+    SharedInvalidationMessage *invalmsgs;

     /* Initialize linked list */
     records.head = palloc0(sizeof(XLogRecData));
@@ -828,11 +835,16 @@ StartPrepare(GlobalTransaction gxact)
     hdr.nsubxacts = xactGetCommittedChildren(&children);
     hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
     hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
+    hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
+                                                          &hdr.initfileinval);
     StrNCpy(hdr.gid, gxact->gid, GIDSIZE);

     save_state_data(&hdr, sizeof(TwoPhaseFileHeader));

-    /* Add the additional info about subxacts and deletable files */
+    /*
+     * Add the additional info about subxacts, deletable files and
+     * cache invalidation messages.
+     */
     if (hdr.nsubxacts > 0)
     {
         save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
@@ -849,6 +861,12 @@ StartPrepare(GlobalTransaction gxact)
         save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
         pfree(abortrels);
     }
+    if (hdr.ninvalmsgs > 0)
+    {
+        save_state_data(invalmsgs,
+                        hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
+        pfree(invalmsgs);
+    }
 }

 /*
@@ -1153,6 +1171,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
     RelFileNode *abortrels;
     RelFileNode *delrels;
     int            ndelrels;
+    SharedInvalidationMessage *invalmsgs;
     int            i;

     /*
@@ -1184,6 +1203,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
     abortrels = (RelFileNode *) bufptr;
     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+    invalmsgs = (SharedInvalidationMessage *) bufptr;
+    bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

     /* compute latestXid among all children */
     latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
@@ -1199,7 +1220,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
     if (isCommit)
         RecordTransactionCommitPrepared(xid,
                                         hdr->nsubxacts, children,
-                                        hdr->ncommitrels, commitrels);
+                                        hdr->ncommitrels, commitrels,
+                                        hdr->ninvalmsgs, invalmsgs,
+                                        hdr->initfileinval);
     else
         RecordTransactionAbortPrepared(xid,
                                        hdr->nsubxacts, children,
@@ -1246,6 +1269,18 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
         smgrclose(srel);
     }

+    /*
+     * Handle cache invalidation messages.
+     *
+     * Relcache init file invalidation requires processing both
+     * before and after we send the SI messages. See AtEOXact_Inval()
+     */
+    if (hdr->initfileinval)
+        RelationCacheInitFileInvalidate(true);
+    SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
+    if (hdr->initfileinval)
+        RelationCacheInitFileInvalidate(false);
+
     /* And now do the callbacks */
     if (isCommit)
         ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
@@ -1656,6 +1691,7 @@ RecoverPreparedTransactions(void)
             bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
             bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
             bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+            bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

             /*
              * Reconstruct subtrans state for the transaction --- needed
@@ -1695,6 +1731,118 @@ RecoverPreparedTransactions(void)
 }

 /*
+ * StandbyRecoverPreparedTransactions
+ *
+ * Scan the pg_twophase directory and reload shared-memory state for each
+ * prepared transaction (reacquire locks, etc).  This is run during database
+ * startup, when in standby mode.
+ *
+ * Returns an array of all top-level xids. Number of xids in the array is
+ * returned in *nxids.
+ */
+TransactionId *
+StandbyRecoverPreparedTransactions(int *nxids_p)
+{
+    char        dir[MAXPGPATH];
+    DIR           *cldir;
+    struct dirent *clde;
+    TransactionId *xids = NULL;
+    int            nxids = 0;
+    int            allocsize = 0;
+
+    snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
+
+    cldir = AllocateDir(dir);
+    while ((clde = ReadDir(cldir, dir)) != NULL)
+    {
+        if (strlen(clde->d_name) == 8 &&
+            strspn(clde->d_name, "0123456789ABCDEF") == 8)
+        {
+            TransactionId xid;
+            char       *buf;
+            char       *bufptr;
+            TwoPhaseFileHeader *hdr;
+            TransactionId *subxids;
+            int            i;
+
+            xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+            /* Already processed? */
+            if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+            {
+                ereport(WARNING,
+                        (errmsg("removing stale two-phase state file \"%s\"",
+                                clde->d_name)));
+                RemoveTwoPhaseFile(xid, true);
+                continue;
+            }
+
+            /* Read and validate file */
+            buf = ReadTwoPhaseFile(xid);
+            if (buf == NULL)
+            {
+                ereport(WARNING,
+                      (errmsg("removing corrupt two-phase state file \"%s\"",
+                              clde->d_name)));
+                RemoveTwoPhaseFile(xid, true);
+                continue;
+            }
+
+            ereport(LOG,
+                    (errmsg("recovering prepared transaction %u", xid)));
+
+            /* Deconstruct header */
+            hdr = (TwoPhaseFileHeader *) buf;
+            Assert(TransactionIdEquals(hdr->xid, xid));
+            bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+            subxids = (TransactionId *) bufptr;
+            bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+            bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+            bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+            bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+
+            /*
+             * Reconstruct subtrans state for the transaction --- needed
+             * because pg_subtrans is not preserved over a restart.  Note that
+             * we are linking all the subtransactions directly to the
+             * top-level XID; there may originally have been a more complex
+             * hierarchy, but there's no need to restore that exactly.
+             */
+            for (i = 0; i < hdr->nsubxacts; i++)
+                SubTransSetParent(subxids[i], xid);
+
+            /*
+             * Recover other state (notably locks) using resource managers
+             */
+            ProcessRecords(bufptr, xid, twophase_standby_recover_callbacks);
+
+            if(nxids == allocsize)
+            {
+                if (nxids == 0)
+                {
+                    allocsize = 10;
+                    xids = palloc(allocsize * sizeof(TransactionId));
+                }
+                else
+                {
+                    allocsize = allocsize * 2;
+                    xids = repalloc(xids, allocsize * sizeof(TransactionId));
+                }
+            }
+
+            xids[nxids++] = xid;
+
+            pfree(buf);
+        }
+    }
+    FreeDir(cldir);
+
+    *nxids_p = nxids;
+    return xids;
+}
+
+
+/*
  *    RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit: in particular,
@@ -1708,9 +1856,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
                                 int nchildren,
                                 TransactionId *children,
                                 int nrels,
-                                RelFileNode *rels)
+                                RelFileNode *rels,
+                                int ninvalmsgs,
+                                SharedInvalidationMessage *invalmsgs,
+                                bool initfileinval)
 {
-    XLogRecData rdata[3];
+    XLogRecData rdata[4];
     int            lastrdata = 0;
     xl_xact_commit_prepared xlrec;
     XLogRecPtr    recptr;
@@ -1723,8 +1874,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
     /* Emit the XLOG commit record */
     xlrec.xid = xid;
     xlrec.crec.xact_time = GetCurrentTimestamp();
+    xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
+    xlrec.crec.nmsgs = 0;
     xlrec.crec.nrels = nrels;
     xlrec.crec.nsubxacts = nchildren;
+    xlrec.crec.nmsgs = ninvalmsgs;
+
     rdata[0].data = (char *) (&xlrec);
     rdata[0].len = MinSizeOfXactCommitPrepared;
     rdata[0].buffer = InvalidBuffer;
@@ -1746,6 +1901,15 @@ RecordTransactionCommitPrepared(TransactionId xid,
         rdata[2].buffer = InvalidBuffer;
         lastrdata = 2;
     }
+    /* dump cache invalidation messages */
+    if (ninvalmsgs > 0)
+    {
+        rdata[lastrdata].next = &(rdata[3]);
+        rdata[3].data = (char *) invalmsgs;
+        rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
+        rdata[3].buffer = InvalidBuffer;
+        lastrdata = 3;
+    }
     rdata[lastrdata].next = NULL;

     recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
diff --git a/src/backend/access/transam/twophase_rmgr.c b/src/backend/access/transam/twophase_rmgr.c
index 4ff9549..6fba48e 100644
--- a/src/backend/access/transam/twophase_rmgr.c
+++ b/src/backend/access/transam/twophase_rmgr.c
@@ -18,14 +18,12 @@
 #include "commands/async.h"
 #include "pgstat.h"
 #include "storage/lock.h"
-#include "utils/inval.h"


 const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
 {
     NULL,                        /* END ID */
     lock_twophase_recover,        /* Lock */
-    NULL,                        /* Inval */
     NULL,                        /* notify/listen */
     NULL                        /* pgstat */
 };
@@ -34,7 +32,6 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
 {
     NULL,                        /* END ID */
     lock_twophase_postcommit,    /* Lock */
-    inval_twophase_postcommit,    /* Inval */
     notify_twophase_postcommit, /* notify/listen */
     pgstat_twophase_postcommit    /* pgstat */
 };
@@ -43,7 +40,14 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
 {
     NULL,                        /* END ID */
     lock_twophase_postabort,    /* Lock */
-    NULL,                        /* Inval */
     NULL,                        /* notify/listen */
     pgstat_twophase_postabort    /* pgstat */
 };
+
+const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+{
+    NULL,                        /* END ID */
+    lock_twophase_standby_recover,        /* Lock */
+    NULL,                        /* notify/listen */
+    NULL                        /* pgstat */
+};
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c3606e0..90d8ac6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,6 +40,7 @@
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/procarray.h"
+#include "storage/sinval.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
 #include "utils/combocid.h"
@@ -139,6 +140,7 @@ typedef struct TransactionStateData
     Oid            prevUser;        /* previous CurrentUserId setting */
     bool        prevSecDefCxt;    /* previous SecurityDefinerContext setting */
     bool        prevXactReadOnly;        /* entry-time xact r/o state */
+    bool        startedInRecovery;    /* did we start in recovery? */
     struct TransactionStateData *parent;        /* back link to parent */
 } TransactionStateData;

@@ -167,9 +169,17 @@ static TransactionStateData TopTransactionStateData = {
     InvalidOid,                    /* previous CurrentUserId setting */
     false,                        /* previous SecurityDefinerContext setting */
     false,                        /* entry-time xact r/o state */
+    false,                        /* startedInRecovery */
     NULL                        /* link to parent state block */
 };

+/*
+ * unreportedXids holds XIDs of all subtransactions that have not yet been
+ * reported in a XLOG_XACT_ASSIGNMENT record.
+ */
+static int nUnreportedXids;
+static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
+
 static TransactionState CurrentTransactionState = &TopTransactionStateData;

 /*
@@ -392,6 +402,9 @@ AssignTransactionId(TransactionState s)
     bool        isSubXact = (s->parent != NULL);
     ResourceOwner currentOwner;

+    if (RecoveryInProgress())
+        elog(ERROR, "cannot assign TransactionIds during recovery");
+
     /* Assert that caller didn't screw up */
     Assert(!TransactionIdIsValid(s->transactionId));
     Assert(s->state == TRANS_INPROGRESS);
@@ -435,8 +448,55 @@ AssignTransactionId(TransactionState s)
     }
     PG_END_TRY();
     CurrentResourceOwner = currentOwner;
-}

+    /*
+     * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
+     * top-level transaction we issue a WAL record for the assignment. We
+     * include the top-level xid and all the subxids that have not yet been
+     * reported using XLOG_XACT_ASSIGNMENT records.
+     *
+     * This is required to limit the amount of shared memory required in a
+     * hot standby server to keep track of in-progress XIDs. See notes for
+     * RecordKnownAssignedTransactionIds().
+     *
+     * We don't actually keep track of the immediate parent of each subxid,
+     * only the top-level transaction that each subxact belongs to. This
+     * is correct in recovery only because aborted subtransactions are
+     * separately WAL logged.
+     */
+    if (isSubXact && XLogArchivingActive())
+    {
+        unreportedXids[nUnreportedXids] = s->transactionId;
+        nUnreportedXids++;
+
+        if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
+        {
+            XLogRecData rdata[2];
+            xl_xact_assignment    xlrec;
+
+            /*
+             * We say "IfAny" to avoid recursing back here.
+             */
+            xlrec.xtop = GetTopTransactionIdIfAny();
+            Assert(TransactionIdIsValid(xlrec.xtop));
+            xlrec.nsubxacts = nUnreportedXids;
+
+            rdata[0].data = (char *) (&xlrec);
+            rdata[0].len = MinSizeOfXactAssignment;
+            rdata[0].buffer = InvalidBuffer;
+            rdata[0].next = &rdata[1];
+
+            rdata[1].data = (char *) unreportedXids;
+            rdata[1].len = sizeof(TransactionId) * nUnreportedXids;
+            rdata[1].buffer = InvalidBuffer;
+            rdata[1].next = NULL;
+
+            (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
+
+            nUnreportedXids = 0;
+        }
+    }
+}

 /*
  *    GetCurrentSubTransactionId
@@ -596,6 +656,16 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
     return false;
 }

+/*
+ *    TransactionStartedDuringRecovery, used during index scans
+ */
+bool
+TransactionStartedDuringRecovery(void)
+{
+    TransactionState s = CurrentTransactionState;
+
+    return s->startedInRecovery;
+}

 /*
  *    CommandCounterIncrement
@@ -811,7 +881,7 @@ AtSubStart_ResourceOwner(void)
  * This is exported only to support an ugly hack in VACUUM FULL.
  */
 TransactionId
-RecordTransactionCommit(void)
+RecordTransactionCommit(bool isVacuumFull)
 {
     TransactionId xid = GetTopTransactionIdIfAny();
     bool        markXidCommitted = TransactionIdIsValid(xid);
@@ -821,11 +891,15 @@ RecordTransactionCommit(void)
     bool        haveNonTemp;
     int            nchildren;
     TransactionId *children;
+    int            nmsgs;
+    SharedInvalidationMessage *invalMessages = NULL;
+    bool        RelcacheInitFileInval;

     /* Get data needed for commit record */
     nrels = smgrGetPendingDeletes(true, &rels, &haveNonTemp);
     nchildren = xactGetCommittedChildren(&children);
-
+    nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
+                                                 &RelcacheInitFileInval);
     /*
      * If we haven't been assigned an XID yet, we neither can, nor do we want
      * to write a COMMIT record.
@@ -859,7 +933,7 @@ RecordTransactionCommit(void)
         /*
          * Begin commit critical section and insert the commit XLOG record.
          */
-        XLogRecData rdata[3];
+        XLogRecData rdata[4];
         int            lastrdata = 0;
         xl_xact_commit xlrec;

@@ -867,6 +941,15 @@ RecordTransactionCommit(void)
         BufmgrCommit();

         /*
+         * Set flags required for recovery processing of commits.
+         */
+        xlrec.xinfo = 0;
+        if (RelcacheInitFileInval)
+            xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+        if (isVacuumFull)
+            xlrec.xinfo |= XACT_COMPLETION_VACUUM_FULL;
+
+        /*
          * Mark ourselves as within our "commit critical section".    This
          * forces any concurrent checkpoint to wait until we've updated
          * pg_clog.  Without this, it is possible for the checkpoint to set
@@ -890,6 +973,7 @@ RecordTransactionCommit(void)
         xlrec.xact_time = xactStopTimestamp;
         xlrec.nrels = nrels;
         xlrec.nsubxacts = nchildren;
+        xlrec.nmsgs = nmsgs;
         rdata[0].data = (char *) (&xlrec);
         rdata[0].len = MinSizeOfXactCommit;
         rdata[0].buffer = InvalidBuffer;
@@ -911,6 +995,15 @@ RecordTransactionCommit(void)
             rdata[2].buffer = InvalidBuffer;
             lastrdata = 2;
         }
+        /* dump shared cache invalidation messages */
+        if (nmsgs > 0)
+        {
+            rdata[lastrdata].next = &(rdata[3]);
+            rdata[3].data = (char *) invalMessages;
+            rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
+            rdata[3].buffer = InvalidBuffer;
+            lastrdata = 3;
+        }
         rdata[lastrdata].next = NULL;

         (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
@@ -1352,6 +1445,8 @@ AtSubAbort_childXids(void)
     s->childXids = NULL;
     s->nChildXids = 0;
     s->maxChildXids = 0;
+
+    /* We could prune the unreportedXids array here. But we don't bother. */
 }

 /* ----------------------------------------------------------------
@@ -1476,6 +1571,11 @@ StartTransaction(void)
     currentCommandIdUsed = false;

     /*
+     * initialize reported xid accounting
+     */
+    nUnreportedXids = 0;
+
+    /*
      * must initialize resource-management stuff first
      */
     AtStart_Memory();
@@ -1522,6 +1622,7 @@ StartTransaction(void)
     s->childXids = NULL;
     s->nChildXids = 0;
     s->maxChildXids = 0;
+    s->startedInRecovery = RecoveryInProgress();
     GetUserIdAndContext(&s->prevUser, &s->prevSecDefCxt);
     /* SecurityDefinerContext should never be set outside a transaction */
     Assert(!s->prevSecDefCxt);
@@ -1619,7 +1720,7 @@ CommitTransaction(void)
     /*
      * Here is where we really truly commit.
      */
-    latestXid = RecordTransactionCommit();
+    latestXid = RecordTransactionCommit(false);

     TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);

@@ -1853,7 +1954,6 @@ PrepareTransaction(void)
     StartPrepare(gxact);

     AtPrepare_Notify();
-    AtPrepare_Inval();
     AtPrepare_Locks();
     AtPrepare_PgStat();

@@ -4195,32 +4295,222 @@ xactGetCommittedChildren(TransactionId **ptr)
 }

 /*
+ * Record an enhanced snapshot of running transactions into WAL.
+ *
+ * The definitions of RunningTransactionData and xl_xact_running_xacts
+ * are similar. We keep them separate because xl_xact_running_xacts
+ * is a contiguous chunk of memory and never exists fully until it is
+ * assembled in WAL.
+ */
+XLogRecPtr
+LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
+{
+    xl_xact_running_xacts    xlrec;
+    XLogRecData             rdata[3];
+    int                        lastrdata = 0;
+
+    xlrec.xcnt = CurrRunningXacts->xcnt;
+    xlrec.numLocks = CurrRunningXacts->numLocks;
+    xlrec.lock_overflow = CurrRunningXacts->lock_overflow;
+    xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
+    xlrec.nextXid = CurrRunningXacts->nextXid;
+    xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
+
+    /* Header */
+    rdata[0].data = (char *) (&xlrec);
+    rdata[0].len = MinSizeOfXactRunningXacts;
+    rdata[0].buffer = InvalidBuffer;
+
+    /* array of TransactionIds */
+    if (xlrec.xcnt > 0)
+    {
+        rdata[0].next = &(rdata[1]);
+        rdata[1].data = (char *) CurrRunningXacts->xids;
+        rdata[1].len = xlrec.xcnt * sizeof(TransactionId);
+        rdata[1].buffer = InvalidBuffer;
+        lastrdata = 1;
+    }
+
+    /* array of Locks */
+    if (xlrec.numLocks > 0)
+    {
+        rdata[lastrdata].next = &(rdata[2]);
+        rdata[2].data = (char *) CurrRunningXacts->loggableLocks;
+        rdata[2].len = xlrec.numLocks * sizeof(xl_rel_lock);
+        rdata[2].buffer = InvalidBuffer;
+        lastrdata = 2;
+    }
+
+    rdata[lastrdata].next = NULL;
+
+    return XLogInsert(RM_XACT_ID, XLOG_XACT_RUNNING_XACTS, rdata);
+}
+
+/*
+ * We need to issue shared invalidations and hold locks. Holding locks
+ * means others may want to wait on us, so we need to make lock table
+ * inserts to appear like a transaction. We could create and delete
+ * lock table entries for each transaction but its simpler just to create
+ * one permanent entry and leave it there all the time. Locks are then
+ * acquired and released as needed. Yes, this means you can see the
+ * Startup process in pg_locks once we have run this.
+ */
+void
+InitRecoveryTransactionEnvironment(void)
+{
+    VirtualTransactionId vxid;
+
+    /*
+     * Initialise shared invalidation management for Startup process,
+     * being careful to register ourselves as a sendOnly process so
+     * we don't need to read messages, nor will we get signalled
+     * when the queue starts filling up.
+     */
+    SharedInvalBackendInit(true);
+
+    /*
+     * Record the PID and PGPROC structure of the startup process.
+     */
+    PublishStartupProcessInformation();
+
+    /*
+     * Lock a virtual transaction id for Startup process.
+     *
+     * We need to do GetNextLocalTransactionId() because
+     * SharedInvalBackendInit() leaves localTransactionid invalid and
+     * the lock manager doesn't like that at all.
+     *
+     * Note that we don't need to run XactLockTableInsert() because nobody
+     * needs to wait on xids. That sounds a little strange, but table locks
+     * are held by vxids and row level locks are held by xids. All queries
+     * hold AccessShareLocks so never block while we write or lock new rows.
+     */
+    vxid.backendId = MyBackendId;
+    vxid.localTransactionId = GetNextLocalTransactionId();
+    VirtualXactLockTableInsert(vxid);
+}
+
+void
+XactClearRecoveryTransactions(void)
+{
+    /*
+     * Remove entries from shared data structures
+     */
+    ExpireAllKnownAssignedTransactionIds();
+    RelationReleaseAllRecoveryLocks();
+}
+
+/*
  *    XLOG support routines
  */

+/*
+ * Before 8.5 this was a fairly short function, but now it performs many
+ * actions for which the order of execution is critical.
+ */
 static void
-xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
+xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
 {
     TransactionId *sub_xids;
+    SharedInvalidationMessage *inval_msgs;
     TransactionId max_xid;
     int            i;

-    /* Mark the transaction committed in pg_clog */
+    /* subxid array follows relfilenodes */
     sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-    TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
+    /* invalidation messages array follows subxids */
+    inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
+
+    max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);

     /* Make sure nextXid is beyond any XID mentioned in the record */
-    max_xid = xid;
-    for (i = 0; i < xlrec->nsubxacts; i++)
-    {
-        if (TransactionIdPrecedes(max_xid, sub_xids[i]))
-            max_xid = sub_xids[i];
-    }
+    /* We don't expect anyone else to modify nextXid, hence we
+     * don't need to hold a lock while checking this. We still acquire
+     * the lock to modify it, though.
+     */
     if (TransactionIdFollowsOrEquals(max_xid,
                                      ShmemVariableCache->nextXid))
     {
+        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
         ShmemVariableCache->nextXid = max_xid;
         TransactionIdAdvance(ShmemVariableCache->nextXid);
+        LWLockRelease(XidGenLock);
+    }
+
+    /*
+     * Mark the transaction committed in pg_clog.
+     */
+    if (!InHotStandby)
+        TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
+    else
+    {
+        /*
+         * Just when you thought it was safe to go swimming again,
+         * along comes a nasty hack with bells on. Half way through
+         * VACUUM FULL it emits a false commit record, so it ends up
+         * emitting two commit records with the same xid. Oh, and it
+         * musn't release locks at the first commit either. So we
+         * have to specially mark the commit record "ignore me".
+         * On primary it actually marks clog committed yet stays
+         * visible in procarray. Cthulhu fhtagn. Run away screaming.
+         */
+        if (XactCompletionVacuumFull(xlrec))
+        {
+            elog(trace_recovery(DEBUG4), "skipping VACUUM FULL pseudo-commit %u", xid);
+            return;
+        }
+
+        /*
+         * Mark the transaction committed in pg_clog. We use async commit
+         * protocol during recovery to provide information on database
+         * consistency for when users try to set hint bits. It is important
+         * that we do not set hint bits until the minRecoveryPoint is past
+         * this commit record. This ensures that if we crash we don't see
+         * hint bits set on changes made by transactions that haven't yet
+         * recovered. It's unlikely but it's good to be safe.
+         */
+        TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
+
+        /*
+         * We must mark clog before we update the ProcArray.
+         */
+        ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids);
+
+        /*
+         * Send any cache invalidations attached to the commit. We must
+         * maintain the same order of invalidation then release locks
+         * as occurs in RecordTransactionCommit.
+         */
+        if (xlrec->nmsgs > 0)
+        {
+            /*
+             * Relcache init file invalidation requires processing both
+             * before and after we send the SI messages. See AtEOXact_Inval()
+             */
+            if (XactCompletionRelcacheInitFileInval(xlrec))
+                RelationCacheInitFileInvalidate(true);
+
+            SendSharedInvalidMessages(inval_msgs, xlrec->nmsgs);
+
+            if (XactCompletionRelcacheInitFileInval(xlrec))
+                RelationCacheInitFileInvalidate(false);
+        }
+
+        /*
+         * Release locks, if any. We do this for both two phase and normal
+         * one phase transactions. In effect we are ignoring the prepare
+         * phase and just going straight to lock release.
+         */
+        RelationReleaseRecoveryLockTree(xid, xlrec->nsubxacts, sub_xids);
+    }
+
+    /* Same here, don't use lock to test, but need one to modify */
+    if (TransactionIdFollowsOrEquals(max_xid,
+                                     ShmemVariableCache->latestCompletedXid))
+    {
+        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+        ShmemVariableCache->latestCompletedXid = max_xid;
+        LWLockRelease(ProcArrayLock);
     }

     /* Make sure files supposed to be dropped are dropped */
@@ -4241,6 +4531,15 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
     }
 }

+/*
+ * Be careful with the order of execution, as with xact_redo_commit().
+ * The two functions are similar but differ in key places.
+ *
+ * Note also that an abort can be for a subtransaction and its children,
+ * not just for a top level abort. That means we have to consider
+ * topxid != xid, whereas in commit we would find topxid == xid always
+ * because subtransaction commit is never WAL logged.
+ */
 static void
 xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 {
@@ -4248,22 +4547,51 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
     TransactionId max_xid;
     int            i;

-    /* Mark the transaction aborted in pg_clog */
     sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-    TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+    max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);

     /* Make sure nextXid is beyond any XID mentioned in the record */
-    max_xid = xid;
-    for (i = 0; i < xlrec->nsubxacts; i++)
-    {
-        if (TransactionIdPrecedes(max_xid, sub_xids[i]))
-            max_xid = sub_xids[i];
-    }
+    /* We don't expect anyone else to modify nextXid, hence we
+     * don't need to hold a lock while checking this. We still acquire
+     * the lock to modify it, though.
+     */
     if (TransactionIdFollowsOrEquals(max_xid,
                                      ShmemVariableCache->nextXid))
     {
+        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
         ShmemVariableCache->nextXid = max_xid;
         TransactionIdAdvance(ShmemVariableCache->nextXid);
+        LWLockRelease(XidGenLock);
+    }
+
+    /* Mark the transaction aborted in pg_clog, no need for async stuff */
+    TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+
+    if (InHotStandby)
+    {
+        /*
+         * We must mark clog before we update the ProcArray.
+         */
+        ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids);
+
+        /*
+         * There are no flat files that need updating, nor invalidation
+         * messages to send or undo.
+         */
+
+        /*
+         * Release locks, if any. There are no invalidations to send.
+         */
+        RelationReleaseRecoveryLockTree(xid, xlrec->nsubxacts, sub_xids);
+    }
+
+    /* Same here, don't use lock to test, but need one to modify */
+    if (TransactionIdFollowsOrEquals(max_xid,
+                                     ShmemVariableCache->latestCompletedXid))
+    {
+        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+        ShmemVariableCache->latestCompletedXid = max_xid;
+        LWLockRelease(ProcArrayLock);
     }

     /* Make sure files supposed to be dropped are dropped */
@@ -4296,7 +4624,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
     {
         xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);

-        xact_redo_commit(xlrec, record->xl_xid);
+        xact_redo_commit(xlrec, record->xl_xid, lsn);
     }
     else if (info == XLOG_XACT_ABORT)
     {
@@ -4314,7 +4642,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
     {
         xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);

-        xact_redo_commit(&xlrec->crec, xlrec->xid);
+        xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
         RemoveTwoPhaseFile(xlrec->xid, false);
     }
     else if (info == XLOG_XACT_ABORT_PREPARED)
@@ -4324,6 +4652,35 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
         xact_redo_abort(&xlrec->arec, xlrec->xid);
         RemoveTwoPhaseFile(xlrec->xid, false);
     }
+    else if (info == XLOG_XACT_ASSIGNMENT)
+    {
+        xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
+
+        if (InHotStandby)
+            ProcArrayApplyXidAssignment(xlrec->xtop,
+                                        xlrec->nsubxacts, xlrec->xsub);
+    }
+    else if (info == XLOG_XACT_RUNNING_XACTS)
+    {
+        xl_xact_running_xacts *xlrec = (xl_xact_running_xacts *) XLogRecGetData(record);
+
+        if (standbyState != STANDBY_DISABLED)
+        {
+            RunningTransactionsData running;
+
+            running.xcnt = xlrec->xcnt;
+            running.numLocks = xlrec->numLocks;
+            running.lock_overflow = xlrec->lock_overflow;
+            running.subxid_overflow = xlrec->subxid_overflow;
+            running.nextXid = xlrec->nextXid;
+            running.oldestRunningXid = xlrec->oldestRunningXid;
+            running.xids = xlrec->xids;
+            /* Locks array is after the xids and subxids, see snapshot.h */
+            running.loggableLocks = (xl_rel_lock *) &xlrec->xids[xlrec->xcnt];
+
+            ProcArrayApplyRecoveryInfo(lsn, &running);
+        }
+    }
     else
         elog(PANIC, "xact_redo: unknown op code %u", info);
 }
@@ -4332,11 +4689,19 @@ static void
 xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 {
     int            i;
+    TransactionId *xacts;
+    SharedInvalidationMessage *msgs;
+
+    xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+    msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
+
+    if (XactCompletionRelcacheInitFileInval(xlrec))
+        appendStringInfo(buf, "; relcache init file inval");

     appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
     if (xlrec->nrels > 0)
     {
-        appendStringInfo(buf, "; rels:");
+        appendStringInfo(buf, "; %d rels:", xlrec->nrels);
         for (i = 0; i < xlrec->nrels; i++)
         {
             char       *path = relpath(xlrec->xnodes[i], MAIN_FORKNUM);
@@ -4347,13 +4712,25 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
     }
     if (xlrec->nsubxacts > 0)
     {
-        TransactionId *xacts = (TransactionId *)
-        &xlrec->xnodes[xlrec->nrels];
-
-        appendStringInfo(buf, "; subxacts:");
+        appendStringInfo(buf, "; %d subxacts:", xlrec->nsubxacts);
         for (i = 0; i < xlrec->nsubxacts; i++)
             appendStringInfo(buf, " %u", xacts[i]);
     }
+    if (xlrec->nmsgs > 0)
+    {
+        appendStringInfo(buf, "; %d inval msgs:", xlrec->nmsgs);
+        for (i = 0; i < xlrec->nmsgs; i++)
+        {
+            SharedInvalidationMessage *msg = &msgs[i];
+
+            if (msg->id >= 0)
+                appendStringInfo(buf,  "catcache id%d ", msg->id);
+            else if (msg->id == SHAREDINVALRELCACHE_ID)
+                appendStringInfo(buf,  "relcache ");
+            else if (msg->id == SHAREDINVALSMGR_ID)
+                appendStringInfo(buf,  "smgr ");
+        }
+    }
 }

 static void
@@ -4376,14 +4753,47 @@ xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
     if (xlrec->nsubxacts > 0)
     {
         TransactionId *xacts = (TransactionId *)
-        &xlrec->xnodes[xlrec->nrels];
+                                    &xlrec->xnodes[xlrec->nrels];

-        appendStringInfo(buf, "; subxacts:");
+        appendStringInfo(buf, "; %d subxacts:", xlrec->nsubxacts);
         for (i = 0; i < xlrec->nsubxacts; i++)
             appendStringInfo(buf, " %u", xacts[i]);
     }
 }

+static void
+xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec)
+{
+    int            i;
+
+    appendStringInfo(buf, "%u subxacts:", xlrec->nsubxacts);
+
+    for (i = 0; i < xlrec->nsubxacts; i++)
+        appendStringInfo(buf, " %u", xlrec->xsub[i]);
+}
+
+static void
+xact_desc_running_xacts(StringInfo buf, xl_xact_running_xacts *xlrec)
+{
+    int            i;
+
+    appendStringInfo(buf,
+                     "nextXid %u oldestRunningXid %u",
+                     xlrec->nextXid,
+                     xlrec->oldestRunningXid);
+    if (xlrec->xcnt > 0)
+    {
+        appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
+        for (i = 0; i < xlrec->xcnt; i++)
+            appendStringInfo(buf, " %u", xlrec->xids[i]);
+    }
+
+    if (xlrec->subxid_overflow)
+        appendStringInfo(buf, "; subxid ovf");
+    if (xlrec->lock_overflow)
+        appendStringInfo(buf, "; lock ovf");
+}
+
 void
 xact_desc(StringInfo buf, uint8 xl_info, char *rec)
 {
@@ -4411,16 +4821,35 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec)
     {
         xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;

-        appendStringInfo(buf, "commit %u: ", xlrec->xid);
+        appendStringInfo(buf, "commit prepared %u: ", xlrec->xid);
         xact_desc_commit(buf, &xlrec->crec);
     }
     else if (info == XLOG_XACT_ABORT_PREPARED)
     {
         xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;

-        appendStringInfo(buf, "abort %u: ", xlrec->xid);
+        appendStringInfo(buf, "abort prepared %u: ", xlrec->xid);
         xact_desc_abort(buf, &xlrec->arec);
     }
+    else if (info == XLOG_XACT_ASSIGNMENT)
+    {
+        xl_xact_assignment *xlrec = (xl_xact_assignment *) rec;
+
+        /*
+         * Note that we ignore the WAL record's xid, since we're more
+         * interested in the top-level xid that issued the record
+         * and which xids are being reported here.
+         */
+        appendStringInfo(buf, "xid assignment xtop %u: ", xlrec->xtop);
+        xact_desc_assignment(buf, xlrec);
+    }
+    else if (info == XLOG_XACT_RUNNING_XACTS)
+    {
+        xl_xact_running_xacts *xlrec = (xl_xact_running_xacts *) rec;
+
+        appendStringInfo(buf, "running xacts: ");
+        xact_desc_running_xacts(buf, xlrec);
+    }
     else
         appendStringInfo(buf, "UNKNOWN");
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 90b89b8..827de14 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -133,6 +133,11 @@ TimeLineID    ThisTimeLineID = 0;
  */
 bool        InRecovery = false;

+/* Are we in Hot Standby mode? */
+HotStandbyState        standbyState = STANDBY_DISABLED;
+
+static     XLogRecPtr    LastRec;
+
 /*
  * Local copy of SharedRecoveryInProgress variable. True actually means "not
  * known, need to check the shared state".
@@ -165,6 +170,10 @@ static bool recoveryTargetExact = false;
 static bool recoveryTargetInclusive = true;
 static TransactionId recoveryTargetXid;
 static TimestampTz recoveryTargetTime;
+
+/* Initial setting for maxStandbyDelay, in seconds. */
+#define DEFAULT_MAX_STANDBY_DELAY     30
+
 static TimestampTz recoveryLastXTime = 0;

 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
@@ -359,6 +368,10 @@ typedef struct XLogCtlData

     /* end+1 of the last record replayed (or being replayed) */
     XLogRecPtr    replayEndRecPtr;
+    /* timestamp of last record replayed (or being replayed) */
+    TimestampTz    recoveryLastXTime;
+
+    int         maxStandbyDelay;

     slock_t        info_lck;        /* locks shared variables shown above */
 } XLogCtlData;
@@ -463,6 +476,8 @@ static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI,
                     uint32 endLogId, uint32 endLogSeg);
 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
+static void SetMaxStandbyDelay(int delay);
+static void CheckMaxConnections(int maxcon);
 static void LocalSetXLogInsertAllowed(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);

@@ -2103,9 +2118,41 @@ XLogAsyncCommitFlush(void)
 bool
 XLogNeedsFlush(XLogRecPtr record)
 {
-    /* XLOG doesn't need flushing during recovery */
+    /*
+     * During recovery, we don't flush WAL but update minRecoveryPoint
+     * instead. So "needs flush" is taken to mean whether minRecoveryPoint
+     * would need to be updated.
+     */
     if (RecoveryInProgress())
-        return false;
+    {
+        /* Quick exit if already known updated */
+        if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
+            return false;
+
+
+        /*
+         * Update local copy of minRecoveryPoint. But if the lock is busy,
+         * just return a conservative guess.
+         */
+        if (!LWLockConditionalAcquire(ControlFileLock, LW_SHARED))
+            return true;
+        minRecoveryPoint = ControlFile->minRecoveryPoint;
+        LWLockRelease(ControlFileLock);
+
+        /*
+         * An invalid minRecoveryPoint means that we need to recover all the WAL,
+         * i.e., we're doing crash recovery.  We never modify the control file's
+         * value in that case, so we can short-circuit future checks here too.
+         */
+        if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
+            updateMinRecoveryPoint = false;
+
+        /* check again */
+        if (XLByteLE(record, minRecoveryPoint) || !updateMinRecoveryPoint)
+            return false;
+        else
+            return true;
+    }

     /* Quick exit if already known flushed */
     if (XLByteLE(record, LogwrtResult.Flush))
@@ -3259,10 +3306,11 @@ CleanupBackupHistory(void)
  * ignoring them as already applied, but that's not a huge drawback.
  *
  * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
- * Otherwise, a normal exclusive lock is used.    At the moment, that's just
- * pro forma, because there can't be any regular backends in the system
- * during recovery.  The 'cleanup' argument applies to all backup blocks
- * in the WAL record, that suffices for now.
+ * Otherwise, a normal exclusive lock is used.    During crash recovery, that's
+ * just pro forma because there can't be any regular backends in the system,
+ * but in hot standby mode the distinction is important. The 'cleanup'
+ * argument applies to all backup blocks in the WAL record, that suffices for
+ * now.
  */
 void
 RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
@@ -4793,6 +4841,9 @@ readRecoveryCommandFile(void)
     TimeLineID    rtli = 0;
     bool        rtliGiven = false;
     bool        syntaxError = false;
+    int            maxStandbyDelay = DEFAULT_MAX_STANDBY_DELAY;
+    /* recovery_connections defaults to true, if recovery.conf exists */
+    bool        recoveryConnectionsEnabled = true;

     fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
     if (fd == NULL)
@@ -4931,6 +4982,35 @@ readRecoveryCommandFile(void)
             ereport(LOG,
                     (errmsg("recovery_target_inclusive = %s", tok2)));
         }
+        else if (strcmp(tok1, "recovery_connections") == 0)
+        {
+            /*
+             * enables/disables snapshot processing and user connections
+             */
+            if (!parse_bool(tok2, &recoveryConnectionsEnabled))
+                  ereport(ERROR,
+                            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                      errmsg("parameter \"recovery_connections\" requires a Boolean value")));
+            ereport(LOG,
+                    (errmsg("recovery_connections = %s", tok2)));
+        }
+        else if (strcmp(tok1, "max_standby_delay") == 0)
+        {
+            /*
+             * Maximum delay in standby mode before we start canceling
+             * queries. Ignored if recovery_connections is false.
+             */
+            errno = 0;
+            maxStandbyDelay = (TransactionId) strtoul(tok2, NULL, 0);
+            if (errno == EINVAL || errno == ERANGE)
+                ereport(FATAL,
+                 (errmsg("max_standby_delay is not a valid number: \"%s\"",
+                         tok2)));
+
+            ereport(LOG,
+                    (errmsg("max_standby_delay = %u",
+                            maxStandbyDelay)));
+        }
         else
             ereport(FATAL,
                     (errmsg("unrecognized recovery parameter \"%s\"",
@@ -4954,6 +5034,12 @@ readRecoveryCommandFile(void)
     /* Enable fetching from archive recovery area */
     InArchiveRecovery = true;

+    if (recoveryConnectionsEnabled)
+    {
+        SetMaxStandbyDelay(maxStandbyDelay);
+        standbyState = STANDBY_UNINITIALIZED;
+    }
+
     /*
      * If user specified recovery_target_timeline, validate it or compute the
      * "latest" value.    We can't do this until after we've gotten the restore
@@ -5217,6 +5303,79 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
 }

 /*
+ * Returns bool with current recovery mode, a global state.
+ */
+Datum
+pg_is_in_recovery(PG_FUNCTION_ARGS)
+{
+    PG_RETURN_BOOL(RecoveryInProgress());
+}
+
+/*
+ * Returns timestamp of last recovered commit/abort record.
+ */
+TimestampTz
+GetLatestXLogTime(void)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile XLogCtlData *xlogctl = XLogCtl;
+
+    SpinLockAcquire(&xlogctl->info_lck);
+    recoveryLastXTime = xlogctl->recoveryLastXTime;
+    SpinLockRelease(&xlogctl->info_lck);
+
+    return recoveryLastXTime;
+}
+
+/*
+ * Returns maxStandbyDelay in seconds, or -1 if wait forever
+ */
+int
+GetMaxStandbyDelay(void)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile XLogCtlData *xlogctl = XLogCtl;
+    int        delay;
+
+    SpinLockAcquire(&xlogctl->info_lck);
+    delay = xlogctl->maxStandbyDelay;
+    SpinLockRelease(&xlogctl->info_lck);
+
+    return delay;
+}
+
+static void
+SetMaxStandbyDelay(int delay)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile XLogCtlData *xlogctl = XLogCtl;
+
+    /* 2E6 seconds is about 23 days. */
+    if (delay > INT_MAX || delay < -1)
+        ereport(ERROR,
+         (errmsg("max_standby_delay must be between -1 (wait forever) and 2 000 000 secs")));
+
+    SpinLockAcquire(&xlogctl->info_lck);
+    xlogctl->maxStandbyDelay = delay;
+    SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Check to see if max_connections is set high enough on this server
+ * to allow recovery connections to operate correctly. We ignore
+ * autovacuum_max_workers when we make this test.
+ */
+static void
+CheckMaxConnections(int maxcon)
+{
+    if (MaxConnections < maxcon)
+        ereport(ERROR,
+            (errmsg("recovery_connections cannot continue because"
+                    "max_connections %u set lower than WAL master (max_connections = %u)",
+                    MaxConnections, maxcon)));
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend startup
  */
 void
@@ -5228,7 +5387,6 @@ StartupXLOG(void)
     bool        reachedStopPoint = false;
     bool        haveBackupLabel = false;
     XLogRecPtr    RecPtr,
-                LastRec,
                 checkPointLoc,
                 backupStopLoc,
                 EndOfLog;
@@ -5238,6 +5396,7 @@ StartupXLOG(void)
     uint32        freespace;
     TransactionId oldestActiveXID;
     bool        bgwriterLaunched = false;
+    bool        backendsAllowed = false;

     /*
      * Read control file and check XLOG status looks valid.
@@ -5506,6 +5665,54 @@ StartupXLOG(void)
                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
         }

+        /*
+         * Initialize recovery connections, if enabled. We won't let backends
+         * in yet, not until we've reached the min recovery point specified
+         * in control file and we've established a recovery snapshot from a
+         * running-xacts WAL record or a shutdown checkpoint with no prepared
+         * transactions.
+         */
+        if (standbyState >= STANDBY_UNINITIALIZED)
+        {
+            CheckMaxConnections(checkPoint.MaxConnections);
+            InitRecoveryTransactionEnvironment();
+            StartCleanupDelayStats();
+
+            /*
+             * If we're beginning at a shutdown checkpoint, we know that
+             * nothing was running on the master at this point
+             */
+            if (wasShutdown)
+            {
+                RunningTransactionsData running;
+                TransactionId oldestRunningXid;
+                TransactionId *xids;
+                int nxids;
+
+                oldestRunningXid = PrescanPreparedTransactions();
+
+                xids = StandbyRecoverPreparedTransactions(&nxids);
+
+                /* Construct a RunningTransactions snapshot */
+
+                running.xcnt = nxids;
+                running.numLocks = 0;
+                running.lock_overflow = false;
+                /*
+                 * StandbyRecoverPreparedTransactions called SubTransSetParent
+                 * for any subtransactions, so we consider this a non-overflowed
+                 * snapshot.
+                 */
+                running.subxid_overflow = false;
+                running.nextXid = checkPoint.nextXid;
+                running.oldestRunningXid = oldestRunningXid;
+                running.xids = xids;
+                running.loggableLocks = NULL;
+
+                ProcArrayApplyRecoveryInfo(checkPointLoc, &running);
+            }
+        }
+
         /* Initialize resource managers */
         for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
         {
@@ -5580,7 +5787,9 @@ StartupXLOG(void)
             do
             {
 #ifdef WAL_DEBUG
-                if (XLOG_DEBUG)
+                if (XLOG_DEBUG ||
+                    (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
+                    (rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))
                 {
                     StringInfoData buf;

@@ -5608,27 +5817,28 @@ StartupXLOG(void)
                 }

                 /*
-                 * Check if we were requested to exit without finishing
-                 * recovery.
+                 * Have we passed our safe starting point?
                  */
-                if (shutdown_requested)
-                    proc_exit(1);
+                if (!reachedMinRecoveryPoint &&
+                    XLByteLE(minRecoveryPoint, EndRecPtr))
+                {
+                    reachedMinRecoveryPoint = true;
+                    ereport(LOG,
+                            (errmsg("consistent recovery state reached")));
+                }

                 /*
-                 * Have we passed our safe starting point? If so, we can tell
-                 * postmaster that the database is consistent now.
+                 * Have we got a valid starting snapshot that will allow
+                 * queries to be run? If so, we can tell postmaster that
+                 * the database is consistent now, enabling connections.
                  */
-                if (!reachedMinRecoveryPoint &&
-                    XLByteLT(minRecoveryPoint, EndRecPtr))
+                if (standbyState == STANDBY_READY &&
+                    !backendsAllowed &&
+                    reachedMinRecoveryPoint &&
+                    IsUnderPostmaster)
                 {
-                    reachedMinRecoveryPoint = true;
-                    if (InArchiveRecovery)
-                    {
-                        ereport(LOG,
-                              (errmsg("consistent recovery state reached")));
-                        if (IsUnderPostmaster)
-                            SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
-                    }
+                    backendsAllowed = true;
+                    SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
                 }

                 /*
@@ -5662,8 +5872,13 @@ StartupXLOG(void)
                  */
                 SpinLockAcquire(&xlogctl->info_lck);
                 xlogctl->replayEndRecPtr = EndRecPtr;
+                xlogctl->recoveryLastXTime = recoveryLastXTime;
                 SpinLockRelease(&xlogctl->info_lck);

+                /* In Hot Standby mode, keep track of XIDs we've seen */
+                if (InHotStandby && TransactionIdIsValid(record->xl_xid))
+                    RecordKnownAssignedTransactionIds(record->xl_xid);
+
                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);

                 /* Pop the error context stack */
@@ -5892,13 +6107,29 @@ StartupXLOG(void)
     TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);

     /* Start up the commit log and related stuff, too */
-    StartupCLOG();
-    StartupSUBTRANS(oldestActiveXID);
-    StartupMultiXact();
+    /* XXXHS: perhaps this should go after XactClearRecoveryTransactions */
+    /* XXX: These can get called twice in hot standby mode, if the recovery
+     * snapshot is in "pending" state */
+    if (!backendsAllowed)
+    {
+        StartupCLOG();
+        StartupSUBTRANS(oldestActiveXID);
+        StartupMultiXact();
+    }

     /* Reload shared-memory state for prepared transactions */
     RecoverPreparedTransactions();

+    /*
+     * Shutdown the recovery environment. This must occur after
+     * RecoverPreparedTransactions(), see notes for lock_twophase_recover()
+     */
+    if (standbyState >= STANDBY_UNINITIALIZED)
+    {
+        EndCleanupDelayStats();
+        XactClearRecoveryTransactions();
+    }
+
     /* Shut down readFile facility, free space */
     if (readFile >= 0)
     {
@@ -5964,8 +6195,9 @@ RecoveryInProgress(void)

         /*
          * Initialize TimeLineID and RedoRecPtr when we discover that recovery
-         * is finished.  (If you change this, see also
-         * LocalSetXLogInsertAllowed.)
+         * is finished. InitPostgres() relies upon this behaviour to ensure
+         * that InitXLOGAccess() is called at backend startup.  (If you change
+         * this, see also LocalSetXLogInsertAllowed.)
          */
         if (!LocalRecoveryInProgress)
             InitXLOGAccess();
@@ -6151,7 +6383,7 @@ InitXLOGAccess(void)
 {
     /* ThisTimeLineID doesn't change so we need no lock to copy it */
     ThisTimeLineID = XLogCtl->ThisTimeLineID;
-    Assert(ThisTimeLineID != 0);
+    Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());

     /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
     (void) GetRedoRecPtr();
@@ -6448,6 +6680,7 @@ CreateCheckPoint(int flags)
     /* Begin filling in the checkpoint WAL record */
     MemSet(&checkPoint, 0, sizeof(checkPoint));
     checkPoint.time = (pg_time_t) time(NULL);
+    checkPoint.MaxConnections = MaxConnections;

     /*
      * We must hold WALInsertLock while examining insert state to determine
@@ -6743,6 +6976,27 @@ CreateCheckPoint(int flags)
                                      CheckpointStats.ckpt_segs_recycled);

     LWLockRelease(CheckpointLock);
+
+    /*
+     * Take a snapshot of running transactions and write this to WAL.
+     * This allows us to reconstruct the state of running transactions
+     * during archive recovery, if required. If we aren't archiving,
+     * don't bother.
+     *
+     * If we are shutting down, or Startup process is completing crash
+     * recovery we don't need to write running xact data.
+     */
+    if (!shutdown && XLogArchivingActive() && !RecoveryInProgress())
+    {
+        /*
+         * GetRunningTransactionData() inserts WAL records while holding
+         * ProcArrayLock. Make sure we flush WAL first so we reduce the
+         * chance of needing to flush WAL during XLogInsert(), which might
+         * mean we hold ProcArrayLock across an I/O, which could be bad.
+         */
+        XLogBackgroundFlush();
+        GetRunningTransactionData();
+    }
 }

 /*
@@ -6781,6 +7035,11 @@ RecoveryRestartPoint(const CheckPoint *checkPoint)
     volatile XLogCtlData *xlogctl = XLogCtl;

     /*
+     * Regular reports of wait statistics. Unrelated to restartpoints.
+     */
+    ReportCleanupDelayStats();
+
+    /*
      * Is it safe to checkpoint?  We must ask each of the resource managers
      * whether they have any partial state information that might prevent a
      * correct restart from this point.  If so, we skip this opportunity, but
@@ -6791,7 +7050,7 @@ RecoveryRestartPoint(const CheckPoint *checkPoint)
         if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
             if (!(RmgrTable[rmid].rm_safe_restartpoint()))
             {
-                elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
+                elog(trace_recovery(DEBUG2), "RM %d not safe to record restart point at %X/%X",
                      rmid,
                      checkPoint->redo.xlogid,
                      checkPoint->redo.xrecoff);
@@ -7036,6 +7295,41 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
         ShmemVariableCache->oldestXid = checkPoint.oldestXid;
         ShmemVariableCache->oldestXidDB = checkPoint.oldestXidDB;

+        /* We know nothing was running on the master at this point */
+        if (standbyState >= STANDBY_UNINITIALIZED)
+        {
+            RunningTransactionsData running;
+            TransactionId oldestRunningXid;
+            TransactionId *xids;
+            int nxids;
+
+            oldestRunningXid = PrescanPreparedTransactions();
+
+            xids = StandbyRecoverPreparedTransactions(&nxids);
+
+            /* Construct a RunningTransactions snapshot */
+
+            running.xcnt = nxids;
+            running.numLocks = 0;
+            running.lock_overflow = false;
+            /*
+             * StandbyRecoverPreparedTransactions called SubTransSetParent
+             * for any subtransactions, so we consider this a non-overflowed
+             * snapshot.
+             */
+            running.subxid_overflow = false;
+            running.nextXid = checkPoint.nextXid;
+            running.oldestRunningXid = oldestRunningXid;
+            running.xids = xids;
+            running.loggableLocks = NULL;
+
+            ProcArrayApplyRecoveryInfo(lsn, &running);
+        }
+
+        /* Check to see if any changes to max_connections give problems */
+        if (standbyState >= STANDBY_UNINITIALIZED)
+            CheckMaxConnections(checkPoint.MaxConnections);
+
         /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
         ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
         ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
@@ -7155,6 +7449,9 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
                      record->xl_prev.xlogid, record->xl_prev.xrecoff,
                      record->xl_xid);

+    appendStringInfo(buf, "; len %u",
+                     record->xl_len);
+
     for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
     {
         if (record->xl_info & XLR_SET_BKP_BLOCK(i))
@@ -7311,6 +7608,12 @@ pg_start_backup(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                  errmsg("must be superuser to run a backup")));

+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("WAL control functions cannot be executed during recovery.")));
+
     if (!XLogArchivingActive())
         ereport(ERROR,
                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -7498,6 +7801,12 @@ pg_stop_backup(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                  (errmsg("must be superuser to run a backup"))));

+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("WAL control functions cannot be executed during recovery.")));
+
     if (!XLogArchivingActive())
         ereport(ERROR,
                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -7659,6 +7968,12 @@ pg_switch_xlog(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
              (errmsg("must be superuser to switch transaction log files"))));

+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("WAL control functions cannot be executed during recovery.")));
+
     switchpoint = RequestXLogSwitch();

     /*
@@ -7681,6 +7996,12 @@ pg_current_xlog_location(PG_FUNCTION_ARGS)
 {
     char        location[MAXFNAMELEN];

+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("WAL control functions cannot be executed during recovery.")));
+
     /* Make sure we have an up-to-date local LogwrtResult */
     {
         /* use volatile pointer to prevent code rearrangement */
@@ -7708,6 +8029,12 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
     XLogRecPtr    current_recptr;
     char        location[MAXFNAMELEN];

+    if (RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is in progress"),
+                 errhint("WAL control functions cannot be executed during recovery.")));
+
     /*
      * Get the current end-of-WAL position ... shared lock is sufficient
      */
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 035baee..6e91c92 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -26,6 +26,7 @@

 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
@@ -51,6 +52,7 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/pg_locale.h"
 #include "utils/snapmgr.h"
@@ -1937,6 +1939,27 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record)

         dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);

+        if (InHotStandby)
+        {
+            VirtualTransactionId *database_users;
+
+            /*
+             * Find all users connected to this database and ask them
+             * politely to immediately kill their sessions before processing
+             * the drop database record, after the usual grace period.
+             * We don't wait for commit because drop database is
+             * non-transactional.
+             */
+            database_users = GetConflictingVirtualXIDs(InvalidTransactionId,
+                                                        xlrec->db_id,
+                                                        false);
+
+            ResolveRecoveryConflictWithVirtualXIDs(database_users,
+                                                    "drop database",
+                                                    CONFLICT_MODE_FATAL,
+                                                    InvalidXLogRecPtr);
+        }
+
         /* Drop pages for this database that are in the shared buffer cache */
         DropDatabaseBuffers(xlrec->db_id);

diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 1e5c92e..e8cc15f 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -47,6 +47,13 @@ LockTableCommand(LockStmt *lockstmt)

         reloid = RangeVarGetRelid(relation, false);

+        /*
+         * During recovery we only accept these variations:
+         * LOCK TABLE foo IN ACCESS SHARE MODE which is effectively a no-op
+         */
+        if (lockstmt->mode != AccessShareLock)
+            PreventCommandDuringRecovery();
+
         LockTableRecurse(reloid, relation,
                          lockstmt->mode, lockstmt->nowait, recurse);
     }
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 4210f44..703dbd1 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -458,6 +458,9 @@ nextval_internal(Oid relid)
                 rescnt = 0;
     bool        logit = false;

+    /* All nextval() write to database and must be prevented during recovery */
+    PreventCommandDuringRecovery();
+
     /* open and AccessShareLock sequence */
     init_sequence(relid, &elm, &seqrel);

@@ -1343,6 +1346,11 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
     /* Backup blocks are not used in seq records */
     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

+#ifdef NOT_USED
+    /* XXX we don't use backup blocks in this record type */
+    RestoreBkpBlocks(lsn, record, false);
+#endif
+
     if (info != XLOG_SEQ_LOG)
         elog(PANIC, "seq_redo: unknown op code %u", info);

diff --git a/src/backend/commands/tablespace.c b/src/backend/commands/tablespace.c
index f119cf0..e75757b 100644
--- a/src/backend/commands/tablespace.c
+++ b/src/backend/commands/tablespace.c
@@ -50,6 +50,7 @@

 #include "access/heapam.h"
 #include "access/sysattr.h"
+#include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -60,10 +61,12 @@
 #include "miscadmin.h"
 #include "postmaster/bgwriter.h"
 #include "storage/fd.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -1328,11 +1331,59 @@ tblspc_redo(XLogRecPtr lsn, XLogRecord *record)
     {
         xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);

+        /*
+         * If we issued a WAL record for a drop tablespace it is
+         * because there were no files in it at all. That means that
+         * no permanent objects can exist in it at this point.
+         *
+         * It is possible for standby users to be using this tablespace
+         * as a location for their temporary files, so if we fail to
+         * remove all files then do conflict processing and try again,
+         * if currently enabled.
+         */
         if (!remove_tablespace_directories(xlrec->ts_id, true))
-            ereport(ERROR,
+        {
+            VirtualTransactionId *temp_file_users;
+
+            /*
+             * Standby users may be currently using this tablespace for
+             * for their temporary files. We only care about current
+             * users because temp_tablespace parameter will just ignore
+             * tablespaces that no longer exist.
+             *
+             * Ask everybody to cancel their queries immediately so
+             * we can ensure no temp files remain and we can remove the
+             * tablespace. Nuke the entire site from orbit, it's the only
+             * way to be sure.
+             *
+             * XXX: We could work out the pids of active backends
+             * using this tablespace by examining the temp filenames in the
+             * directory. We would then convert the pids into VirtualXIDs
+             * before attempting to cancel them.
+             *
+             * We don't wait for commit because drop tablespace is
+             * non-transactional.
+             */
+            temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
+                                                        InvalidOid,
+                                                        false);
+            ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
+                                                    "drop tablespace",
+                                                    CONFLICT_MODE_ERROR_IF_NOT_IDLE,
+                                                    InvalidXLogRecPtr);
+
+            /*
+             * If we did recovery processing then hopefully the
+             * backends who wrote temp files should have cleaned up and
+             * exited by now. So lets recheck before we throw an error.
+             * If !process_conflicts then this will just fail again.
+             */
+            if (!remove_tablespace_directories(xlrec->ts_id, true))
+                ereport(ERROR,
                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                      errmsg("tablespace %u is not empty",
                             xlrec->ts_id)));
+        }
     }
     else
         elog(PANIC, "tblspc_redo: unknown op code %u", info);
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 7f2e270..7255e75 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -140,6 +140,7 @@ typedef struct VRelStats
     /* vtlinks array for tuple chain following - sorted by new_tid */
     int            num_vtlinks;
     VTupleLink    vtlinks;
+    TransactionId    latestRemovedXid;
 } VRelStats;

 /*----------------------------------------------------------------------
@@ -223,7 +224,7 @@ static void scan_heap(VRelStats *vacrelstats, Relation onerel,
 static bool repair_frag(VRelStats *vacrelstats, Relation onerel,
             VacPageList vacuum_pages, VacPageList fraged_pages,
             int nindexes, Relation *Irel);
-static void move_chain_tuple(Relation rel,
+static void move_chain_tuple(VRelStats *vacrelstats, Relation rel,
                  Buffer old_buf, Page old_page, HeapTuple old_tup,
                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
                  ExecContext ec, ItemPointer ctid, bool cleanVpd);
@@ -236,7 +237,7 @@ static void update_hint_bits(Relation rel, VacPageList fraged_pages,
                  int num_moved);
 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
             VacPageList vacpagelist);
-static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
+static void vacuum_page(VRelStats *vacrelstats, Relation onerel, Buffer buffer, VacPage vacpage);
 static void vacuum_index(VacPageList vacpagelist, Relation indrel,
              double num_tuples, int keep_tuples);
 static void scan_index(Relation indrel, double num_tuples);
@@ -1277,6 +1278,7 @@ full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
     vacrelstats->rel_tuples = 0;
     vacrelstats->rel_indexed_tuples = 0;
     vacrelstats->hasindex = false;
+    vacrelstats->latestRemovedXid = InvalidTransactionId;

     /* scan the heap */
     vacuum_pages.num_pages = fraged_pages.num_pages = 0;
@@ -1682,6 +1684,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
             {
                 ItemId        lpp;

+                HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
+                                            &vacrelstats->latestRemovedXid);
+
                 /*
                  * Here we are building a temporary copy of the page with dead
                  * tuples removed.    Below we will apply
@@ -1999,7 +2004,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 /* there are dead tuples on this page - clean them */
                 Assert(!isempty);
                 LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-                vacuum_page(onerel, buf, last_vacuum_page);
+                vacuum_page(vacrelstats, onerel, buf, last_vacuum_page);
                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
             }
             else
@@ -2488,7 +2493,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                     tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
                     tuple_len = tuple.t_len = ItemIdGetLength(Citemid);

-                    move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
+                    move_chain_tuple(vacrelstats, onerel, Cbuf, Cpage, &tuple,
                                      dst_buffer, dst_page, destvacpage,
                                      &ec, &Ctid, vtmove[ti].cleanVpd);

@@ -2574,7 +2579,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 dst_page = BufferGetPage(dst_buffer);
                 /* if this page was not used before - clean it */
                 if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
-                    vacuum_page(onerel, dst_buffer, dst_vacpage);
+                    vacuum_page(vacrelstats, onerel, dst_buffer, dst_vacpage);
             }
             else
                 LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -2727,7 +2732,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
         HOLD_INTERRUPTS();
         heldoff = true;
         ForceSyncCommit();
-        (void) RecordTransactionCommit();
+        (void) RecordTransactionCommit(true);
     }

     /*
@@ -2755,7 +2760,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
             page = BufferGetPage(buf);
             if (!PageIsEmpty(page))
-                vacuum_page(onerel, buf, *curpage);
+                vacuum_page(vacrelstats, onerel, buf, *curpage);
             UnlockReleaseBuffer(buf);
         }
     }
@@ -2891,7 +2896,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 recptr = log_heap_clean(onerel, buf,
                                         NULL, 0, NULL, 0,
                                         unused, uncnt,
-                                        false);
+                                        vacrelstats->latestRemovedXid, false);
                 PageSetLSN(page, recptr);
                 PageSetTLI(page, ThisTimeLineID);
             }
@@ -2943,7 +2948,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
  *        already too long and almost unreadable.
  */
 static void
-move_chain_tuple(Relation rel,
+move_chain_tuple(VRelStats *vacrelstats, Relation rel,
                  Buffer old_buf, Page old_page, HeapTuple old_tup,
                  Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
                  ExecContext ec, ItemPointer ctid, bool cleanVpd)
@@ -3001,7 +3006,7 @@ move_chain_tuple(Relation rel,
         int            sv_offsets_used = dst_vacpage->offsets_used;

         dst_vacpage->offsets_used = 0;
-        vacuum_page(rel, dst_buf, dst_vacpage);
+        vacuum_page(vacrelstats, rel, dst_buf, dst_vacpage);
         dst_vacpage->offsets_used = sv_offsets_used;
     }

@@ -3341,7 +3346,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
             buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
                                      RBM_NORMAL, vac_strategy);
             LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-            vacuum_page(onerel, buf, *vacpage);
+            vacuum_page(vacrelstats, onerel, buf, *vacpage);
             UnlockReleaseBuffer(buf);
         }
     }
@@ -3371,7 +3376,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
  * Caller must hold pin and lock on buffer.
  */
 static void
-vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
+vacuum_page(VRelStats *vacrelstats, Relation onerel, Buffer buffer, VacPage vacpage)
 {
     Page        page = BufferGetPage(buffer);
     int            i;
@@ -3400,7 +3405,7 @@ vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
         recptr = log_heap_clean(onerel, buffer,
                                 NULL, 0, NULL, 0,
                                 vacpage->offsets, vacpage->offsets_free,
-                                false);
+                                vacrelstats->latestRemovedXid, false);
         PageSetLSN(page, recptr);
         PageSetTLI(page, ThisTimeLineID);
     }
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index d94bac0..d4a6b5f 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -98,6 +98,7 @@ typedef struct LVRelStats
     int            max_dead_tuples;    /* # slots allocated in array */
     ItemPointer dead_tuples;    /* array of ItemPointerData */
     int            num_index_scans;
+    TransactionId latestRemovedXid;
 } LVRelStats;


@@ -264,6 +265,34 @@ lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
     return heldoff;
 }

+/*
+ * For Hot Standby we need to know the highest transaction id that will
+ * be removed by any change. VACUUM proceeds in a number of passes so
+ * we need to consider how each pass operates. The first phase runs
+ * heap_page_prune(), which can issue XLOG_HEAP2_CLEAN records as it
+ * progresses - these will have a latestRemovedXid on each record.
+ * In some cases this removes all of the tuples to be removed, though
+ * often we have dead tuples with index pointers so we must remember them
+ * for removal in phase 3. Index records for those rows are removed
+ * in phase 2 and index blocks do not have MVCC information attached.
+ * So before we can allow removal of any index tuples we need to issue
+ * a WAL record containing the latestRemovedXid of rows that will be
+ * removed in phase three. This allows recovery queries to block at the
+ * correct place, i.e. before phase two, rather than during phase three
+ * which would be after the rows have become inaccessible.
+ */
+static void
+vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
+{
+    /*
+     * No need to log changes for temp tables, they do not contain
+     * data visible on the standby server.
+     */
+    if (rel->rd_istemp || !XLogArchivingActive())
+        return;
+
+    (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+}

 /*
  *    lazy_scan_heap() -- scan an open heap relation
@@ -314,6 +343,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
     nblocks = RelationGetNumberOfBlocks(onerel);
     vacrelstats->rel_pages = nblocks;
     vacrelstats->nonempty_pages = 0;
+    vacrelstats->latestRemovedXid = InvalidTransactionId;

     lazy_space_alloc(vacrelstats, nblocks);

@@ -372,6 +402,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
         if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
             vacrelstats->num_dead_tuples > 0)
         {
+            /* Log cleanup info before we touch indexes */
+            vacuum_log_cleanup_info(onerel, vacrelstats);
+
             /* Remove index entries */
             for (i = 0; i < nindexes; i++)
                 lazy_vacuum_index(Irel[i],
@@ -381,6 +414,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
             lazy_vacuum_heap(onerel, vacrelstats);
             /* Forget the now-vacuumed tuples, and press on */
             vacrelstats->num_dead_tuples = 0;
+            vacrelstats->latestRemovedXid = InvalidTransactionId;
             vacrelstats->num_index_scans++;
         }

@@ -612,6 +646,8 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
             if (tupgone)
             {
                 lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+                HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
+                                                &vacrelstats->latestRemovedXid);
                 tups_vacuumed += 1;
             }
             else
@@ -660,6 +696,7 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
             lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats);
             /* Forget the now-vacuumed tuples, and press on */
             vacrelstats->num_dead_tuples = 0;
+            vacrelstats->latestRemovedXid = InvalidTransactionId;
             vacuumed_pages++;
         }

@@ -723,6 +760,9 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
     /* XXX put a threshold on min number of tuples here? */
     if (vacrelstats->num_dead_tuples > 0)
     {
+        /* Log cleanup info before we touch indexes */
+        vacuum_log_cleanup_info(onerel, vacrelstats);
+
         /* Remove index entries */
         for (i = 0; i < nindexes; i++)
             lazy_vacuum_index(Irel[i],
@@ -867,7 +907,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
         recptr = log_heap_clean(onerel, buffer,
                                 NULL, 0, NULL, 0,
                                 unused, uncnt,
-                                false);
+                                vacrelstats->latestRemovedXid, false);
         PageSetLSN(page, recptr);
         PageSetTLI(page, ThisTimeLineID);
     }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index e94181a..3678ea4 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -245,8 +245,9 @@ static bool RecoveryError = false;        /* T if WAL recovery failed */
  * When archive recovery is finished, the startup process exits with exit
  * code 0 and we switch to PM_RUN state.
  *
- * Normal child backends can only be launched when we are in PM_RUN state.
- * (We also allow it in PM_WAIT_BACKUP state, but only for superusers.)
+ * Normal child backends can only be launched when we are in PM_RUN or
+ * PM_RECOVERY_CONSISTENT state.  (We also allow launch of normal
+ * child backends in PM_WAIT_BACKUP state, but only for superusers.)
  * In other states we handle connection requests by launching "dead_end"
  * child processes, which will simply send the client an error message and
  * quit.  (We track these in the BackendList so that we can know when they
@@ -1868,7 +1869,7 @@ static enum CAC_state
 canAcceptConnections(void)
 {
     /*
-     * Can't start backends when in startup/shutdown/recovery state.
+     * Can't start backends when in startup/shutdown/inconsistent recovery state.
      *
      * In state PM_WAIT_BACKUP only superusers can connect (this must be
      * allowed so that a superuser can end online backup mode); we return
@@ -1882,9 +1883,11 @@ canAcceptConnections(void)
             return CAC_SHUTDOWN;    /* shutdown is pending */
         if (!FatalError &&
             (pmState == PM_STARTUP ||
-             pmState == PM_RECOVERY ||
-             pmState == PM_RECOVERY_CONSISTENT))
+             pmState == PM_RECOVERY))
             return CAC_STARTUP; /* normal startup */
+        if (!FatalError &&
+             pmState == PM_RECOVERY_CONSISTENT)
+            return CAC_OK; /* connection OK during recovery */
         return CAC_RECOVERY;    /* else must be crash recovery */
     }

@@ -4003,9 +4006,8 @@ sigusr1_handler(SIGNAL_ARGS)
         Assert(PgStatPID == 0);
         PgStatPID = pgstat_start();

-        /* XXX at this point we could accept read-only connections */
-        ereport(DEBUG1,
-                (errmsg("database system is in consistent recovery mode")));
+        ereport(LOG,
+                 (errmsg("database system is ready to accept read only connections")));

         pmState = PM_RECOVERY_CONSISTENT;
     }
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index de28374..1444f72 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -33,6 +33,8 @@
 #include <sys/file.h>
 #include <unistd.h>

+#include "access/xact.h"
+#include "access/xlogdefs.h"
 #include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -78,7 +80,9 @@ static bool IsForInput;

 /* local state for LockBufferForCleanup */
 static volatile BufferDesc *PinCountWaitBuf = NULL;
-
+static long        CleanupWaitSecs = 0;
+static int        CleanupWaitUSecs = 0;
+static bool        CleanupWaitStats = false;

 static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
                   ForkNumber forkNum, BlockNumber blockNum,
@@ -89,6 +93,7 @@ static void PinBuffer_Locked(volatile BufferDesc *buf);
 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
 static int    SyncOneBuffer(int buf_id, bool skip_recently_used);
+static void CleanupDelayStats(TimestampTz start_ts, TimestampTz end_ts);
 static void WaitIO(volatile BufferDesc *buf);
 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
@@ -2441,6 +2446,8 @@ LockBufferForCleanup(Buffer buffer)

     for (;;)
     {
+        TimestampTz     start_ts = 0;
+
         /* Try to acquire lock */
         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
         LockBufHdr(bufHdr);
@@ -2463,9 +2470,14 @@ LockBufferForCleanup(Buffer buffer)
         PinCountWaitBuf = bufHdr;
         UnlockBufHdr(bufHdr);
         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+        if (CleanupWaitStats)
+            start_ts = GetCurrentTimestamp();
         /* Wait to be signaled by UnpinBuffer() */
         ProcWaitForSignal();
         PinCountWaitBuf = NULL;
+        if (CleanupWaitStats)
+            CleanupDelayStats(start_ts, GetCurrentTimestamp());
+
         /* Loop back and try again */
     }
 }
@@ -2518,6 +2530,54 @@ ConditionalLockBufferForCleanup(Buffer buffer)
     return false;
 }

+/*
+ * On standby servers only the Startup process applies Cleanup. As a result
+ * a single buffer pin can be enough to effectively halt recovery for short
+ * periods. We need special instrumentation to monitor this so we can judge
+ * whether additional measures are required to control the negative effects.
+ */
+void
+StartCleanupDelayStats(void)
+{
+    CleanupWaitSecs = 0;
+    CleanupWaitUSecs = 0;
+    CleanupWaitStats = true;
+}
+
+void
+EndCleanupDelayStats(void)
+{
+    CleanupWaitStats = false;
+}
+
+/*
+ * Called by Startup process whenever we request restartpoint
+ */
+void
+ReportCleanupDelayStats(void)
+{
+    Assert(InRecovery);
+
+    ereport(DEBUG1, (errmsg("cleanup wait total=%ld.%03d s",
+                CleanupWaitSecs, CleanupWaitUSecs / 1000)));
+}
+
+static void
+CleanupDelayStats(TimestampTz start_ts, TimestampTz end_ts)
+{
+    long            wait_secs;
+    int                wait_usecs;
+
+    TimestampDifference(start_ts, end_ts, &wait_secs, &wait_usecs);
+
+    CleanupWaitSecs +=wait_secs;
+    CleanupWaitUSecs +=wait_usecs;
+    if (CleanupWaitUSecs > 999999)
+    {
+        CleanupWaitSecs += 1;
+        CleanupWaitUSecs -= 1000000;
+    }
+}

 /*
  *    Functions for buffer I/O handling
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d3b94e7..3b4b2ae 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -17,6 +17,20 @@
  * as are the myProcLocks lists.  They can be distinguished from regular
  * backend PGPROCs at need by checking for pid == 0.
  *
+ * During recovery, we also keep a list of XIDs representing transactions
+ * that are known to be running at current point in WAL recovery. This
+ * list is kept in the KnownAssignedXids array, and updated by watching
+ * the sequence of arriving xids. This is very important because if we leave
+ * those xids out of the snapshot then they will appear to be already complete.
+ * Later, when they have actually completed this could lead to confusion as to
+ * whether those xids are visible or not, blowing a huge hole in MVCC.
+ * We need 'em.
+ *
+ * It is theoretically possible for a FATAL error to explode before writing
+ * an abort record. This could tie up KnownAssignedXids indefinitely, so
+ * we prune the array when a valid list of running xids arrives. These quirks,
+ * if they do ever exist in reality will not effect the correctness of
+ * snapshots.
  *
  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -31,14 +45,19 @@

 #include <signal.h>

+#include "access/clog.h"
+#include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/twophase.h"
 #include "miscadmin.h"
 #include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/snapmgr.h"

+static RunningTransactionsData    CurrentRunningXactsData;

 /* Our shared memory area */
 typedef struct ProcArrayStruct
@@ -46,6 +65,13 @@ typedef struct ProcArrayStruct
     int            numProcs;        /* number of valid procs entries */
     int            maxProcs;        /* allocated size of procs array */

+    int            maxKnownAssignedXids;    /* allocated size of known assigned xids */
+    /*
+     * Highest subxid that overflowed KnownAssignedXids array. Similar to
+     * overflowing cached subxids in PGPROC entries.
+     */
+    TransactionId    lastOverflowedXid;
+
     /*
      * We declare procs[] as 1 entry because C wants a fixed-size array, but
      * actually it is maxProcs entries long.
@@ -55,6 +81,19 @@ typedef struct ProcArrayStruct

 static ProcArrayStruct *procArray;

+/*
+ * Bookkeeping for tracking emulated transactions in recovery
+ */
+static HTAB *KnownAssignedXidsHash;
+static TransactionId    latestObservedXid = InvalidTransactionId;
+
+/*
+ * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
+ * the highest xid that might still be running that we don't have in
+ * KnownAssignedXids.
+ */
+static TransactionId standbySnapshotPendingXmin;
+static TransactionId nextWraparoundCheckXid = InvalidTransactionId;

 #ifdef XIDCACHE_DEBUG

@@ -90,6 +129,16 @@ static void DisplayXidCache(void);
 #define xc_slow_answer_inc()        ((void) 0)
 #endif   /* XIDCACHE_DEBUG */

+/* Primitives for KnownAssignedXids array handling for standby */
+static Size KnownAssignedXidsShmemSize(int size);
+static void KnownAssignedXidsInit(int size);
+static int  KnownAssignedXidsGet(TransactionId *xarray, TransactionId *xmin,
+                                 TransactionId xmax);
+static bool IsXidKnownAssigned(TransactionId xid);
+static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
+static void KnownAssignedXidsRemove(TransactionId xid);
+static void KnownAssignedXidsRemoveMany(TransactionId xid);
+static void KnownAssignedXidsDisplay(int trace_level);

 /*
  * Report shared-memory space needed by CreateSharedProcArray.
@@ -100,8 +149,14 @@ ProcArrayShmemSize(void)
     Size        size;

     size = offsetof(ProcArrayStruct, procs);
-    size = add_size(size, mul_size(sizeof(PGPROC *),
-                                 add_size(MaxBackends, max_prepared_xacts)));
+
+    /* Normal processing - MyProc slots */
+#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
+    size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
+
+    /* Recovery processing  - KnownAssignedXids */
+#define MAX_KNOWN_ASSIGNED_XIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
+    size = add_size(size, KnownAssignedXidsShmemSize(MAX_KNOWN_ASSIGNED_XIDS));

     return size;
 }
@@ -116,15 +171,31 @@ CreateSharedProcArray(void)

     /* Create or attach to the ProcArray shared structure */
     procArray = (ProcArrayStruct *)
-        ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found);
+        ShmemInitStruct("Proc Array",
+                            mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
+                            &found);
+
+    /*
+     * XXX currently we don't know that we are InHotStandby until
+     * after we have created shared memory. If recovery.conf was read
+     * earlier then we would know whether to create this or not. We
+     * only need it if we startup in recovery. This is 26kB by default,
+     * plus hash table overhead, so don't worry too much. It's passive
+     * so there's less danger of it causing trouble when not in use.
+     */
+    KnownAssignedXidsInit(MAX_KNOWN_ASSIGNED_XIDS);

     if (!found)
     {
         /*
          * We're the first - initialize.
          */
+        /* Normal processing */
         procArray->numProcs = 0;
-        procArray->maxProcs = MaxBackends + max_prepared_xacts;
+        procArray->maxProcs = PROCARRAY_MAXPROCS;
+
+        procArray->maxKnownAssignedXids = MAX_KNOWN_ASSIGNED_XIDS;
+        procArray->lastOverflowedXid = InvalidTransactionId;
     }
 }

@@ -302,6 +373,7 @@ ProcArrayClearTransaction(PGPROC *proc)
     proc->xid = InvalidTransactionId;
     proc->lxid = InvalidLocalTransactionId;
     proc->xmin = InvalidTransactionId;
+    proc->recoveryConflictMode = 0;

     /* redundant, but just in case */
     proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
@@ -312,6 +384,228 @@ ProcArrayClearTransaction(PGPROC *proc)
     proc->subxids.overflowed = false;
 }

+/*
+ * ProcArrayApplyRecoveryInfo -- apply recovery info about xids and locks
+ *
+ * Takes us through 3 states: Uninitialized, Pending and Ready.
+ * Normal case is to go all the way to Ready straight away, though there
+ * are atypical cases where we need to take it in steps.
+ *
+ * Use the data about running transactions on master to create the initial
+ * state of KnownAssignedXids. We also these records to regularly prune
+ * KnownAssignedXids because we know it is possible that some transactions
+ * with FATAL errors do not write abort records, which could cause eventual
+ * overflow.
+ *
+ * Only used during recovery. Notice the signature is very similar to a
+ * _redo function and its difficult to decide exactly where this code should
+ * reside.
+ */
+void
+ProcArrayApplyRecoveryInfo(XLogRecPtr lsn, RunningTransactions running)
+{
+    int                xid_index;    /* main loop */
+    TransactionId    *xids;
+    int                nxids;
+    TransactionId    oldestXid;
+    int                i;
+
+    Assert(standbyState >= STANDBY_UNINITIALIZED);
+
+    /*
+     * Remove stale transactions, if any.
+     */
+    ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
+    RelationReleaseOldRecoveryLocks(running->oldestRunningXid);
+
+    /*
+     * If our snapshot is already valid, nothing else to do...
+     */
+    if (standbyState == STANDBY_READY)
+        return;
+
+    /*
+     * If our initial RunningXactData had an overflowed snapshot then we
+     * knew we were missing some subxids from our snapshot. We can use
+     * this data as an initial snapshot, but we cannot yet mark it valid.
+     * We know that the missing subxids are equal to or earlier than
+     * nextXid. After we initialise we continue to apply changes during
+     * recovery, so once the oldestRunningXid is later than the nextXid
+     * from the initial snapshot we know that we no longer have missing
+     * information and can mark the snapshot as valid.
+     */
+    if (standbyState == STANDBY_SNAPSHOT_PENDING)
+    {
+        if (TransactionIdPrecedes(standbySnapshotPendingXmin,
+                                  running->oldestRunningXid))
+        {
+            standbyState = STANDBY_READY;
+            elog(trace_recovery(DEBUG2),
+                    "running xact data now proven complete");
+            elog(trace_recovery(DEBUG2),
+                    "recovery snapshots are now enabled");
+        }
+        return;
+    }
+
+    /*
+     * Can't initialise with an incomplete set of lock information.
+     * XXX: Can't we go into pending state like with overflowed subxids?
+     */
+    if (running->lock_overflow)
+    {
+        elog(trace_recovery(DEBUG2),
+                "running xact data has incomplete lock data");
+        return;
+    }
+
+    /*
+     * OK, we need to initialise from the RunningXactData record
+     */
+    latestObservedXid = running->nextXid;
+    TransactionIdRetreat(latestObservedXid);
+
+    /*
+     * If the snapshot overflowed, then we still initialise with what we
+     * know, but the recovery snapshot isn't fully valid yet because we
+     * know there are some subxids missing (ergo we don't know which ones)
+     */
+    if (!running->subxid_overflow)
+        standbyState = STANDBY_READY;
+    else
+    {
+        standbyState = STANDBY_SNAPSHOT_PENDING;
+        standbySnapshotPendingXmin = latestObservedXid;
+        ereport(LOG,
+                (errmsg("consistent state delayed because recovery snapshot incomplete")));
+    }
+
+    nxids = running->xcnt;
+    xids = running->xids;
+
+    KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+    /*
+     * Scan through the incoming array of RunningXacts and collect xids.
+     * We don't use SubtransSetParent because it doesn't matter yet. If
+     * we aren't overflowed then all xids will fit in snapshot and so we
+     * don't need subtrans. If we later overflow, an xid assignment record
+     * will add xids to subtrans. If RunningXacts is overflowed then we
+     * don't have enough information to correctly update subtrans anyway.
+     */
+    oldestXid = latestObservedXid;
+
+    for (xid_index = 0; xid_index < nxids; xid_index++)
+    {
+        if (TransactionIdPrecedes(xids[xid_index], oldestXid))
+            oldestXid = xids[xid_index];
+    }
+
+    /*
+     * Nobody else is running yet, but take locks anyhow XXX: read-only xacts are running alright
+     */
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+    /* Reset latestCompletedXid */
+    ShmemVariableCache->latestCompletedXid = running->nextXid;
+    TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
+
+    /*
+     * Add our new xids into the array
+     */
+    KnownAssignedXidsAdd(xids, nxids);
+
+    KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+    /*
+     * Update lastOverflowedXid if the snapshot had overflown. We don't know
+     * the exact value for this, so conservatively assume that it's nextXid-1
+     */
+    if (running->subxid_overflow &&
+        TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid))
+        procArray->lastOverflowedXid = latestObservedXid;
+
+    LWLockRelease(ProcArrayLock);
+
+    /*
+     * Acquire any AccessExclusiveLocks held by running transactions
+     */
+    for (i = 0; i < running->numLocks; i++)
+    {
+        xl_rel_lock *xlrec = &running->loggableLocks[i];
+        RelationAddRecoveryLock(xlrec->xid, xlrec->dbOid, xlrec->relOid);
+    }
+
+    /* nextXid must be beyond any observed xid */
+    if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
+        ShmemVariableCache->nextXid = running->nextXid;
+
+    /* Startup commit log and related stuff */
+    StartupCLOG();
+    StartupSUBTRANS(oldestXid);
+    StartupMultiXact();
+
+    elog(trace_recovery(DEBUG2),
+        "running transaction data initialized");
+    if (standbyState == STANDBY_READY)
+        elog(trace_recovery(DEBUG2),
+            "recovery snapshots are now enabled");
+}
+
+void
+ProcArrayApplyXidAssignment(TransactionId topxid,
+                            int nsubxids, TransactionId *subxids)
+{
+    TransactionId max_xid;
+    int        i;
+
+    if (standbyState < STANDBY_SNAPSHOT_PENDING)
+        return;
+
+    max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
+
+    /*
+     * Mark all the subtransactions as observed.
+     *
+     * NOTE: This will fail if the subxid contains too many previously
+     * unobserved xids to fit into known-assigned-xids. That shouldn't happen
+     * as the code stands, because xid-assignment records should never contain
+     * more than PGPROC_MAX_CACHED_SUBXIDS entries.
+     */
+    RecordKnownAssignedTransactionIds(max_xid);
+
+    /*
+     * Notice that we update pg_subtrans with the top-level xid, rather
+     * than the parent xid. This is a difference between normal
+     * processing and recovery, yet is still correct in all cases. The
+     * reason is that subtransaction commit is not marked in clog until
+     * commit processing, so all aborted subtransactions have already been
+     * clearly marked in clog. As a result we are able to refer directly
+     * to the top-level transaction's state rather than skipping through
+     * all the intermediate states in the subtransaction tree.
+     */
+    for (i = 0; i < nsubxids; i++)
+        SubTransSetParent(subxids[i], topxid);
+
+    /*
+     * Uses same locking as transaction commit
+     */
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+    /*
+     * Remove from known-assigned-xacts.
+     */
+    for (i = 0; i < nsubxids; i++)
+        KnownAssignedXidsRemove(subxids[i]);
+
+    /*
+     * Advance lastOverflowedXid when required.
+     */
+    if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
+        procArray->lastOverflowedXid = max_xid;
+
+    LWLockRelease(ProcArrayLock);
+}

 /*
  * TransactionIdIsInProgress -- is given transaction running in some backend
@@ -384,8 +678,15 @@ TransactionIdIsInProgress(TransactionId xid)
      */
     if (xids == NULL)
     {
-        xids = (TransactionId *)
-            malloc(arrayP->maxProcs * sizeof(TransactionId));
+        /*
+         * In hot standby mode, reserve enough space to hold all xids in
+         * the known-assigned list. If we later finish recovery, we no longer
+         * need the bigger array, but we don't bother to shrink it.
+         */
+        int    maxxids = RecoveryInProgress() ?
+            arrayP->maxProcs : arrayP->maxKnownAssignedXids;
+
+        xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
         if (xids == NULL)
             ereport(ERROR,
                     (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -465,6 +766,30 @@ TransactionIdIsInProgress(TransactionId xid)
             xids[nxids++] = pxid;
     }

+    /* In hot standby mode, check known-assigned-xids list as well */
+    if (RecoveryInProgress())
+    {
+        /* none of the PGPROC entries should have XIDs in hot standby mode */
+        Assert(nxids == 0);
+
+        if (IsXidKnownAssigned(xid))
+        {
+            LWLockRelease(ProcArrayLock);
+            /* XXX: should we have a separate counter for this? */
+            /* xc_by_main_xid_inc(); */
+            return true;
+        }
+
+        /*
+         * If the known-assigned-xids list overflowed, we have to check
+         * pg_subtrans too. Copy the known-assigned list.
+         */
+        if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
+            nxids = KnownAssignedXidsGet(xids,
+                                         InvalidTransactionId,
+                                         InvalidTransactionId);
+    }
+
     LWLockRelease(ProcArrayLock);

     /*
@@ -590,6 +915,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
     TransactionId result;
     int            index;

+    /* Cannot look for individual databases during recovery */
+    Assert(allDbs || !RecoveryInProgress());
+
     LWLockAcquire(ProcArrayLock, LW_SHARED);

     /*
@@ -656,7 +984,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
  * but since PGPROC has only a limited cache area for subxact XIDs, full
  * information may not be available.  If we find any overflowed subxid arrays,
  * we have to mark the snapshot's subxid data as overflowed, and extra work
- * will need to be done to determine what's running (see XidInMVCCSnapshot()
+ * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
  * in tqual.c).
  *
  * We also update the following backend-global variables:
@@ -698,7 +1026,8 @@ GetSnapshotData(Snapshot snapshot)
     if (snapshot->xip == NULL)
     {
         /*
-         * First call for this snapshot
+         * First call for this snapshot. Snapshot is same size whether
+         * or not we are in recovery, see later comments.
          */
         snapshot->xip = (TransactionId *)
             malloc(arrayP->maxProcs * sizeof(TransactionId));
@@ -715,6 +1044,8 @@ GetSnapshotData(Snapshot snapshot)
                      errmsg("out of memory")));
     }

+    snapshot->takenDuringRecovery = RecoveryInProgress();
+
     /*
      * It is sufficient to get shared lock on ProcArrayLock, even if we are
      * going to set MyProc->xmin.
@@ -763,6 +1094,7 @@ GetSnapshotData(Snapshot snapshot)
          */
         if (TransactionIdIsNormal(xid))
         {
+            Assert(!snapshot->takenDuringRecovery);
             if (TransactionIdFollowsOrEquals(xid, xmax))
                 continue;
             if (proc != MyProc)
@@ -795,6 +1127,7 @@ GetSnapshotData(Snapshot snapshot)

                 if (nxids > 0)
                 {
+                    Assert(!snapshot->takenDuringRecovery);
                     memcpy(snapshot->subxip + subcount,
                            (void *) proc->subxids.xids,
                            nxids * sizeof(TransactionId));
@@ -804,6 +1137,47 @@ GetSnapshotData(Snapshot snapshot)
         }
     }

+    /*
+     * If in recovery get any known assigned xids.
+     */
+    if (snapshot->takenDuringRecovery)
+    {
+        bool overflow = false;
+
+        Assert(count == 0);
+
+        /*
+         * We store all xids directly into subxip[]. Here's why:
+         *
+         * In recovery we don't know which xids are top-level and which are
+         * subxacts, a design choice that greatly simplifies xid processing.
+         *
+         * It seems like we would want to try to put xids into xip[] only,
+         * but that is fairly small. We would either need to make that bigger
+         * or to increase the rate at which we WAL-log xid assignment;
+         * neither is an appealing choice.
+         *
+         * We could try to store xids into xip[] first and then into subxip[]
+         * if there are too many xids. That only works if the snapshot doesn't
+         * overflow because we do not search subxip[] in that case. A simpler
+         * way is to just store all xids in the subxact array because this
+         * is by far the bigger array. We just leave the xip array empty.
+         *
+         * Either way we need to change the way XidInMVCCSnapshot() works
+         * depending upon when the snapshot was taken, or change normal
+         * snapshot processing so it matches.
+         */
+        subcount = GetKnownAssignedTransactions(snapshot->subxip,
+                                                &xmin, xmax, &overflow);
+
+        /*
+         * See if we have removed any subxids from KnownAssignedXids that
+         * we might need to see. If so, mark snapshot overflowed.
+         */
+        if (overflow)
+            subcount = -1;    /* overflowed */
+    }
+
     if (!TransactionIdIsValid(MyProc->xmin))
         MyProc->xmin = TransactionXmin = xmin;

@@ -840,6 +1214,175 @@ GetSnapshotData(Snapshot snapshot)
 }

 /*
+ * GetRunningTransactionData -- returns information about running transactions.
+ *
+ * Similar to GetSnapshotData but returning more information. We include
+ * all PGPROCs with an assigned TransactionId, even VACUUM processes.
+ *
+ * This is never executed during recovery so there is no need to look at
+ * KnownAssignedXids.
+ *
+ * We don't worry about updating other counters, we want to keep this as
+ * simple as possible and leave GetSnapshotData() as the primary code for
+ * that bookkeeping.
+ */
+void
+GetRunningTransactionData(void)
+{
+    ProcArrayStruct *arrayP = procArray;
+    RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
+    TransactionId latestCompletedXid;
+    TransactionId oldestRunningXid = InvalidTransactionId;
+    TransactionId *xids;
+    int            index;
+    int            count;
+    int            subcount;
+    int            topcount;
+    bool        suboverflowed;
+    XLogRecPtr    recptr;
+
+    /*
+     * Allocating space for maxProcs xids is usually overkill; numProcs would
+     * be sufficient.  But it seems better to do the malloc while not holding
+     * the lock, so we can't look at numProcs.  Likewise, we allocate much
+     * more subxip storage than is probably needed.
+     *
+     * Should only be allocated for bgwriter, since only ever executed
+     * during checkpoints.
+     */
+    if (CurrentRunningXacts->xids == NULL)
+    {
+        /*
+         * First call
+         */
+        CurrentRunningXacts->xids = (TransactionId *)
+            malloc(arrayP->maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) *
+                   sizeof(TransactionId));
+        if (CurrentRunningXacts->xids == NULL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OUT_OF_MEMORY),
+                     errmsg("out of memory")));
+
+        Assert(CurrentRunningXacts->loggableLocks == NULL);
+#define REASONABLE_MAX_NUM_LOCKS    (2 * arrayP->maxProcs)
+        CurrentRunningXacts->loggableLocks = (xl_rel_lock *)
+            malloc(REASONABLE_MAX_NUM_LOCKS * sizeof(xl_rel_lock));
+        if (CurrentRunningXacts->loggableLocks == NULL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OUT_OF_MEMORY),
+                     errmsg("out of memory")));
+    }
+
+    xids = CurrentRunningXacts->xids;
+
+    count = subcount = topcount = 0;
+    suboverflowed = false;
+
+    /*
+     * Ensure that no new WAL-loggable locks can be taken, nor
+     * can xids enter or leave the procarray while we obtain snapshot.
+     *
+     * XXX: Isn't SHARED enough here?
+     */
+    LWLockAcquire(RecoveryInfoLock, LW_EXCLUSIVE);
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+
+    latestCompletedXid = ShmemVariableCache->latestCompletedXid;
+
+    /* XXX: this comment makes no sense
+     * Spin over procArray checking xid, and subxids. Shared lock is enough
+     * because new transactions don't use locks at all, so LW_EXCLUSIVE
+     * wouldn't be enough to prevent them, so don't bother.
+     */
+    for (index = 0; index < arrayP->numProcs; index++)
+    {
+        volatile PGPROC *proc = arrayP->procs[index];
+        TransactionId xid;
+        int            nxids;
+
+        /* Fetch xid just once - see GetNewTransactionId */
+        xid = proc->xid;
+
+        /*
+         * We store all xids, even XIDs >= xmax and our own XID, if any.
+         * But we don't store transactions that don't have a TransactionId
+         * yet because they will not show as running on a standby server.
+         */
+        if (!TransactionIdIsValid(xid))
+            continue;
+
+        xids[count++] = xid;
+        topcount++;
+
+        if (!TransactionIdIsValid(oldestRunningXid) ||
+            TransactionIdPrecedes(xid, oldestRunningXid))
+            oldestRunningXid = xid;
+
+        /*
+         * Save subtransaction XIDs. Other backends can't add or remove entries
+         * while we're holding XidGenLock.
+         */
+        nxids = proc->subxids.nxids;
+        if (nxids > 0)
+        {
+            memcpy(&xids[count], (void *) proc->subxids.xids,
+                   nxids * sizeof(TransactionId));
+            count += nxids;
+            subcount += nxids;
+
+            if (proc->subxids.overflowed)
+                suboverflowed = true;
+        }
+    }
+
+    CurrentRunningXacts->xcnt = count;
+    CurrentRunningXacts->subxid_overflow = suboverflowed;
+    CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
+    CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
+
+    /*
+     * If we have some loggable locks then go and get their details.
+     */
+    CurrentRunningXacts->numLocks =
+        GetRunningTransactionLocks(CurrentRunningXacts->loggableLocks,
+                                   REASONABLE_MAX_NUM_LOCKS);
+
+    /*
+     * Mark snapshot invalid if the information is either incomplete or would
+     * be too large to reasonably cope with on standby. We log it anyway, so
+     * the the standby can establish a valid recovery after all currently
+     * running transactions have finished.
+     */
+    if (CurrentRunningXacts->numLocks >= REASONABLE_MAX_NUM_LOCKS)
+    {
+        CurrentRunningXacts->numLocks = 0;
+        CurrentRunningXacts->lock_overflow = true;
+    }
+    else
+        CurrentRunningXacts->lock_overflow = false;
+
+    recptr = LogCurrentRunningXacts(CurrentRunningXacts);
+
+    LWLockRelease(XidGenLock);
+    LWLockRelease(ProcArrayLock);
+    LWLockRelease(RecoveryInfoLock);
+
+    if (CurrentRunningXacts->subxid_overflow ||
+        CurrentRunningXacts->lock_overflow)
+        ereport(trace_recovery(DEBUG2),
+                (errmsg("snapshot of %u running transactions overflowed (lsn %X/%X)",
+                        topcount,
+                        recptr.xlogid, recptr.xrecoff)));
+    else
+        ereport(trace_recovery(DEBUG2),
+                (errmsg("snapshot of %u running transactions with %u subtransactions and %u locks (lsn %X/%X)",
+                        topcount, subcount,
+                        CurrentRunningXacts->numLocks,
+                        recptr.xlogid, recptr.xrecoff)));
+}
+
+/*
  * GetTransactionsInCommit -- Get the XIDs of transactions that are committing
  *
  * Constructs an array of XIDs of transactions that are currently in commit
@@ -1101,6 +1644,135 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
     return vxids;
 }

+/*
+ * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
+ *
+ * The array is palloc'd and is terminated with an invalid VXID.
+ *
+ * If limitXmin is not InvalidTransactionId, we skip any backends
+ * with xmin > limitXmin.    If dbOid is valid we skip backends attached
+ * to other databases. Some callers choose to skipExistingConflicts.
+ *
+ * Be careful to *not* pfree the result from this function. We reuse
+ * this array sufficiently often that we use malloc for the result.
+ */
+VirtualTransactionId *
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+                          bool skipExistingConflicts)
+{
+    static VirtualTransactionId *vxids;
+    ProcArrayStruct *arrayP = procArray;
+    int            count = 0;
+    int            index;
+
+    /*
+     * If not first time through, get workspace to remember main XIDs in. We
+     * malloc it permanently to avoid repeated palloc/pfree overhead.
+     * Allow result space, remembering room for a terminator.
+     */
+    if (vxids == NULL)
+    {
+        vxids = (VirtualTransactionId *)
+            malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
+        if (vxids == NULL)
+            ereport(ERROR,
+                    (errcode(ERRCODE_OUT_OF_MEMORY),
+                     errmsg("out of memory")));
+    }
+
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+    for (index = 0; index < arrayP->numProcs; index++)
+    {
+        volatile PGPROC *proc = arrayP->procs[index];
+
+        /* Exclude prepared transactions */
+        if (proc->pid == 0)
+            continue;
+
+        if (skipExistingConflicts && proc->recoveryConflictMode > 0)
+            continue;
+
+        if (!OidIsValid(dbOid) ||
+            proc->databaseId == dbOid)
+        {
+            /* Fetch xmin just once - can't change on us, but good coding */
+            TransactionId pxmin = proc->xmin;
+
+            /*
+             * If limitXmin is set we explicitly choose to ignore an invalid
+             * pxmin because this means that backend has no snapshot and
+             * cannot get another one while we hold exclusive lock.
+             */
+            if (!TransactionIdIsValid(limitXmin) ||
+                (!TransactionIdFollows(pxmin, limitXmin) && TransactionIdIsValid(pxmin)))
+            {
+                VirtualTransactionId vxid;
+
+                GET_VXID_FROM_PGPROC(vxid, *proc);
+                if (VirtualTransactionIdIsValid(vxid))
+                    vxids[count++] = vxid;
+            }
+        }
+    }
+
+    LWLockRelease(ProcArrayLock);
+
+    /* add the terminator */
+    vxids[count].backendId = InvalidBackendId;
+    vxids[count].localTransactionId = InvalidLocalTransactionId;
+
+    return vxids;
+}
+
+/*
+ * CancelVirtualTransaction - used in recovery conflict processing
+ *
+ * Returns pid of the process signaled, or 0 if not found.
+ */
+pid_t
+CancelVirtualTransaction(VirtualTransactionId vxid, int cancel_mode)
+{
+    ProcArrayStruct *arrayP = procArray;
+    int            index;
+    pid_t        pid = 0;
+
+    LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+    for (index = 0; index < arrayP->numProcs; index++)
+    {
+        VirtualTransactionId procvxid;
+        PGPROC       *proc = arrayP->procs[index];
+
+        GET_VXID_FROM_PGPROC(procvxid, *proc);
+
+        if (procvxid.backendId == vxid.backendId &&
+            procvxid.localTransactionId == vxid.localTransactionId)
+        {
+            /*
+             * Issue orders for the proc to read next time it receives SIGINT
+             */
+            if (proc->recoveryConflictMode < cancel_mode)
+                proc->recoveryConflictMode = cancel_mode;
+
+            pid = proc->pid;
+            break;
+        }
+    }
+
+    LWLockRelease(ProcArrayLock);
+
+    if (pid != 0)
+    {
+        /*
+         * Kill the pid if it's still here. If not, that's what we wanted
+         * so ignore any errors.
+         */
+        kill(pid, SIGINT);
+    }
+
+    return pid;
+}

 /*
  * CountActiveBackends --- count backends (other than myself) that are in
@@ -1240,6 +1912,14 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
     int            autovac_pids[MAXAUTOVACPIDS];
     int            tries;

+    /* Gives wrong answer in recovery, so make sure we don't use it */
+    /* XXX: Whadda you mean wrong answer? Seems fine to me, except that it
+     * doesn't count transactions running in the master and emulated in
+     * KnownAssignedXids. But to e.g wait for read-only backends in a database
+     * to exit before replaying DROP DATABSE, this seems valid.
+     */
+    Assert(!RecoveryInProgress());
+
     /* 50 tries with 100ms sleep between tries makes 5 sec total wait */
     for (tries = 0; tries < 50; tries++)
     {
@@ -1400,3 +2080,461 @@ DisplayXidCache(void)
 }

 #endif   /* XIDCACHE_DEBUG */
+
+/* ----------------------------------------------
+ *         KnownAssignedTransactions sub-module
+ * ----------------------------------------------
+ */
+
+/*
+ * In Hot Standby mode, we maintain a list of transactions that are (or were)
+ * running in the master at the current point in WAL.
+ *
+ * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
+ * type apart from XLOG_XACT_RUNNING_XACTS, since that initialises the first
+ * snapshot so that RecordKnownAssignedTransactionIds() can be callsed. Uses
+ * local variables, so should only be called by Startup process.
+ *
+ * We record all xids that we know have been assigned. That includes
+ * all the xids on the WAL record, plus all unobserved xids that
+ * we can deduce have been assigned. We can deduce the existence of
+ * unobserved xids because we know xids are in sequence, with no gaps.
+ *
+ * During recovery we do not fret too much about the distinction between
+ * top-level xids and subtransaction xids. We hold both together in
+ * a hash table called KnownAssignedXids. In backends, this is copied into
+ * snapshots in GetSnapshotData(), taking advantage
+ * of the fact that XidInMVCCSnapshot() doesn't care about the distinction
+ * either. Subtransaction xids are effectively treated as top-level xids
+ * and in the typical case pg_subtrans is *not* maintained (and that
+ * does not effect visibility).
+ *
+ * KnownAssignedXids expands as new xids are observed or inferred, and
+ * contracts when transaction completion records arrive. We have room in a
+ * snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so
+ * every transaction must report their subtransaction xids in a special
+ * WAL assignment record every PGPROC_MAX_CACHED_SUBXIDS. This allows us
+ * to remove the subtransaction xids and update pg_subtrans instead. Snapshots
+ * are still correct yet we don't overflow SnapshotData structure. When we do
+ * this we need
+ * to keep track of which xids caused the snapshot to overflow. We do that
+ * by simply tracking the lastOverflowedXid - if it is within the bounds of
+ * the KnownAssignedXids then we know the snapshot overflowed. (Note that
+ * subxid overflow occurs on primary when 65th subxid arrives, whereas on
+ * standby it occurs when 64th subxid arrives - that is not an error).
+ *
+ * Should FATAL errors result in a backend on primary disappearing before
+ * it can write an abort record then we just leave those xids in
+ * KnownAssignedXids. They actually aborted but we think they were running;
+ * the distinction is irrelevant because either way any changes done by the
+ * transaction are not visible to backends in the standby.
+ * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
+ * ensure we do not overflow.
+ *
+ * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
+ * xids that are not present.
+ */
+void
+RecordKnownAssignedTransactionIds(TransactionId xid)
+{
+    /*
+     * Skip processing if the current snapshot is not initialized.
+     */
+    if (standbyState < STANDBY_SNAPSHOT_PENDING)
+        return;
+
+    ereport(trace_recovery(DEBUG4),
+                (errmsg("record known xact %u latestObservedXid %u",
+                            xid, latestObservedXid)));
+
+    /*
+     * Check for risk of transaction wraparound. As new xids arrive
+     * on the standby it is eventually possible for a long lived query
+     * to find that the snapshot xmin is older than one xid epoch, which
+     * would make newly arrived data suddenly appear as if it were very old.
+     * We must cancel queries before we wraparound. We do conflict
+     * processing here to cover that possibility, though note that
+     * we may also perform conflict processing again for a different
+     * reason specific to the type of WAL record, covered in the rmgrs.
+     */
+    if (TransactionIdFollowsOrEquals(xid, nextWraparoundCheckXid))
+    {
+        TransactionId xidWrapLimit;
+        TransactionId xidStopLimit;
+        VirtualTransactionId *old_queries;
+
+        /*
+         * Only need to check occasionally. nextWraparoundCheckXid is
+         * initialised on first use, since it starts at InvalidTransactionId
+         */
+        nextWraparoundCheckXid = xid + 65536;
+        if (nextWraparoundCheckXid < FirstNormalTransactionId)
+            nextWraparoundCheckXid += FirstNormalTransactionId;
+
+        /*
+         * The place where we actually get into deep trouble is halfway around
+         * from the oldest xmin.  (This calculation is
+         * probably off by one or two counts, because the special XIDs reduce the
+         * size of the loop a little bit.  But we throw in plenty of slop below,
+         * so it doesn't matter.)
+         */
+        xidWrapLimit = xid + (MaxTransactionId >> 1);
+        if (xidWrapLimit < FirstNormalTransactionId)
+            xidWrapLimit += FirstNormalTransactionId;
+
+        /*
+         * We'll refuse to allow queries to execute once we get
+         * within 1M transactions of data loss.  This leaves lots of room for the
+         * DBA to fool around fixing things in a standalone backend, while not
+         * being significant compared to total XID space. (Note that since
+         * vacuuming requires one transaction per table cleaned, we had better be
+         * sure there's lots of XIDs left...)
+         */
+        xidStopLimit = xidWrapLimit - 1000000;
+        if (xidStopLimit < FirstNormalTransactionId)
+            xidStopLimit -= FirstNormalTransactionId;
+
+        old_queries = GetConflictingVirtualXIDs(xid, InvalidOid, false);
+        ResolveRecoveryConflictWithVirtualXIDs(old_queries,
+                                                "xid anti-wraparound check",
+                                                CONFLICT_MODE_ERROR,
+                                                InvalidXLogRecPtr);
+    }
+
+    /*
+     * When a newly observed xid arrives, it is frequently the case
+     * that it is *not* the next xid in sequence. When this occurs, we
+     * must treat the intervening xids as running also.
+     */
+    if (TransactionIdFollows(xid, latestObservedXid))
+    {
+        TransactionId    next_expected_xid = latestObservedXid;
+        TransactionIdAdvance(next_expected_xid);
+
+        /*
+         * Locking requirement is currently higher than for xid assignment
+         * in normal running. However, we only get called here for new
+         * high xids - so on a multi-processor where it is common that xids
+         * arrive out of order the average number of locks per assignment
+         * will actually reduce. So not too worried about this locking.
+         *
+         * XXX It does seem possible that we could add a whole range
+         * of numbers atomically to KnownAssignedXids, if we use a sorted
+         * list for KnownAssignedXids. But that design also increases the
+         * length of time we hold lock when we process commits/aborts, so
+         * on balance don't worry about this.
+         */
+        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+        while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
+        {
+            if (TransactionIdPrecedes(next_expected_xid, xid))
+                ereport(trace_recovery(DEBUG4),
+                        (errmsg("recording unobserved xid %u (latestObservedXid %u)",
+                                    next_expected_xid, latestObservedXid)));
+            KnownAssignedXidsAdd(&next_expected_xid, 1);
+
+            /*
+             * Extend clog and subtrans like we do in GetNewTransactionId()
+             * during normal operation
+             */
+            ExtendCLOG(next_expected_xid);
+            ExtendSUBTRANS(next_expected_xid);
+
+            TransactionIdAdvance(next_expected_xid);
+        }
+
+        LWLockRelease(ProcArrayLock);
+
+        latestObservedXid = xid;
+    }
+
+    /* nextXid must be beyond any observed xid */
+    if (TransactionIdFollowsOrEquals(latestObservedXid,
+                                     ShmemVariableCache->nextXid))
+    {
+        ShmemVariableCache->nextXid = latestObservedXid;
+        TransactionIdAdvance(ShmemVariableCache->nextXid);
+    }
+}
+
+void
+ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
+                                      TransactionId *subxids)
+{
+    int            i;
+
+    if (standbyState < STANDBY_SNAPSHOT_PENDING)
+        return;
+
+    /*
+     * Uses same locking as transaction commit
+     */
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+    if (TransactionIdIsValid(xid))
+        KnownAssignedXidsRemove(xid);
+    for (i = 0; i < nsubxids; i++)
+        KnownAssignedXidsRemove(subxids[i]);
+
+    LWLockRelease(ProcArrayLock);
+}
+
+void
+ExpireAllKnownAssignedTransactionIds(void)
+{
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+    KnownAssignedXidsRemoveMany(InvalidTransactionId);
+    LWLockRelease(ProcArrayLock);
+}
+
+void
+ExpireOldKnownAssignedTransactionIds(TransactionId xid)
+{
+    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+    KnownAssignedXidsRemoveMany(xid);
+    LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * Get an array of KnownAssignedXids, not necessarily in sorted order.
+ *
+ * Called with ProcArrayLock already held in SHARED mode
+ */
+int
+GetKnownAssignedTransactions(TransactionId *xarray, TransactionId *xmin,
+                             TransactionId xmax, bool *overflow)
+{
+    int count = KnownAssignedXidsGet(xarray, xmin, xmax);
+
+    if (TransactionIdPrecedes(*xmin, procArray->lastOverflowedXid))
+        *overflow = true;
+
+    return count;
+}
+
+/*
+ * Private module functions to manipulate KnownAssignedXids
+ *
+ * There are 3 main users of the KnownAssignedXids data structure:
+ *
+ *   * backends taking snapshots
+ *   * startup process adding new knownassigned xids
+ *   * startup process removing xids as transactions end
+ *
+ * If we make KnownAssignedXids a simple sorted array then the first two
+ * operations are fast, but the last one is at least O(N). If we make
+ * KnownAssignedXids a hash table then the last two operations are fast,
+ * though we have to do more work at snapshot time. Doing more work at
+ * commit could slow down taking snapshots anyway because of lwlock
+ * contention. Scanning the hash table is O(N) on the max size of the array,
+ * so performs poorly in comparison when we have very low numbers of
+ * write transactions to process. But at least it is constant overhead
+ * and a sequential memory scan will utilise hardware memory readahead
+ * to give much improved performance. In any case the emphasis must be on
+ * having the standby process changes quickly so that it can provide
+ * high availability. So we choose to implement as a hash table.
+ */
+
+static Size
+KnownAssignedXidsShmemSize(int size)
+{
+    return hash_estimate_size(size, sizeof(TransactionId));
+}
+
+static void
+KnownAssignedXidsInit(int size)
+{
+    HASHCTL        info;
+
+    /* assume no locking is needed yet */
+
+    info.keysize = sizeof(TransactionId);
+    info.entrysize = sizeof(TransactionId);
+    info.hash = tag_hash;
+
+    KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
+                                  size, size,
+                                  &info,
+                                  HASH_ELEM | HASH_FUNCTION);
+
+    if (!KnownAssignedXidsHash)
+        elog(FATAL, "could not initialize known assigned xids hash table");
+}
+
+/*
+ * Add xids into KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsAdd(TransactionId *xids, int nxids)
+{
+    bool found;
+    int i;
+
+    /*
+     * XXX: We should check that we don't exceed maxKnownAssignedXids.
+     * Even though the hash table might hold a few more entries than that,
+     * we use fixed-size arrays of that size elsewhere and expected all
+     * entries in the hash table to fit.
+     */
+    for (i = 0; i < nxids; i++)
+    {
+        Assert(TransactionIdIsValid(xids[i]));
+
+        elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);
+
+        (void) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER,
+                           &found);
+        if (found)
+        {
+            KnownAssignedXidsDisplay(LOG);
+            elog(ERROR, "adding duplicate KnownAssignedXid %u", xids[i]);
+        }
+    }
+}
+
+/*
+ * Is an xid present in KnownAssignedXids?
+ *
+ * Must be called while holding ProcArrayLock in shared mode
+ */
+static bool
+IsXidKnownAssigned(TransactionId xid)
+{
+    bool found;
+    (void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found);
+    return found;
+}
+
+/*
+ * Remove one xid from anywhere in KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsRemove(TransactionId xid)
+{
+    bool found;
+
+    Assert(TransactionIdIsValid(xid));
+
+    elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
+
+    (void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found);
+
+    if (!found && TransactionIdFollows(xid, procArray->lastOverflowedXid))
+    {
+        /*
+         * This shouldn't happen, so something has gone wrong in our
+         * transaction tracking. However, the damage has been done already,
+         * and now that the transaction has finished, the anomaly should be
+         * over. There's not much point in ERRORing out anymore, so let's
+         * just LOG the situation and move on.
+         */
+        /* XXX: This can still happen: If a transaction with a subtransaction
+         * that haven't been reported yet aborts, and no WAL records have been
+         * written using the subxid, the abort record will contain that subxid
+         * and we haven't seen it before.
+         */
+        KnownAssignedXidsDisplay(LOG);
+        elog(LOG, "cannot remove KnownAssignedXid %u", xid);
+    }
+}
+
+/*
+ * Get an array of xids by scanning KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGet(TransactionId *xarray, TransactionId *xmin,
+                     TransactionId xmax)
+{
+    HASH_SEQ_STATUS status;
+    TransactionId *knownXid;
+    int            count = 0;
+
+    hash_seq_init(&status, KnownAssignedXidsHash);
+    while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+    {
+        *xarray = *knownXid;
+        xarray++;
+        count++;
+
+        /* update xmin if required */
+        if (TransactionIdPrecedes(*knownXid, *xmin))
+            *xmin = *knownXid;
+
+        /* XXX we don't filter on xmax, as is done in normal running. */
+    }
+
+    return count;
+}
+
+/*
+ * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
+ * then clear the whole table.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveMany(TransactionId xid)
+{
+    TransactionId    *knownXid;
+    HASH_SEQ_STATUS status;
+
+    if (TransactionIdIsValid(xid))
+        elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
+    else
+        elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
+
+    hash_seq_init(&status, KnownAssignedXidsHash);
+    while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+    {
+        TransactionId removeXid = *knownXid;
+        bool found;
+
+        if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid))
+        {
+            (void) hash_search(KnownAssignedXidsHash, &removeXid,
+                               HASH_REMOVE, &found);
+            Assert(found);
+        }
+    }
+}
+
+/*
+ * Display KnownAssignedXids to provide debug trail
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+void
+KnownAssignedXidsDisplay(int trace_level)
+{
+    HASH_SEQ_STATUS status;
+    TransactionId *knownXid;
+    StringInfoData buf;
+    TransactionId   *xids;
+    int                nxids;
+    int                i;
+
+    xids = palloc(sizeof(TransactionId) * MAX_KNOWN_ASSIGNED_XIDS);
+    nxids = 0;
+
+    hash_seq_init(&status, KnownAssignedXidsHash);
+    while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+        xids[nxids++] = *knownXid;
+
+    qsort(xids, nxids, sizeof(TransactionId), xidComparator);
+
+    initStringInfo(&buf);
+
+    for (i = 0; i < nxids; i++)
+        appendStringInfo(&buf, "%u ", xids[i]);
+
+    elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);
+
+    pfree(buf.data);
+}
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index aaefa27..707f34e 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -145,6 +145,13 @@ typedef struct ProcState
     bool        signaled;        /* backend has been sent catchup signal */

     /*
+     * Backend only sends invalidations, never receives them. This only makes sense
+     * for Startup process during recovery because it doesn't maintain a relcache,
+     * yet it fires inval messages to allow query backends to see schema changes.
+     */
+    bool        sendOnly;        /* backend only sends, never receives */
+
+    /*
      * Next LocalTransactionId to use for each idle backend slot.  We keep
      * this here because it is indexed by BackendId and it is convenient to
      * copy the value to and from local memory when MyBackendId is set. It's
@@ -249,7 +256,7 @@ CreateSharedInvalidationState(void)
  *        Initialize a new backend to operate on the sinval buffer
  */
 void
-SharedInvalBackendInit(void)
+SharedInvalBackendInit(bool sendOnly)
 {
     int            index;
     ProcState  *stateP = NULL;
@@ -308,6 +315,7 @@ SharedInvalBackendInit(void)
     stateP->nextMsgNum = segP->maxMsgNum;
     stateP->resetState = false;
     stateP->signaled = false;
+    stateP->sendOnly = sendOnly;

     LWLockRelease(SInvalWriteLock);

@@ -579,7 +587,9 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
     /*
      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
      * furthest-back backend that needs signaling (if any), and reset any
-     * backends that are too far back.
+     * backends that are too far back.  Note that because we ignore sendOnly
+     * backends here it is possible for them to keep sending messages without
+     * a problem even when they are the only active backend.
      */
     min = segP->maxMsgNum;
     minsig = min - SIG_THRESHOLD;
@@ -591,7 +601,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
         int            n = stateP->nextMsgNum;

         /* Ignore if inactive or already in reset state */
-        if (stateP->procPid == 0 || stateP->resetState)
+        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
             continue;

         /*
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 0150d11..c80eb09 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -35,9 +35,11 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
+#include "access/xact.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
@@ -483,6 +485,7 @@ LockAcquire(const LOCKTAG *locktag,
     int            partition;
     LWLockId    partitionLock;
     int            status;
+    bool        logLockrequest = false;

     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
@@ -490,6 +493,15 @@ LockAcquire(const LOCKTAG *locktag,
     if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
         elog(ERROR, "unrecognized lock mode: %d", lockmode);

+    if (RecoveryInProgress() &&
+        locktag->locktag_type == LOCKTAG_OBJECT &&
+        lockmode > AccessShareLock)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot acquire lockmode %s on database objects while recovery is in progress",
+                                    lockMethodTable->lockModeNames[lockmode]),
+                 errhint("Only AccessShareLock can be acquired on database objects during recovery.")));
+
 #ifdef LOCK_DEBUG
     if (LOCK_DEBUG_ENABLED(locktag))
         elog(LOG, "LockAcquire: lock [%u,%u] %s",
@@ -554,6 +566,19 @@ LockAcquire(const LOCKTAG *locktag,
     }

     /*
+     * Does acquisition of this lock need to be replayed in a standby server?
+     * Only AccessExclusiveLocks can conflict with lock types that read-only
+     * transactions can acquire in a standby server.
+     *
+     * Make sure this definition matches the one GetRunningTransactionLocks().
+     */
+    if (lockmode >= AccessExclusiveLock &&
+        locktag->locktag_type == LOCKTAG_RELATION)
+    {
+        logLockrequest = true;
+    }
+
+    /*
      * Otherwise we've got to mess with the shared lock table.
      */
     hashcode = locallock->hashcode;
@@ -827,6 +852,60 @@ LockAcquire(const LOCKTAG *locktag,

     LWLockRelease(partitionLock);

+    /*
+     * We made it all the way here. We've got the Lock and we've got
+     * it for the first time in this transaction. So now it's time
+     * to send a WAL message, if required.
+     */
+    if (logLockrequest && !RecoveryInProgress() && XLogArchivingActive())
+    {
+        XLogRecData        rdata;
+        xl_rel_lock        xlrec;
+
+        /*
+         * Ensure that if bgwriter is just making a snapshot of locks
+         * and running-xacts, we won't write the WAL record until it's done.
+         * In case the other process didn't see our lock in the lock table yet,
+         * we want the XLOG_RELATION_LOCK WAL record to appear *after* the
+         * running-xacts record.
+         */
+        LWLockAcquire(RecoveryInfoLock, LW_SHARED);
+
+        /*
+         * First thing we do is ensure that a TransactionId has been
+         * assigned to this transaction. We don't actually need the xid yet
+         * but if we don't do this then RecordTransactionCommit() and
+         * RecordTransactionAbort() will optimise away the transaction
+         * completion record which recovery relies upon to release locks.
+         * It's a hack, but for a corner case not worth adding code for
+         * into the main commit path.
+         */
+        xlrec.xid = GetTopTransactionId();
+
+        Assert(OidIsValid(locktag->locktag_field2));
+
+        START_CRIT_SECTION();
+
+        /*
+         * Decode the locktag back to the original values, to avoid
+         * sending lots of empty bytes with every message.  See
+         * lock.h to check how a locktag is defined for LOCKTAG_RELATION
+         */
+        xlrec.dbOid = locktag->locktag_field1;
+        xlrec.relOid = locktag->locktag_field2;
+
+        rdata.data = (char *) (&xlrec);
+        rdata.len = sizeof(xl_rel_lock);
+        rdata.buffer = InvalidBuffer;
+        rdata.next = NULL;
+
+        (void) XLogInsert(RM_RELATION_ID, XLOG_RELATION_LOCK, &rdata);
+
+        END_CRIT_SECTION();
+
+        LWLockRelease(RecoveryInfoLock);
+    }
+
     return LOCKACQUIRE_OK;
 }

@@ -2193,6 +2272,80 @@ GetLockStatusData(void)
     return data;
 }

+/*
+ * Returns a list of currently held AccessExclusiveLocks, for use
+ * by GetRunningTransactionData(). Must be called with RelationInfoLock
+ * held and loggableLocks must be already allocated.
+ */
+int
+GetRunningTransactionLocks(xl_rel_lock *loggableLocks, int maximum)
+{
+    PROCLOCK   *proclock;
+    HASH_SEQ_STATUS seqstat;
+    int            i;
+    int         index;
+
+
+    /*
+     * Acquire lock on the entire shared lock data structure.
+     *
+     * XXX We could optimise this to look at one partition at a time
+     * because the only locks we want to look at cannot change because we hold
+     * RecoveryInfoLock. Other locks may change as we do this, but we don't
+     * care. A reason to *not* do this is we would need to scan the lock
+     * table N times, where N = NUM_LOCK_PARTITIONS, or find a better way.
+     *
+     * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
+     */
+    for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
+        LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
+
+    /* Now scan the tables to copy the data */
+    hash_seq_init(&seqstat, LockMethodProcLockHash);
+
+    /*
+     * If lock is a currently granted AccessExclusiveLock then
+     * it will have just one proclock holder, so locks are never
+     * accessed twice in this particular case. Don't copy this code
+     * for use elsewhere because in the general case this will
+     * give you duplicate locks when looking at non-exclusive lock types.
+     */
+    index = 0;
+    while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
+    {
+        /* make sure this definition matches the one used in LockAcquire */
+        if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
+            proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
+        {
+            PGPROC    *proc = proclock->tag.myProc;
+            LOCK    *lock = proclock->tag.myLock;
+
+            Assert(lock->nGranted == 1);
+
+            if (index < maximum)
+            {
+                loggableLocks[index].xid     = proc->xid;
+                loggableLocks[index].dbOid  = lock->tag.locktag_field1;
+                loggableLocks[index].relOid = lock->tag.locktag_field2;
+            }
+
+            index++;
+        }
+    }
+
+    /*
+     * And release locks.  We do this in reverse order for two reasons: (1)
+     * Anyone else who needs more than one of the locks will be trying to lock
+     * them in increasing order; we don't want to release the other process
+     * until it can get all the locks it needs. (2) This avoids O(N^2)
+     * behavior inside LWLockRelease.
+     */
+    for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
+        LWLockRelease(FirstLockMgrLock + i);
+
+    return index;
+}
+
 /* Provide the textual name of any lock mode */
 const char *
 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
@@ -2288,6 +2441,24 @@ DumpAllLocks(void)
  * Because this function is run at db startup, re-acquiring the locks should
  * never conflict with running transactions because there are none.  We
  * assume that the lock state represented by the stored 2PC files is legal.
+ *
+ * When switching from Hot Standby mode to normal operation, the locks will
+ * be already held by the startup process. The locks are acquired for the new
+ * procs without checking for conflicts, so we don'get a conflict between the
+ * startup process and the dummy procs, even though we will momentarily have
+ * a situation where two procs are holding the same AccessExclusiveLock,
+ * which isn't normally possible because the conflict. If we're in standby
+ * mode, but a recovery snapshot hasn't been established yet, it's possible
+ * that some but not all of the locks are already held by the startup process.
+ *
+ * This approach is simple, but also a bit dangerous, because if there isn't
+ * enough shared memory to acquire the locks, an error will be thrown, which
+ * is promoted to FATAL and recovery will abort, bringing down postmaster.
+ * A safer approach would be to transfer the locks like we do in
+ * AtPrepare_Locks, but then again, in hot standby mode it's possible for
+ * read-only backends to use up all the shared lock memory anyway, so that
+ * replaying the WAL record that needs to acquire a lock will throw an error
+ * and PANIC anyway.
  */
 void
 lock_twophase_recover(TransactionId xid, uint16 info,
@@ -2443,6 +2614,8 @@ lock_twophase_recover(TransactionId xid, uint16 info,

     /*
      * We ignore any possible conflicts and just grant ourselves the lock.
+     * Not only because we don't bother, but also to avoid deadlocks when
+     * switching from standby to normal mode. See function comment.
      */
     GrantLock(lock, proclock, lockmode);

@@ -2450,6 +2623,37 @@ lock_twophase_recover(TransactionId xid, uint16 info,
 }

 /*
+ * Re-acquire a lock belonging to a transaction that was prepared, when
+ * when starting up into hot standby mode.
+ */
+void
+lock_twophase_standby_recover(TransactionId xid, uint16 info,
+                              void *recdata, uint32 len)
+{
+    TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
+    LOCKTAG    *locktag;
+    LOCKMODE    lockmode;
+    LOCKMETHODID lockmethodid;
+
+    Assert(len == sizeof(TwoPhaseLockRecord));
+    locktag = &rec->locktag;
+    lockmode = rec->lockmode;
+    lockmethodid = locktag->locktag_lockmethodid;
+
+    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+
+    if (lockmode == AccessExclusiveLock &&
+        locktag->locktag_type == LOCKTAG_RELATION)
+    {
+        RelationAddRecoveryLock(xid,
+                                locktag->locktag_field1 /* dboid */,
+                                locktag->locktag_field2 /* reloid */);
+    }
+}
+
+
+/*
  * 2PC processing routine for COMMIT PREPARED case.
  *
  * Find and release the lock indicated by the 2PC record.
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index ff2811f..352ff08 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -318,6 +318,7 @@ InitProcess(void)
     MyProc->waitProcLock = NULL;
     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
         SHMQueueInit(&(MyProc->myProcLocks[i]));
+    MyProc->recoveryConflictMode = 0;

     /*
      * We might be reusing a semaphore that belonged to a failed process. So
@@ -374,6 +375,11 @@ InitProcessPhase2(void)
  * to the ProcArray or the sinval messaging mechanism, either.    They also
  * don't get a VXID assigned, since this is only useful when we actually
  * hold lockmgr locks.
+ *
+ * Startup process however uses locks but never waits for them in the
+ * normal backend sense. Startup process also takes part in sinval messaging
+ * as a sendOnly process, so never reads messages from sinval queue. So
+ * Startup process does have a VXID and does show up in pg_locks.
  */
 void
 InitAuxiliaryProcess(void)
@@ -462,6 +468,24 @@ InitAuxiliaryProcess(void)
 }

 /*
+ * Record the PID and PGPROC structures for the Startup process, for use in
+ * ProcSendSignal().  See comments there for further explanation.
+ */
+void
+PublishStartupProcessInformation(void)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile PROC_HDR *procglobal = ProcGlobal;
+
+    SpinLockAcquire(ProcStructLock);
+
+    procglobal->startupProc = MyProc;
+    procglobal->startupProcPid = MyProcPid;
+
+    SpinLockRelease(ProcStructLock);
+}
+
+/*
  * Check whether there are at least N free PGPROC objects.
  *
  * Note: this is designed on the assumption that N will generally be small.
@@ -1289,7 +1313,31 @@ ProcWaitForSignal(void)
 void
 ProcSendSignal(int pid)
 {
-    PGPROC       *proc = BackendPidGetProc(pid);
+    PGPROC       *proc = NULL;
+
+    if (RecoveryInProgress())
+    {
+        /* use volatile pointer to prevent code rearrangement */
+        volatile PROC_HDR *procglobal = ProcGlobal;
+
+        SpinLockAcquire(ProcStructLock);
+
+        /*
+         * Check to see whether it is the Startup process we wish to signal.
+         * This call is made by the buffer manager when it wishes to wake
+         * up a process that has been waiting for a pin in so it can obtain a
+         * cleanup lock using LockBufferForCleanup(). Startup is not a normal
+         * backend, so BackendPidGetProc() will not return any pid at all.
+         * So we remember the information for this special case.
+         */
+        if (pid == procglobal->startupProcPid)
+            proc = procglobal->startupProc;
+
+        SpinLockRelease(ProcStructLock);
+    }
+
+    if (proc == NULL)
+        proc = BackendPidGetProc(pid);

     if (proc != NULL)
         PGSemaphoreUnlock(&proc->sem);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0672652..005c1c0 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -66,6 +66,7 @@
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -2642,8 +2643,8 @@ StatementCancelHandler(SIGNAL_ARGS)
          * the interrupt immediately.  No point in interrupting if we're
          * waiting for input, however.
          */
-        if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
-            CritSectionCount == 0 && !DoingCommandRead)
+        if (InterruptHoldoffCount == 0 && CritSectionCount == 0 &&
+            (DoingCommandRead || ImmediateInterruptOK))
         {
             /* bump holdoff count to make ProcessInterrupts() a no-op */
             /* until we are done getting ready for it */
@@ -2734,9 +2735,76 @@ ProcessInterrupts(void)
                     (errcode(ERRCODE_QUERY_CANCELED),
                      errmsg("canceling autovacuum task")));
         else
+        {
+            int cancelMode = MyProc->recoveryConflictMode;
+
+            if (DoingCommandRead && IsTransactionBlock())
+            {
+                switch (cancelMode)
+                {
+                    /*
+                     * XXXHS: We don't yet have a clean way to cancel an
+                     * idle-in-transaction session, so make it FATAL instead.
+                     */
+                    case CONFLICT_MODE_ERROR:
+                        cancelMode = CONFLICT_MODE_FATAL;
+                            break;
+
+                    case CONFLICT_MODE_ERROR_IF_NOT_IDLE:
+                        cancelMode = CONFLICT_MODE_NOT_SET;
+                            break;
+
+                    default:
+                            break;
+                }
+            }
+
+            switch (cancelMode)
+            {
+                case CONFLICT_MODE_FATAL:
+                        Assert(RecoveryInProgress());
+                        ereport(FATAL,
+                            (errcode(ERRCODE_QUERY_CANCELED),
+                             errmsg("canceling session due to conflict with recovery")));
+
+                case CONFLICT_MODE_ERROR_DEFERRABLE:
+                        /*
+                         * XXX This mode acts the same as CONFLICT_MODE_ERROR
+                         * though in the future it is possible that we might
+                         * defer cancelation until the point where a query
+                         * attempts to read a block that has been cleaned
+                         * by checking the block LSN in the buffer manager.
+                         * Until then, hasta la vista, baby: drop through.
+                         */
+
+                case CONFLICT_MODE_ERROR:
+                case CONFLICT_MODE_ERROR_IF_NOT_IDLE:
+                        /*
+                         * We are aborting because we need to release
+                         * locks. So we need to abort out of all
+                         * subtransactions to make sure we release
+                         * all locks at whatever their level.
+                         *
+                         * XXX Should we try to examine the
+                         * transaction tree and cancel just enough
+                         * subxacts to remove locks? Doubt it.
+                         */
+                        Assert(RecoveryInProgress());
+                        AbortOutOfAnyTransaction();
+                        ereport(ERROR,
+                            (errcode(ERRCODE_QUERY_CANCELED),
+                             errmsg("canceling statement due to conflict with recovery")));
+                        return;
+
+                default:
+                        /* No conflict pending, so fall through */
+                        break;
+            }
+
             ereport(ERROR,
                     (errcode(ERRCODE_QUERY_CANCELED),
                      errmsg("canceling statement due to user request")));
+        }
     }
     /* If we get here, do nothing (probably, QueryCancelPending was reset) */
 }
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 40252fa..104311b 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -290,9 +290,20 @@ ProcessUtility(Node *parsetree,
                                                   list_make1(item->arg),
                                                   true);
                                 else if (strcmp(item->defname, "transaction_read_only") == 0)
+                                {
+                                    A_Const       *con;
+
+                                    Assert(IsA(item->arg, A_Const));
+                                    con = (A_Const *) item->arg;
+                                    Assert(nodeTag(&con->val) == T_Integer);
+
+                                    if (!intVal(&con->val))
+                                        PreventCommandDuringRecovery();
+
                                     SetPGVariable("transaction_read_only",
                                                   list_make1(item->arg),
                                                   true);
+                                }
                             }
                         }
                         break;
@@ -307,6 +318,7 @@ ProcessUtility(Node *parsetree,
                         break;

                     case TRANS_STMT_PREPARE:
+                        PreventCommandDuringRecovery();
                         if (!PrepareTransactionBlock(stmt->gid))
                         {
                             /* report unsuccessful commit in completionTag */
@@ -316,11 +328,13 @@ ProcessUtility(Node *parsetree,
                         break;

                     case TRANS_STMT_COMMIT_PREPARED:
+                        PreventCommandDuringRecovery();
                         PreventTransactionChain(isTopLevel, "COMMIT PREPARED");
                         FinishPreparedTransaction(stmt->gid, true);
                         break;

                     case TRANS_STMT_ROLLBACK_PREPARED:
+                        PreventCommandDuringRecovery();
                         PreventTransactionChain(isTopLevel, "ROLLBACK PREPARED");
                         FinishPreparedTransaction(stmt->gid, false);
                         break;
@@ -695,6 +709,7 @@ ProcessUtility(Node *parsetree,
             break;

         case T_GrantStmt:
+            PreventCommandDuringRecovery();
             ExecuteGrantStmt((GrantStmt *) parsetree);
             break;

@@ -875,6 +890,7 @@ ProcessUtility(Node *parsetree,
         case T_NotifyStmt:
             {
                 NotifyStmt *stmt = (NotifyStmt *) parsetree;
+                PreventCommandDuringRecovery();

                 Async_Notify(stmt->conditionname);
             }
@@ -883,6 +899,7 @@ ProcessUtility(Node *parsetree,
         case T_ListenStmt:
             {
                 ListenStmt *stmt = (ListenStmt *) parsetree;
+                PreventCommandDuringRecovery();

                 Async_Listen(stmt->conditionname);
             }
@@ -891,6 +908,7 @@ ProcessUtility(Node *parsetree,
         case T_UnlistenStmt:
             {
                 UnlistenStmt *stmt = (UnlistenStmt *) parsetree;
+                PreventCommandDuringRecovery();

                 if (stmt->conditionname)
                     Async_Unlisten(stmt->conditionname);
@@ -910,10 +928,12 @@ ProcessUtility(Node *parsetree,
             break;

         case T_ClusterStmt:
+            PreventCommandDuringRecovery();
             cluster((ClusterStmt *) parsetree, isTopLevel);
             break;

         case T_VacuumStmt:
+            PreventCommandDuringRecovery();
             vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false,
                    isTopLevel);
             break;
@@ -1031,12 +1051,21 @@ ProcessUtility(Node *parsetree,
                 ereport(ERROR,
                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                          errmsg("must be superuser to do CHECKPOINT")));
-            RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+            /*
+             * You might think we should have a PreventCommandDuringRecovery()
+             * here, but we interpret a CHECKPOINT command during recovery
+             * as a request for a restartpoint instead. We allow this since
+             * it can be a useful way of reducing switchover time when
+             * using various forms of replication.
+             */
+            RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_WAIT |
+                                (RecoveryInProgress() ? 0 : CHECKPOINT_FORCE));
             break;

         case T_ReindexStmt:
             {
                 ReindexStmt *stmt = (ReindexStmt *) parsetree;
+                PreventCommandDuringRecovery();

                 switch (stmt->kind)
                 {
@@ -2549,3 +2578,12 @@ GetCommandLogLevel(Node *parsetree)

     return lev;
 }
+
+void
+PreventCommandDuringRecovery(void)
+{
+    if (RecoveryInProgress())
+        ereport(ERROR,
+            (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+             errmsg("cannot be executed during recovery")));
+}
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index de8ea09..b19d4d8 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -5922,6 +5922,21 @@ hashcostestimate(PG_FUNCTION_ARGS)
     Selectivity *indexSelectivity = (Selectivity *) PG_GETARG_POINTER(6);
     double       *indexCorrelation = (double *) PG_GETARG_POINTER(7);

+    /*
+     * Hash indexes are not recoverable, so should not be used
+     * during recovery mode. Strongly dissuade attempts to use them by adding
+     * the same disableCost as if enable_indexscan = off was selected,
+     * but only for this index type.
+     *
+     * XXXHS This is an ugly hack that we expect to remove again
+     * as soon as hash indexes write WAL records. We will still need
+     * something like this in future to prevent indexes that we
+     * know to be corrupt from being used during recovery, but that will
+     * require a wider and deeper fix than this.
+     */
+    if (RecoveryInProgress())
+        *indexStartupCost += 100000000.0;
+
     genericcostestimate(root, index, indexQuals, outer_rel, 0.0,
                         indexStartupCost, indexTotalCost,
                         indexSelectivity, indexCorrelation);
diff --git a/src/backend/utils/adt/txid.c b/src/backend/utils/adt/txid.c
index 7e51f9e..3ea7597 100644
--- a/src/backend/utils/adt/txid.c
+++ b/src/backend/utils/adt/txid.c
@@ -24,6 +24,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "funcapi.h"
+#include "miscadmin.h"
 #include "libpq/pqformat.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
@@ -338,6 +339,15 @@ txid_current(PG_FUNCTION_ARGS)
     txid        val;
     TxidEpoch    state;

+    /*
+     * Must prevent during recovery because if an xid is
+     * not assigned we try to assign one, which would fail.
+     * Programs already rely on this function to always
+     * return a valid current xid, so we should not change
+     * this to return NULL or similar invalid xid.
+     */
+    PreventCommandDuringRecovery();
+
     load_xid_epoch(&state);

     val = convert_xid(GetTopTransactionId(), &state);
diff --git a/src/backend/utils/adt/xid.c b/src/backend/utils/adt/xid.c
index 9ea702c..43e2271 100644
--- a/src/backend/utils/adt/xid.c
+++ b/src/backend/utils/adt/xid.c
@@ -102,6 +102,25 @@ xid_age(PG_FUNCTION_ARGS)
     PG_RETURN_INT32((int32) (now - xid));
 }

+/*
+ * xidComparator
+ *        qsort comparison function for XIDs
+ *
+ * We can't need to use wraparound comparison for XIDs because that does not
+ * respect the triangle inequality!  Any old sort order will do.
+ */
+int
+xidComparator(const void *arg1, const void *arg2)
+{
+    TransactionId xid1 = *(const TransactionId *) arg1;
+    TransactionId xid2 = *(const TransactionId *) arg2;
+
+    if (xid1 > xid2)
+        return 1;
+    if (xid1 < xid2)
+        return -1;
+    return 0;
+}

 /*****************************************************************************
  *     COMMAND IDENTIFIER ROUTINES                                             *
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 92e48d5..0861234 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -86,14 +86,19 @@
  */
 #include "postgres.h"

+#include "access/transam.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "storage/smgr.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
+#include "utils/ps_status.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"

@@ -155,6 +160,13 @@ typedef struct TransInvalidationInfo

 static TransInvalidationInfo *transInvalInfo = NULL;

+static SharedInvalidationMessage *SharedInvalidMessagesArray;
+static int                     numSharedInvalidMessagesArray;
+static int                     maxSharedInvalidMessagesArray;
+
+static List *RecoveryLockList;
+
+
 /*
  * Dynamically-registered callback functions.  Current implementation
  * assumes there won't be very many of these at once; could improve if needed.
@@ -180,14 +192,6 @@ static struct RELCACHECALLBACK

 static int    relcache_callback_count = 0;

-/* info values for 2PC callback */
-#define TWOPHASE_INFO_MSG            0    /* SharedInvalidationMessage */
-#define TWOPHASE_INFO_FILE_BEFORE    1    /* relcache file inval */
-#define TWOPHASE_INFO_FILE_AFTER    2    /* relcache file inval */
-
-static void PersistInvalidationMessage(SharedInvalidationMessage *msg);
-
-
 /* ----------------------------------------------------------------
  *                Invalidation list support functions
  *
@@ -741,38 +745,8 @@ AtStart_Inval(void)
         MemoryContextAllocZero(TopTransactionContext,
                                sizeof(TransInvalidationInfo));
     transInvalInfo->my_level = GetCurrentTransactionNestLevel();
-}
-
-/*
- * AtPrepare_Inval
- *        Save the inval lists state at 2PC transaction prepare.
- *
- * In this phase we just generate 2PC records for all the pending invalidation
- * work.
- */
-void
-AtPrepare_Inval(void)
-{
-    /* Must be at top of stack */
-    Assert(transInvalInfo != NULL && transInvalInfo->parent == NULL);
-
-    /*
-     * Relcache init file invalidation requires processing both before and
-     * after we send the SI messages.
-     */
-    if (transInvalInfo->RelcacheInitFileInval)
-        RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_BEFORE,
-                               NULL, 0);
-
-    AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                               &transInvalInfo->CurrentCmdInvalidMsgs);
-
-    ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                PersistInvalidationMessage);
-
-    if (transInvalInfo->RelcacheInitFileInval)
-        RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_AFTER,
-                               NULL, 0);
+    SharedInvalidMessagesArray = NULL;
+    numSharedInvalidMessagesArray = 0;
 }

 /*
@@ -812,45 +786,102 @@ AtSubStart_Inval(void)
 }

 /*
- * PersistInvalidationMessage
- *        Write an invalidation message to the 2PC state file.
+ * Collect invalidation messages into SharedInvalidMessagesArray array.
  */
 static void
-PersistInvalidationMessage(SharedInvalidationMessage *msg)
+MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n)
 {
-    RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_MSG,
-                           msg, sizeof(SharedInvalidationMessage));
+    /*
+     * Initialise array first time through in each commit
+     */
+    if (SharedInvalidMessagesArray == NULL)
+    {
+        maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE;
+        numSharedInvalidMessagesArray = 0;
+
+        /*
+         * Although this is being palloc'd we don't actually free it directly.
+         * We're so close to EOXact that we now we're going to lose it anyhow.
+         */
+        SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray
+                                            * sizeof(SharedInvalidationMessage));
+    }
+
+    if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
+    {
+        while ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
+            maxSharedInvalidMessagesArray *= 2;
+
+        SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray,
+                                            maxSharedInvalidMessagesArray
+                                            * sizeof(SharedInvalidationMessage));
+    }
+
+    /*
+     * Append the next chunk onto the array
+     */
+    memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray,
+            msgs, n * sizeof(SharedInvalidationMessage));
+    numSharedInvalidMessagesArray += n;
 }

 /*
- * inval_twophase_postcommit
- *        Process an invalidation message from the 2PC state file.
+ * xactGetCommittedInvalidationMessages() is executed by
+ * RecordTransactionCommit() to add invalidation messages onto the
+ * commit record. This applies only to commit message types, never to
+ * abort records. Must always run before AtEOXact_Inval(), since that
+ * removes the data we need to see.
+ *
+ * Remember that this runs before we have officially committed, so we
+ * must not do anything here to change what might occur *if* we should
+ * fail between here and the actual commit.
+ *
+ * Note that transactional validation does *not* write a invalidation
+ * WAL message using XLOG_RELATION_INVAL messages. Those are only used
+ * by non-transactional invalidation. see comments in
+ * EndNonTransactionalInvalidation().
+ *
+ * see also xact_redo_commit() and xact_desc_commit()
  */
-void
-inval_twophase_postcommit(TransactionId xid, uint16 info,
-                          void *recdata, uint32 len)
+int
+xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
+                                     bool *RelcacheInitFileInval)
 {
-    SharedInvalidationMessage *msg;
+    MemoryContext oldcontext;

-    switch (info)
-    {
-        case TWOPHASE_INFO_MSG:
-            msg = (SharedInvalidationMessage *) recdata;
-            Assert(len == sizeof(SharedInvalidationMessage));
-            SendSharedInvalidMessages(msg, 1);
-            break;
-        case TWOPHASE_INFO_FILE_BEFORE:
-            RelationCacheInitFileInvalidate(true);
-            break;
-        case TWOPHASE_INFO_FILE_AFTER:
-            RelationCacheInitFileInvalidate(false);
-            break;
-        default:
-            Assert(false);
-            break;
-    }
-}
+    /* Must be at top of stack */
+    Assert(transInvalInfo != NULL && transInvalInfo->parent == NULL);
+
+    /*
+     * Relcache init file invalidation requires processing both before and
+     * after we send the SI messages.  However, we need not do anything
+     * unless we committed.
+     */
+    *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;

+    /*
+     * Walk through TransInvalidationInfo to collect all the messages
+     * into a single contiguous array of invalidation messages. It must
+     * be contiguous so we can copy directly into WAL message. Maintain the
+     * order that they would be processed in by AtEOXact_Inval(), to ensure
+     * emulated behaviour in redo is as similar as possible to original.
+     * We want the same bugs, if any, not new ones.
+     */
+    oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+
+    ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                     MakeSharedInvalidMessagesArray);
+    ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                     MakeSharedInvalidMessagesArray);
+    MemoryContextSwitchTo(oldcontext);
+
+    Assert(!(numSharedInvalidMessagesArray > 0 &&
+             SharedInvalidMessagesArray == NULL));
+
+    *msgs = SharedInvalidMessagesArray;
+
+    return numSharedInvalidMessagesArray;
+}

 /*
  * AtEOXact_Inval
@@ -1041,6 +1072,42 @@ BeginNonTransactionalInvalidation(void)
     Assert(transInvalInfo->CurrentCmdInvalidMsgs.cclist == NULL);
     Assert(transInvalInfo->CurrentCmdInvalidMsgs.rclist == NULL);
     Assert(transInvalInfo->RelcacheInitFileInval == false);
+
+    SharedInvalidMessagesArray = NULL;
+    numSharedInvalidMessagesArray = 0;
+}
+
+/*
+ * General function to log the SharedInvalidMessagesArray. Only current
+ * caller is EndNonTransactionalInvalidation(), but that may change.
+ */
+static void
+LogSharedInvalidMessagesArray(void)
+{
+    XLogRecData        rdata[2];
+    xl_rel_inval    xlrec;
+
+    if (numSharedInvalidMessagesArray == 0)
+        return;
+
+    START_CRIT_SECTION();
+
+    xlrec.nmsgs = numSharedInvalidMessagesArray;
+
+    rdata[0].data = (char *) (&xlrec);
+    rdata[0].len = MinSizeOfRelationInval;
+    rdata[0].buffer = InvalidBuffer;
+
+    rdata[0].next = &(rdata[1]);
+    rdata[1].data = (char *) SharedInvalidMessagesArray;
+    rdata[1].len = numSharedInvalidMessagesArray *
+                                sizeof(SharedInvalidationMessage);
+    rdata[1].buffer = InvalidBuffer;
+    rdata[1].next = NULL;
+
+    (void) XLogInsert(RM_RELATION_ID, XLOG_RELATION_INVAL, rdata);
+
+    END_CRIT_SECTION();
 }

 /*
@@ -1081,6 +1148,24 @@ EndNonTransactionalInvalidation(void)
     ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
                                      SendSharedInvalidMessages);

+    /*
+     * Write invalidation messages to WAL. This is not required for
+     * recovery, it is only required for standby servers. It's fairly
+     * low overhead so don't worry. This allows us to trigger inval
+     * messages on the standby as soon as we see these records.
+     * see relation_redo_inval()
+     *
+     * Note that transactional validation uses an array attached to
+     * a WAL commit record, so non-transactional messages are only
+     * written by VACUUM FULL on catalog tables.
+     */
+    if (XLogArchivingActive())
+    {
+        ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                         MakeSharedInvalidMessagesArray);
+        LogSharedInvalidMessagesArray();
+    }
+
     /* Clean up and release memory */
     for (chunk = transInvalInfo->CurrentCmdInvalidMsgs.cclist;
          chunk != NULL;
@@ -1235,3 +1320,455 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,

     ++relcache_callback_count;
 }
+
+/*
+ * -----------------------------------------------------
+ *         Standby wait timers and backend cancel logic
+ * -----------------------------------------------------
+ */
+
+#define STANDBY_INITIAL_WAIT_US  1000
+static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
+
+/*
+ * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
+ * We wait here for a while then return. If we decide we can't wait any
+ * more then we return true, if we can wait some more return false.
+ */
+static bool
+WaitExceedsMaxStandbyDelay(void)
+{
+    int        maxStandbyDelay = GetMaxStandbyDelay();
+    long    delay_secs;
+    int        delay_usecs;
+
+    /* max_standby_delay = -1 means wait forever, if necessary */
+    if (maxStandbyDelay < 0)
+        return false;
+
+    /* Are we past max_standby_delay? */
+    TimestampDifference(GetLatestXLogTime(), GetCurrentTimestamp(),
+                        &delay_secs, &delay_usecs);
+    if (delay_secs > maxStandbyDelay)
+        return true;
+
+    /*
+     * Sleep, then do bookkeeping.
+     */
+    pg_usleep(standbyWait_us);
+
+    /*
+     * Progressively increase the sleep times.
+     */
+    standbyWait_us *= 2;
+    if (standbyWait_us > 1000000)
+        standbyWait_us = 1000000;
+    if (standbyWait_us > maxStandbyDelay * 1000000 / 4)
+        standbyWait_us = maxStandbyDelay * 1000000 / 4;
+
+    return false;
+}
+
+/*
+ * This is the main executioner for any query backend that conflicts with
+ * recovery processing. Judgement has already been passed on it within
+ * a specific rmgr. Here we just issue the orders to the procs. The procs
+ * then throw the required error as instructed.
+ *
+ * We may ask for a specific cancel_mode, typically ERROR or FATAL.
+ *
+ * If we want an ERROR, we may defer that until the buffer manager
+ * sees a recently changed block. If we want this we must specify a
+ * valid conflict_lsn.
+ */
+void
+ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
+                                        char *reason, int cancel_mode,
+                                        XLogRecPtr conflict_lsn)
+{
+    char        waitactivitymsg[100];
+
+    Assert(cancel_mode > 0);
+
+    while (VirtualTransactionIdIsValid(*waitlist))
+    {
+        long wait_s;
+        int wait_us;
+        TimestampTz waitStart;
+        bool        logged;
+
+        waitStart = GetCurrentTimestamp();
+        standbyWait_us = STANDBY_INITIAL_WAIT_US;
+        logged = false;
+
+        /* wait until the virtual xid is gone */
+        while(!ConditionalVirtualXactLockTableWait(*waitlist))
+        {
+            /*
+             * Report if we have been waiting for a while now...
+             */
+            TimestampTz now = GetCurrentTimestamp();
+            TimestampDifference(waitStart, now, &wait_s, &wait_us);
+            if (!logged && (wait_s > 0 || wait_us > 500000))
+            {
+                const char *oldactivitymsg;
+                int            len;
+
+                oldactivitymsg = get_ps_display(&len);
+                snprintf(waitactivitymsg, sizeof(waitactivitymsg),
+                         "waiting for max_standby_delay (%u ms)",
+                         GetMaxStandbyDelay());
+                set_ps_display(waitactivitymsg, false);
+                if (len > 100)
+                    len = 100;
+                memcpy(waitactivitymsg, oldactivitymsg, len);
+
+                ereport(trace_recovery(DEBUG5),
+                        (errmsg("virtual transaction %u/%u is blocking %s",
+                                waitlist->backendId,
+                                waitlist->localTransactionId,
+                                reason)));
+
+                pgstat_report_waiting(true);
+
+                logged = true;
+            }
+
+            /* Is it time to kill it? */
+            if (WaitExceedsMaxStandbyDelay())
+            {
+                pid_t pid;
+
+                /*
+                 * Now find out who to throw out of the balloon.
+                 */
+                Assert(VirtualTransactionIdIsValid(*waitlist));
+                pid = CancelVirtualTransaction(*waitlist, cancel_mode);
+
+                if (pid != 0)
+                {
+                    /*
+                     * Startup process debug messages
+                     */
+                    switch (cancel_mode)
+                    {
+                        /*
+                         * XXX: these messages don't translate well, because
+                         * of the way the reason is passed in.
+                         */
+                        case CONFLICT_MODE_FATAL:
+                            ereport(trace_recovery(DEBUG1),
+                                    (errmsg("recovery disconnects session with pid %d because of conflict with %s",
+                                            pid,
+                                            reason)));
+                            break;
+                        case CONFLICT_MODE_ERROR:
+                            ereport(trace_recovery(DEBUG1),
+                                    (errmsg("recovery cancels virtual transaction %u/%u pid %d because of conflict
with%s", 
+                                            waitlist->backendId,
+                                            waitlist->localTransactionId,
+                                            pid,
+                                            reason)));
+                            break;
+                        default:
+                            /* No conflict pending, so fall through */
+                            break;
+                    }
+
+                    /*
+                     * Wait awhile for it to die so that we avoid flooding an
+                     * unresponsive backend when system is heavily loaded.
+                     */
+                    pg_usleep(5000);
+                }
+            }
+        }
+
+        /* Reset ps display */
+        if (logged)
+        {
+            set_ps_display(waitactivitymsg, false);
+            pgstat_report_waiting(false);
+        }
+
+        /* The virtual transaction is gone now, wait for the next one */
+        waitlist++;
+    }
+}
+
+/*
+ * -----------------------------------------------------
+ * Locking in Recovery Mode
+ * -----------------------------------------------------
+ *
+ * All locks are held by the Startup process using a single virtual
+ * transaction. This implementation is both simpler and in some senses,
+ * more correct. The locks held mean "some original transaction held
+ * this lock, so query access is not allowed at this time". So the Startup
+ * process is the proxy by which the original locks are implemented.
+ *
+ * We only keep track of AccessExclusiveLocks, which are only ever held by
+ * one transaction on one relation, and don't worry about lock queuing.
+ *
+ * We keep a single dynamically expandible list of locks in local memory,
+ * RelationLockList, so we can keep track of the various entried made by
+ * the Startup process's virtual xid in the shared lock table.
+ *
+ * List elements use type xl_rel_lock, since the WAL record type exactly
+ * matches the information that we need to keep track of.
+ *
+ * We use session locks rather than normal locks so we don't need
+ * ResourceOwners.
+ */
+
+/* called by relation_redo_locks() */
+void
+RelationAddRecoveryLock(TransactionId xid, Oid dbOid, Oid relOid)
+{
+    xl_rel_lock     *newlock;
+    LOCKTAG            locktag;
+
+    elog(trace_recovery(DEBUG4),
+         "adding recovery lock: db %d rel %d", dbOid, relOid);
+
+    /*
+     * dbOid is InvalidOid when we are locking a shared relation.
+     */
+    Assert(OidIsValid(relOid));
+
+    newlock = palloc(sizeof(xl_rel_lock));
+    newlock->xid = xid;
+    newlock->dbOid = dbOid;
+    newlock->relOid = relOid;
+    RecoveryLockList = lappend(RecoveryLockList, newlock);
+
+    /*
+     * Attempt to acquire the lock as requested.
+     */
+    SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
+
+    /*
+     * Wait for lock to clear or kill anyone in our way.
+     */
+    while (LockAcquire(&locktag, AccessExclusiveLock, true, true)
+                                            == LOCKACQUIRE_NOT_AVAIL)
+    {
+        VirtualTransactionId *old_lockholders;
+
+        old_lockholders = GetLockConflicts(&locktag, AccessExclusiveLock);
+        ResolveRecoveryConflictWithVirtualXIDs(old_lockholders,
+                                                "exclusive lock",
+                                                CONFLICT_MODE_ERROR,
+                                                InvalidXLogRecPtr);
+        /*
+         * It's possible that other users may have aquired the lock while
+         * we were waiting for the first group of users. So we loop.
+         *
+         * XXXHS We could mark the lock as unobtainable to ensure that users
+         * abort immediately to avoid starving the startup process,
+         * but once we start it's going to be a bloodbath however we do it
+         * so keep it simple on the assumption that it's unlikely users
+         * will endlessly spring alive again and re-request the same lock.
+         */
+    }
+}
+
+static void
+RelationReleaseRecoveryLocks(TransactionId xid)
+{
+    ListCell   *cell,
+               *prev,
+               *next;
+
+    /*
+     * Release all matching locks and remove them from list
+     */
+    prev = NULL;
+    for (cell = list_head(RecoveryLockList); cell; cell = next)
+    {
+        xl_rel_lock *lock = (xl_rel_lock *) lfirst(cell);
+        next = lnext(cell);
+
+        if (!TransactionIdIsValid(xid) || lock->xid == xid)
+        {
+            LOCKTAG        locktag;
+
+            elog(trace_recovery(DEBUG4),
+                    "releasing recovery lock: xid %u db %d rel %d",
+                            lock->xid, lock->dbOid, lock->relOid);
+            SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+            if (!LockRelease(&locktag, AccessExclusiveLock, true))
+                elog(trace_recovery(LOG),
+                    "RecoveryLockList contains entry for lock "
+                    "no longer recorded by lock manager "
+                    "xid %u database %d relation %d",
+                        lock->xid, lock->dbOid, lock->relOid);
+
+            RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+            pfree(lock);
+        }
+        else
+            prev = cell;
+    }
+}
+
+/*
+ * Release locks for a transaction tree, starting at xid down, from
+ * RecoveryLockList.
+ *
+ * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
+ * to remove any AccessExclusiveLocks requested by a transaction.
+ */
+void
+RelationReleaseRecoveryLockTree(TransactionId xid, int nsubxids,
+                                TransactionId *subxids)
+{
+    int i;
+
+    RelationReleaseRecoveryLocks(xid);
+
+    for (i = 0; i < nsubxids; i++)
+        RelationReleaseRecoveryLocks(subxids[i]);
+}
+
+/*
+ * Called at end of recovery and when we see a shutdown checkpoint.
+ */
+void
+RelationReleaseAllRecoveryLocks(void)
+{
+    elog(trace_recovery(DEBUG2), "release all recovery locks");
+    RelationReleaseRecoveryLocks(InvalidTransactionId);
+}
+
+void
+RelationReleaseOldRecoveryLocks(TransactionId removeXid)
+{
+    ListCell   *cell,
+               *prev,
+               *next;
+    LOCKTAG        locktag;
+
+    /*
+     * Release all matching locks.
+     */
+    prev = NULL;
+    for (cell = list_head(RecoveryLockList); cell; cell = next)
+    {
+        xl_rel_lock *lock = (xl_rel_lock *) lfirst(cell);
+        next = lnext(cell);
+
+        if (TransactionIdPrecedes(removeXid, lock->xid))
+        {
+            elog(trace_recovery(DEBUG4),
+                 "releasing recovery lock: xid %u db %d rel %d",
+                 lock->xid, lock->dbOid, lock->relOid);
+            SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+            if (!LockRelease(&locktag, AccessExclusiveLock, true))
+                elog(trace_recovery(LOG),
+                     "RecoveryLockList contains entry for lock "
+                     "no longer recorded by lock manager "
+                     "xid %u database %d relation %d",
+                     lock->xid, lock->dbOid, lock->relOid);
+            RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+            pfree(lock);
+        }
+        else
+            prev = cell;
+    }
+}
+
+/*
+ * --------------------------------------------------
+ *         Recovery handling for Rmgr RM_RELATION_ID
+ * --------------------------------------------------
+ */
+
+/*
+ * Redo for relation invalidation messages
+ */
+static void
+relation_redo_inval(xl_rel_inval *xlrec)
+{
+    SharedInvalidationMessage *msgs = &(xlrec->msgs[0]);
+    int        nmsgs = xlrec->nmsgs;
+
+    Assert(nmsgs > 0);        /* else we should not have written a record */
+
+    /*
+     * Smack them straight onto the queue and we're done. This is safe
+     * because the only writer of these messages is non-transactional
+     * invalidation.
+     */
+    SendSharedInvalidMessages(msgs, nmsgs);
+}
+
+void
+relation_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+    uint8        info = record->xl_info & ~XLR_INFO_MASK;
+
+    if (info == XLOG_RELATION_INVAL)
+    {
+        xl_rel_inval *xlrec = (xl_rel_inval *) XLogRecGetData(record);
+
+        relation_redo_inval(xlrec);
+    }
+    else if (info == XLOG_RELATION_LOCK)
+    {
+        xl_rel_lock *xlrec = (xl_rel_lock *) XLogRecGetData(record);
+
+        RelationAddRecoveryLock(xlrec->xid, xlrec->dbOid, xlrec->relOid);
+    }
+    else
+        elog(PANIC, "relation_redo: unknown op code %u", info);
+}
+
+static void
+relation_desc_inval(StringInfo buf, xl_rel_inval *xlrec)
+{
+    SharedInvalidationMessage *msgs = &(xlrec->msgs[0]);
+    int                            nmsgs = xlrec->nmsgs;
+
+    appendStringInfo(buf, "nmsgs %d;", nmsgs);
+
+    if (nmsgs > 0)
+    {
+        int i;
+
+        for (i = 0; i < nmsgs; i++)
+        {
+            SharedInvalidationMessage *msg = msgs + i;
+
+            if (msg->id >= 0)
+                appendStringInfo(buf,  "catcache id %d", msg->id);
+            else if (msg->id == SHAREDINVALRELCACHE_ID)
+                appendStringInfo(buf,  "relcache ");
+            else if (msg->id == SHAREDINVALSMGR_ID)
+                appendStringInfo(buf,  "smgr ");
+        }
+    }
+}
+
+void
+relation_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+    uint8        info = xl_info & ~XLR_INFO_MASK;
+
+    if (info == XLOG_RELATION_INVAL)
+    {
+        xl_rel_inval *xlrec = (xl_rel_inval *) rec;
+
+        appendStringInfo(buf, "inval: ");
+        relation_desc_inval(buf, xlrec);
+    }
+    else if (info == XLOG_RELATION_LOCK)
+    {
+        xl_rel_lock *xlrec = (xl_rel_lock *) rec;
+
+        appendStringInfo(buf, "exclusive relation lock: xid %u db %d rel %d",
+                                xlrec->xid, xlrec->dbOid, xlrec->relOid);
+    }
+    else
+        appendStringInfo(buf, "UNKNOWN");
+}
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 61751bb..49df836 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -2778,3 +2778,21 @@ is_log_level_output(int elevel, int log_min_level)

     return false;
 }
+
+/*
+ * If trace_recovery_messages is set to make this visible, then show as LOG,
+ * else display as whatever level is set. It may still be shown, but only
+ * if log_min_messages is set lower than trace_recovery_messages.
+ *
+ * Intention is to keep this for at least the whole of the 8.5 production
+ * release, so we can more easily diagnose production problems in the field.
+ */
+int
+trace_recovery(int trace_level)
+{
+    if (trace_level < LOG &&
+        trace_level >= trace_recovery_messages)
+            return LOG;
+
+    return trace_level;
+}
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 4ff19f0..65dfba4 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -481,7 +481,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
      */
     MyBackendId = InvalidBackendId;

-    SharedInvalBackendInit();
+    SharedInvalBackendInit(false);

     if (MyBackendId > MaxBackends || MyBackendId <= 0)
         elog(FATAL, "bad backend id: %d", MyBackendId);
@@ -495,13 +495,6 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
     InitBufferPoolBackend();

     /*
-     * Initialize local process's access to XLOG.  In bootstrap case we may
-     * skip this since StartupXLOG() was run instead.
-     */
-    if (!bootstrap)
-        InitXLOGAccess();
-
-    /*
      * Initialize the relation cache and the system catalog caches.  Note that
      * no catalog access happens here; we only set up the hashtable structure.
      * We must do this before starting a transaction because transaction abort
@@ -549,6 +542,16 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
      */
     if (!bootstrap)
     {
+        /*
+         * If we are connecting during recovery, make sure the initial
+         * transaction is read only and force all subsequent transactions
+         * that way also. RecoveryInProgress() initializes local process's
+         * access to XLOG, if appropriate at this time.  In bootstrap case
+         * we may skip this since StartupXLOG() was run instead.
+         */
+        if (RecoveryInProgress())
+            SetConfigOption("default_transaction_read_only", "true",
+                                PGC_POSTMASTER, PGC_S_OVERRIDE);
         StartTransactionCommand();
         (void) GetTransactionSnapshot();
     }
@@ -625,7 +628,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
      */
     if (!bootstrap)
         LockSharedObject(DatabaseRelationId, MyDatabaseId, 0,
-                         RowExclusiveLock);
+                (RecoveryInProgress() ? AccessShareLock : RowExclusiveLock));

     /*
      * Recheck pg_database to make sure the target database hasn't gone away.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a0a6605..2244a91 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -114,6 +114,8 @@ extern char *temp_tablespaces;
 extern bool synchronize_seqscans;
 extern bool fullPageWrites;

+int    trace_recovery_messages = LOG;
+
 #ifdef TRACE_SORT
 extern bool trace_sort;
 #endif
@@ -2661,6 +2663,16 @@ static struct config_enum ConfigureNamesEnum[] =
     },

     {
+        {"trace_recovery_messages", PGC_SUSET, LOGGING_WHEN,
+            gettext_noop("Sets the message levels that are logged during recovery."),
+            gettext_noop("Each level includes all the levels that follow it. The later"
+                         " the level, the fewer messages are sent.")
+        },
+        &trace_recovery_messages,
+        DEBUG1, server_message_level_options, NULL, NULL
+    },
+
+    {
         {"track_functions", PGC_SUSET, STATS_COLLECTOR,
             gettext_noop("Collects function-level statistics on database activity."),
             NULL
@@ -5482,8 +5494,19 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
                         SetPGVariable("transaction_isolation",
                                       list_make1(item->arg), stmt->is_local);
                     else if (strcmp(item->defname, "transaction_read_only") == 0)
+                    {
+                        A_Const       *con;
+
+                        Assert(IsA(item->arg, A_Const));
+                        con = (A_Const *) item->arg;
+                        Assert(nodeTag(&con->val) == T_Integer);
+
+                        if (!intVal(&con->val))
+                            PreventCommandDuringRecovery();
+
                         SetPGVariable("transaction_read_only",
                                       list_make1(item->arg), stmt->is_local);
+                    }
                     else
                         elog(ERROR, "unexpected SET TRANSACTION element: %s",
                              item->defname);
@@ -5501,8 +5524,19 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
                         SetPGVariable("default_transaction_isolation",
                                       list_make1(item->arg), stmt->is_local);
                     else if (strcmp(item->defname, "transaction_read_only") == 0)
+                    {
+                        A_Const       *con;
+
+                        Assert(IsA(item->arg, A_Const));
+                        con = (A_Const *) item->arg;
+                        Assert(nodeTag(&con->val) == T_Integer);
+
+                        if (!intVal(&con->val))
+                            PreventCommandDuringRecovery();
+
                         SetPGVariable("default_transaction_read_only",
                                       list_make1(item->arg), stmt->is_local);
+                    }
                     else
                         elog(ERROR, "unexpected SET SESSION element: %s",
                              item->defname);
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index a73db27..83472b2 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -1257,42 +1257,84 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
         return true;

     /*
-     * If the snapshot contains full subxact data, the fastest way to check
-     * things is just to compare the given XID against both subxact XIDs and
-     * top-level XIDs.    If the snapshot overflowed, we have to use pg_subtrans
-     * to convert a subxact XID to its parent XID, but then we need only look
-     * at top-level XIDs not subxacts.
+     * Snapshot information is stored slightly differently in snapshots
+     * taken during recovery.
      */
-    if (snapshot->subxcnt >= 0)
+    if (!snapshot->takenDuringRecovery)
     {
-        /* full data, so search subxip */
-        int32        j;
+        /*
+         * If the snapshot contains full subxact data, the fastest way to check
+         * things is just to compare the given XID against both subxact XIDs and
+         * top-level XIDs.    If the snapshot overflowed, we have to use pg_subtrans
+         * to convert a subxact XID to its parent XID, but then we need only look
+         * at top-level XIDs not subxacts.
+         */
+        if (snapshot->subxcnt >= 0)
+        {
+            /* full data, so search subxip */
+            int32        j;

-        for (j = 0; j < snapshot->subxcnt; j++)
+            for (j = 0; j < snapshot->subxcnt; j++)
+            {
+                if (TransactionIdEquals(xid, snapshot->subxip[j]))
+                    return true;
+            }
+
+            /* not there, fall through to search xip[] */
+        }
+        else
         {
-            if (TransactionIdEquals(xid, snapshot->subxip[j]))
-                return true;
+            /* overflowed, so convert xid to top-level */
+            xid = SubTransGetTopmostTransaction(xid);
+
+            /*
+             * If xid was indeed a subxact, we might now have an xid < xmin, so
+             * recheck to avoid an array scan.    No point in rechecking xmax.
+             */
+            if (TransactionIdPrecedes(xid, snapshot->xmin))
+                return false;
         }

-        /* not there, fall through to search xip[] */
+        for (i = 0; i < snapshot->xcnt; i++)
+        {
+            if (TransactionIdEquals(xid, snapshot->xip[i]))
+                return true;
+        }
     }
     else
     {
-        /* overflowed, so convert xid to top-level */
-        xid = SubTransGetTopmostTransaction(xid);
+        int32        j;

         /*
-         * If xid was indeed a subxact, we might now have an xid < xmin, so
-         * recheck to avoid an array scan.    No point in rechecking xmax.
+         * In recovery we store all xids in the subxact array because it
+         * is by far the bigger array, and we mostly don't know which xids
+         * are top-level and which are subxacts. The xip array is empty.
+         *
+         * We start by searching subtrans, if we overflowed.
          */
-        if (TransactionIdPrecedes(xid, snapshot->xmin))
-            return false;
-    }
+        if (snapshot->subxcnt < 0)
+        {
+            /* overflowed, so convert xid to top-level */
+            xid = SubTransGetTopmostTransaction(xid);

-    for (i = 0; i < snapshot->xcnt; i++)
-    {
-        if (TransactionIdEquals(xid, snapshot->xip[i]))
-            return true;
+            /*
+             * If xid was indeed a subxact, we might now have an xid < xmin, so
+             * recheck to avoid an array scan.    No point in rechecking xmax.
+             */
+            if (TransactionIdPrecedes(xid, snapshot->xmin))
+                return false;
+        }
+
+        /*
+         * We now have either a top-level xid higher than xmin or an
+         * indeterminate xid. We don't know whether it's top level or subxact
+         * but it doesn't matter. If it's present, the xid is visible.
+         */
+        for (j = 0; j < snapshot->subxcnt; j++)
+        {
+            if (TransactionIdEquals(xid, snapshot->subxip[j]))
+                return true;
+        }
     }

     return false;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index f8395fe..906bf49 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -130,11 +130,13 @@ extern XLogRecPtr log_heap_move(Relation reln, Buffer oldbuf,
               ItemPointerData from,
               Buffer newbuf, HeapTuple newtup,
               bool all_visible_cleared, bool new_all_visible_cleared);
+extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+                TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
                OffsetNumber *redirected, int nredirected,
                OffsetNumber *nowdead, int ndead,
                OffsetNumber *nowunused, int nunused,
-               bool redirect_move);
+               TransactionId latestRemovedXid, bool redirect_move);
 extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
                 TransactionId cutoff_xid,
                 OffsetNumber *offsets, int offcnt);
diff --git a/src/include/access/htup.h b/src/include/access/htup.h
index f271cbc..c846123 100644
--- a/src/include/access/htup.h
+++ b/src/include/access/htup.h
@@ -580,6 +580,7 @@ typedef HeapTupleData *HeapTuple;
 #define XLOG_HEAP2_FREEZE        0x00
 #define XLOG_HEAP2_CLEAN        0x10
 #define XLOG_HEAP2_CLEAN_MOVE    0x20
+#define XLOG_HEAP2_CLEANUP_INFO 0x30

 /*
  * All what we need to find changed tuple
@@ -668,6 +669,7 @@ typedef struct xl_heap_clean
 {
     RelFileNode node;
     BlockNumber block;
+    TransactionId    latestRemovedXid;
     uint16        nredirected;
     uint16        ndead;
     /* OFFSET NUMBERS FOLLOW */
@@ -675,6 +677,19 @@ typedef struct xl_heap_clean

 #define SizeOfHeapClean (offsetof(xl_heap_clean, ndead) + sizeof(uint16))

+/*
+ * Cleanup_info is required in some cases during a lazy VACUUM.
+ * Used for reporting the results of HeapTupleHeaderAdvanceLatestRemovedXid()
+ * see vacuumlazy.c for full explanation
+ */
+typedef struct xl_heap_cleanup_info
+{
+    RelFileNode     node;
+    TransactionId    latestRemovedXid;
+} xl_heap_cleanup_info;
+
+#define SizeOfHeapCleanupInfo (sizeof(xl_heap_cleanup_info))
+
 /* This is for replacing a page's contents in toto */
 /* NB: this is used for indexes as well as heaps */
 typedef struct xl_heap_newpage
@@ -718,6 +733,9 @@ typedef struct xl_heap_freeze

 #define SizeOfHeapFreeze (offsetof(xl_heap_freeze, cutoff_xid) + sizeof(TransactionId))

+extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
+                                        TransactionId *latestRemovedXid);
+
 /* HeapTupleHeader functions implemented in utils/time/combocid.c */
 extern CommandId HeapTupleHeaderGetCmin(HeapTupleHeader tup);
 extern CommandId HeapTupleHeaderGetCmax(HeapTupleHeader tup);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index cddf3e8..91b4e23 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -214,12 +214,13 @@ typedef struct BTMetaPageData
 #define XLOG_BTREE_SPLIT_R        0x40    /* as above, new item on right */
 #define XLOG_BTREE_SPLIT_L_ROOT 0x50    /* add tuple with split of root */
 #define XLOG_BTREE_SPLIT_R_ROOT 0x60    /* as above, new item on right */
-#define XLOG_BTREE_DELETE        0x70    /* delete leaf index tuple */
+#define XLOG_BTREE_DELETE        0x70    /* delete leaf index tuples for a page */
 #define XLOG_BTREE_DELETE_PAGE    0x80    /* delete an entire page */
 #define XLOG_BTREE_DELETE_PAGE_META 0x90        /* same, and update metapage */
 #define XLOG_BTREE_NEWROOT        0xA0    /* new root page */
 #define XLOG_BTREE_DELETE_PAGE_HALF 0xB0        /* page deletion that makes
                                                  * parent half-dead */
+#define XLOG_BTREE_VACUUM        0xC0    /* delete entries on a page during vacuum */

 /*
  * All that we need to find changed index tuple
@@ -306,16 +307,53 @@ typedef struct xl_btree_split
 /*
  * This is what we need to know about delete of individual leaf index tuples.
  * The WAL record can represent deletion of any number of index tuples on a
- * single index page.
+ * single index page when *not* executed by VACUUM.
  */
 typedef struct xl_btree_delete
 {
     RelFileNode node;
     BlockNumber block;
+    TransactionId    latestRemovedXid;
+    int            numItems;         /* number of items in the offset array */
+
     /* TARGET OFFSET NUMBERS FOLLOW AT THE END */
 } xl_btree_delete;

-#define SizeOfBtreeDelete    (offsetof(xl_btree_delete, block) + sizeof(BlockNumber))
+#define SizeOfBtreeDelete    (offsetof(xl_btree_delete, latestRemovedXid) + sizeof(TransactionId))
+
+/*
+ * This is what we need to know about vacuum of individual leaf index tuples.
+ * The WAL record can represent deletion of any number of index tuples on a
+ * single index page when executed by VACUUM.
+ *
+ * The correctness requirement for applying these changes during recovery is
+ * that we must do one of these two things for every block in the index:
+ *         * lock the block for cleanup and apply any required changes
+ *        * EnsureBlockUnpinned()
+ * The purpose of this is to ensure that no index scans started before we
+ * finish scanning the index are still running by the time we begin to remove
+ * heap tuples.
+ *
+ * Any changes to any one block are registered on just one WAL record. All
+ * blocks that we need to run EnsureBlockUnpinned() before we touch the changed
+ * block are also given on this record as a variable length array. The array
+ * is compressed by way of storing an array of block ranges, rather than an
+ * actual array of blockids.
+ *
+ * Note that the *last* WAL record in any vacuum of an index is allowed to
+ * have numItems == 0. All other WAL records must have numItems > 0.
+ */
+typedef struct xl_btree_vacuum
+{
+    RelFileNode node;
+    BlockNumber block;
+    BlockNumber lastBlockVacuumed;
+    int            numItems;         /* number of items in the offset array */
+
+    /* TARGET OFFSET NUMBERS FOLLOW */
+} xl_btree_vacuum;
+
+#define SizeOfBtreeVacuum    (offsetof(xl_btree_vacuum, lastBlockVacuumed) + sizeof(BlockNumber))

 /*
  * This is what we need to know about deletion of a btree page.  The target
@@ -537,7 +575,8 @@ extern void _bt_relbuf(Relation rel, Buffer buf);
 extern void _bt_pageinit(Page page, Size size);
 extern bool _bt_page_recyclable(Page page);
 extern void _bt_delitems(Relation rel, Buffer buf,
-             OffsetNumber *itemnos, int nitems);
+             OffsetNumber *itemnos, int nitems, bool isVacuum,
+             BlockNumber lastBlockVacuumed);
 extern int _bt_pagedel(Relation rel, Buffer buf,
             BTStack stack, bool vacuum_full);

diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 47b95c2..55cb8d3 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -68,6 +68,7 @@ typedef struct IndexScanDescData
     /* signaling to index AM about killing index tuples */
     bool        kill_prior_tuple;        /* last-returned tuple is dead */
     bool        ignore_killed_tuples;    /* do not return killed entries */
+    bool        xactStartedInRecovery;    /* prevents killing/seeing killed tuples */

     /* index access method's private state */
     void       *opaque;            /* access-method-specific info */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index 5702f5f..8ab1148 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -23,6 +23,7 @@ typedef uint8 RmgrId;
 #define RM_DBASE_ID                4
 #define RM_TBLSPC_ID            5
 #define RM_MULTIXACT_ID            6
+#define RM_RELATION_ID            8
 #define RM_HEAP2_ID                9
 #define RM_HEAP_ID                10
 #define RM_BTREE_ID                11
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 8ce30e3..be4ee9f 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -129,6 +129,9 @@ typedef VariableCacheData *VariableCache;
  * ----------------
  */

+/* in transam/xact.c */
+extern bool TransactionStartedDuringRecovery(void);
+
 /* in transam/varsup.c */
 extern PGDLLIMPORT VariableCache ShmemVariableCache;

diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 64dba0c..34595ad 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -41,6 +41,7 @@ extern void EndPrepare(GlobalTransaction gxact);

 extern TransactionId PrescanPreparedTransactions(void);
 extern void RecoverPreparedTransactions(void);
+extern TransactionId *StandbyRecoverPreparedTransactions(int *nxids_p);

 extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
 extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
diff --git a/src/include/access/twophase_rmgr.h b/src/include/access/twophase_rmgr.h
index 20c2b2f..d1faac9 100644
--- a/src/include/access/twophase_rmgr.h
+++ b/src/include/access/twophase_rmgr.h
@@ -23,14 +23,14 @@ typedef uint8 TwoPhaseRmgrId;
  */
 #define TWOPHASE_RM_END_ID            0
 #define TWOPHASE_RM_LOCK_ID            1
-#define TWOPHASE_RM_INVAL_ID        2
-#define TWOPHASE_RM_NOTIFY_ID        3
-#define TWOPHASE_RM_PGSTAT_ID        4
+#define TWOPHASE_RM_NOTIFY_ID        2
+#define TWOPHASE_RM_PGSTAT_ID        3
 #define TWOPHASE_RM_MAX_ID            TWOPHASE_RM_PGSTAT_ID

 extern const TwoPhaseCallback twophase_recover_callbacks[];
 extern const TwoPhaseCallback twophase_postcommit_callbacks[];
 extern const TwoPhaseCallback twophase_postabort_callbacks[];
+extern const TwoPhaseCallback twophase_standby_recover_callbacks[];


 extern void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 34ceb66..c23ba23 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -17,6 +17,7 @@
 #include "access/xlog.h"
 #include "nodes/pg_list.h"
 #include "storage/relfilenode.h"
+#include "utils/snapshot.h"
 #include "utils/timestamp.h"


@@ -84,19 +85,54 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_ABORT                0x20
 #define XLOG_XACT_COMMIT_PREPARED    0x30
 #define XLOG_XACT_ABORT_PREPARED    0x40
+#define XLOG_XACT_ASSIGNMENT        0x50
+#define XLOG_XACT_RUNNING_XACTS        0x60
+/* 0x70 can also be used, if required */
+
+typedef struct xl_xact_assignment
+{
+    TransactionId    xtop;        /* assigned xids top-level xid, if any */
+    int                nsubxacts;    /* number of subtransaction XIDs */
+    TransactionId    xsub[1];    /* assigned subxids */
+} xl_xact_assignment;
+
+#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
+
+/*
+ * xl_xact_running_xacts is in utils/snapshot.h so it can be passed
+ * around to the same places as snapshots. Not snapmgr.h
+ */

 typedef struct xl_xact_commit
 {
     TimestampTz xact_time;        /* time of commit */
+    uint32        xinfo;            /* info flags */
     int            nrels;            /* number of RelFileNodes */
     int            nsubxacts;        /* number of subtransaction XIDs */
+    int            nmsgs;            /* number of shared inval msgs */
     /* Array of RelFileNode(s) to drop at commit */
     RelFileNode xnodes[1];        /* VARIABLE LENGTH ARRAY */
     /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
+    /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
 } xl_xact_commit;

 #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)

+/*
+ * These flags are set in the xinfo fields of WAL commit records,
+ * indicating a variety of additional actions that need to occur
+ * when emulating transaction effects during recovery.
+ * They are named XactCompletion... to differentiate them from
+ * EOXact... routines which run at the end of the original
+ * transaction completion.
+ */
+#define XACT_COMPLETION_UPDATE_RELCACHE_FILE    0x01
+#define XACT_COMPLETION_VACUUM_FULL    0x02
+
+/* Access macros for above flags */
+#define XactCompletionRelcacheInitFileInval(xlrec)    ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
+#define XactCompletionVacuumFull(xlrec)    ((xlrec)->xinfo & XACT_COMPLETION_VACUUM_FULL)
+
 typedef struct xl_xact_abort
 {
     TimestampTz xact_time;        /* time of abort */
@@ -106,6 +142,7 @@ typedef struct xl_xact_abort
     RelFileNode xnodes[1];        /* VARIABLE LENGTH ARRAY */
     /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
 } xl_xact_abort;
+/* Note the intentional lack of an invalidation message array c.f. commit */

 #define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)

@@ -181,10 +218,14 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);

-extern TransactionId RecordTransactionCommit(void);
+extern TransactionId RecordTransactionCommit(bool isVacuumFull);

 extern int    xactGetCommittedChildren(TransactionId **ptr);

+extern XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
+extern void InitRecoveryTransactionEnvironment(void);
+extern void XactClearRecoveryTransactions(void);
+
 extern void xact_redo(XLogRecPtr lsn, XLogRecord *record);
 extern void xact_desc(StringInfo buf, uint8 xl_info, char *rec);

diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index ee21c9f..57b17e0 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -18,6 +18,8 @@
 #include "utils/pg_crc.h"
 #include "utils/timestamp.h"

+/* Handy constant for an invalid xlog recptr */
+static const XLogRecPtr InvalidXLogRecPtr = {0, 0};

 /*
  * The overall layout of an XLOG record is:
@@ -133,7 +135,45 @@ typedef struct XLogRecData
 } XLogRecData;

 extern TimeLineID ThisTimeLineID;        /* current TLI */
+
+/*
+ * Prior to 8.4, all activity during recovery was carried out by Startup
+ * process. This local variable continues to be used in many parts of the
+ * code to indicate actions taken by RecoveryManagers. Other processes who
+ * potentially perform work during recovery should check RecoveryInProgress()
+ * see XLogCtl notes in xlog.c
+ */
 extern bool InRecovery;
+
+/*
+ * Like InRecovery, standbyState is only valid in the startup process.
+ *
+ * In DISABLED state, we're performing crash recovery or hot standby was
+ * disabled in recovery.conf.
+ *
+ * In UNINITIALIZED state, we haven't yet received a RUNNING_XACTS or shutdown
+ * checkpoint record to initialize our master transaction tracking system.
+ *
+ * When the transaction tracking is initialized, we enter the SNAPSHOT_PENDING
+ * state. The tracked information might still be incomplete, so we can't let
+ * backens in yet, but redo functions need to update the in-memory state when
+ * appropriate.
+ *
+ * In SNAPSHOT_READY mode, we have full knowledge of transactions that are
+ * (or were) running in the master at the current WAL location. Snapshots
+ * can be taken, and read-only queries can be run.
+ */
+typedef enum
+{
+    STANDBY_DISABLED,
+    STANDBY_UNINITIALIZED,
+    STANDBY_SNAPSHOT_PENDING,
+    STANDBY_READY
+} HotStandbyState;
+extern HotStandbyState standbyState;
+
+#define InHotStandby (standbyState >= STANDBY_SNAPSHOT_PENDING)
+
 extern XLogRecPtr XactLastRecEnd;

 /* these variables are GUC parameters related to XLOG */
@@ -203,6 +243,8 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);

 extern bool RecoveryInProgress(void);
 extern bool XLogInsertAllowed(void);
+extern TimestampTz GetLatestXLogTime(void);
+extern int GetMaxStandbyDelay(void);

 extern void UpdateControlFile(void);
 extern Size XLOGShmemSize(void);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 5675bfb..b472190 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -71,7 +71,7 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD063    /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD166    /* can be used as WAL version indicator */

 typedef struct XLogPageHeaderData
 {
@@ -255,5 +255,6 @@ extern Datum pg_current_xlog_location(PG_FUNCTION_ARGS);
 extern Datum pg_current_xlog_insert_location(PG_FUNCTION_ARGS);
 extern Datum pg_xlogfile_name_offset(PG_FUNCTION_ARGS);
 extern Datum pg_xlogfile_name(PG_FUNCTION_ARGS);
+extern Datum pg_is_in_recovery(PG_FUNCTION_ARGS);

 #endif   /* XLOG_INTERNAL_H */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 860f34f..785f59a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -40,6 +40,7 @@ typedef struct CheckPoint
     TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
     Oid            oldestXidDB;    /* database with minimum datfrozenxid */
     pg_time_t    time;            /* time stamp of checkpoint */
+    int        MaxConnections; /* MaxConnections at time of checkpoint */
 } CheckPoint;

 /* XLOG info values for XLOG rmgr */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 90111e9..f3536e6 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3283,6 +3283,9 @@ DESCR("xlog filename and byte offset, given an xlog location");
 DATA(insert OID = 2851 ( pg_xlogfile_name            PGNSP PGUID 12 1 0 0 f f f t f i 1 0 25 "25" _null_ _null_ _null_
_null_pg_xlogfile_name _null_ _null_ _null_ )); 
 DESCR("xlog filename, given an xlog location");

+DATA(insert OID = 3810 (  pg_is_in_recovery     PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_
pg_is_in_recovery_null_ _null_ _null_ )); 
+DESCR("true if server is in recovery");
+
 DATA(insert OID = 2621 ( pg_reload_conf            PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_
_null_pg_reload_conf _null_ _null_ _null_ )); 
 DESCR("reload configuration files");
 DATA(insert OID = 2622 ( pg_rotate_logfile        PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_
_null_pg_rotate_logfile _null_ _null_ _null_ )); 
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index b28f06b..1c9a73a 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -236,6 +236,12 @@ extern bool VacuumCostActive;
 /* in tcop/postgres.c */
 extern void check_stack_depth(void);

+/* in tcop/utility.c */
+extern void PreventCommandDuringRecovery(void);
+
+/* in utils/misc/guc.c */
+extern int trace_recovery_messages;
+int trace_recovery(int trace_level);

 /*****************************************************************************
  *      pdir.h --                                                                 *
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d06eb77..4f9d020 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -201,6 +201,10 @@ extern bool ConditionalLockBuffer(Buffer buffer);
 extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);

+extern void StartCleanupDelayStats(void);
+extern void EndCleanupDelayStats(void);
+extern void ReportCleanupDelayStats(void);
+
 extern void AbortBufferIO(void);

 extern void BufmgrCommit(void);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 7231aa3..949d466 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -16,6 +16,7 @@

 #include "storage/backendid.h"
 #include "storage/lwlock.h"
+#include "storage/sinval.h"
 #include "storage/shmem.h"


@@ -494,6 +495,7 @@ extern void GrantAwaitedLock(void);
 extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode);
 extern Size LockShmemSize(void);
 extern LockData *GetLockStatusData(void);
+extern int GetRunningTransactionLocks(xl_rel_lock *loggableLocks, int maximum);
 extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode);

 extern void lock_twophase_recover(TransactionId xid, uint16 info,
@@ -502,6 +504,8 @@ extern void lock_twophase_postcommit(TransactionId xid, uint16 info,
                          void *recdata, uint32 len);
 extern void lock_twophase_postabort(TransactionId xid, uint16 info,
                         void *recdata, uint32 len);
+extern void lock_twophase_standby_recover(TransactionId xid, uint16 info,
+                      void *recdata, uint32 len);

 extern DeadLockState DeadLockCheck(PGPROC *proc);
 extern PGPROC *GetBlockingAutoVacuumPgproc(void);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e389c61..b07fc21 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -67,6 +67,7 @@ typedef enum LWLockId
     AutovacuumLock,
     AutovacuumScheduleLock,
     SyncScanLock,
+    RecoveryInfoLock,
     /* Individual lock IDs end here */
     FirstBufMappingLock,
     FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 26f9fdd..8b491e6 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -95,6 +95,13 @@ struct PGPROC

     uint8        vacuumFlags;    /* vacuum-related flags, see above */

+    /*
+     * While in hot standby mode, setting recoveryConflictMode instructs
+     * the backend to commit suicide. Possible values are the same as those
+     * passed to ResolveRecoveryConflictWithVirtualXIDs().
+     */
+    int            recoveryConflictMode;
+
     /* Info about LWLock the process is currently waiting for, if any. */
     bool        lwWaiting;        /* true if waiting for an LW lock */
     bool        lwExclusive;    /* true if waiting for exclusive access */
@@ -135,6 +142,9 @@ typedef struct PROC_HDR
     PGPROC       *autovacFreeProcs;
     /* Current shared estimate of appropriate spins_per_delay value */
     int            spins_per_delay;
+    /* The proc of the Startup process, since not in ProcArray */
+    PGPROC       *startupProc;
+    int       startupProcPid;
 } PROC_HDR;

 /*
@@ -165,6 +175,9 @@ extern void InitProcGlobal(void);
 extern void InitProcess(void);
 extern void InitProcessPhase2(void);
 extern void InitAuxiliaryProcess(void);
+
+extern void PublishStartupProcessInformation(void);
+
 extern bool HaveNFreeProcs(int n);
 extern void ProcReleaseLocks(bool isCommit);

diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 1e7f0e5..c6156f0 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -14,6 +14,7 @@
 #ifndef PROCARRAY_H
 #define PROCARRAY_H

+#include "access/xact.h"
 #include "storage/lock.h"
 #include "utils/snapshot.h"

@@ -25,6 +26,21 @@ extern void ProcArrayRemove(PGPROC *proc, TransactionId latestXid);

 extern void ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid);
 extern void ProcArrayClearTransaction(PGPROC *proc);
+extern void ProcArrayApplyRecoveryInfo(XLogRecPtr lsn,
+                           RunningTransactions running);
+extern void ProcArrayApplyXidAssignment(TransactionId topxid,
+                            int nsubxids, TransactionId *subxids);
+
+extern void RecordKnownAssignedTransactionIds(TransactionId xid);
+extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid,
+                                      int nsubxids, TransactionId *subxids);
+extern void ExpireAllKnownAssignedTransactionIds(void);
+extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid);
+extern int  GetKnownAssignedTransactions(TransactionId *xarray,
+                                        TransactionId *xmin, TransactionId xmax,
+                                        bool *overflow);
+
+extern void GetRunningTransactionData(void);

 extern Snapshot GetSnapshotData(Snapshot snapshot);

@@ -42,6 +58,11 @@ extern bool IsBackendPid(int pid);
 extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
                       bool excludeXmin0, bool allDbs, int excludeVacuum,
                       int *nvxids);
+extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin,
+                    Oid dbOid, bool skipExistingConflicts);
+extern pid_t CancelVirtualTransaction(VirtualTransactionId vxid,
+                         int cancel_mode);
+
 extern int    CountActiveBackends(void);
 extern int    CountDBBackends(Oid databaseid);
 extern int    CountUserBackends(Oid roleid);
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index b30805e..850bed6 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -14,6 +14,7 @@
 #ifndef SINVAL_H
 #define SINVAL_H

+#include "access/xlog.h"
 #include "storage/itemptr.h"
 #include "storage/relfilenode.h"

@@ -100,4 +101,44 @@ extern void HandleCatchupInterrupt(void);
 extern void EnableCatchupInterrupt(void);
 extern bool DisableCatchupInterrupt(void);

+extern int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
+                                        bool *RelcacheInitFileInval);
+
+/*
+ * Relation Rmgr (RM_RELATION_ID)
+ *
+ * Relation recovery manager exists to allow locks and certain kinds of
+ * invalidation message to be passed across to a standby server.
+ */
+extern void RelationReleaseRecoveryLockTree(TransactionId xid,
+                                        int nsubxids, TransactionId *subxids);
+extern void RelationReleaseAllRecoveryLocks(void);
+extern void RelationReleaseOldRecoveryLocks(TransactionId removeXid);
+
+
+/* Recovery handlers for the Relation Rmgr (RM_RELATION_ID) */
+extern void relation_redo(XLogRecPtr lsn, XLogRecord *record);
+extern void relation_desc(StringInfo buf, uint8 xl_info, char *rec);
+
+/*
+ * XLOG message types
+ */
+#define XLOG_RELATION_INVAL            0x00
+#define XLOG_RELATION_LOCK            0x10
+
+typedef struct xl_rel_inval
+{
+    int                            nmsgs;        /* number of shared inval msgs */
+    SharedInvalidationMessage    msgs[1];    /* VARIABLE LENGTH ARRAY */
+} xl_rel_inval;
+
+#define MinSizeOfRelationInval offsetof(xl_rel_inval, msgs)
+
+typedef struct xl_rel_lock
+{
+    TransactionId    xid;    /* xid of holder of AccessExclusiveLock */
+    Oid        dbOid;
+    Oid        relOid;
+} xl_rel_lock;
+
 #endif   /* SINVAL_H */
diff --git a/src/include/storage/sinvaladt.h b/src/include/storage/sinvaladt.h
index 6fd71ea..3a02259 100644
--- a/src/include/storage/sinvaladt.h
+++ b/src/include/storage/sinvaladt.h
@@ -29,7 +29,7 @@
  */
 extern Size SInvalShmemSize(void);
 extern void CreateSharedInvalidationState(void);
-extern void SharedInvalBackendInit(void);
+extern void SharedInvalBackendInit(bool sendOnly);
 extern bool BackendIdIsActive(int backendID);

 extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n);
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 697ece4..b8fea2f 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -730,6 +730,7 @@ extern Datum xidrecv(PG_FUNCTION_ARGS);
 extern Datum xidsend(PG_FUNCTION_ARGS);
 extern Datum xideq(PG_FUNCTION_ARGS);
 extern Datum xid_age(PG_FUNCTION_ARGS);
+extern int xidComparator(const void *arg1, const void *arg2);
 extern Datum cidin(PG_FUNCTION_ARGS);
 extern Datum cidout(PG_FUNCTION_ARGS);
 extern Datum cidrecv(PG_FUNCTION_ARGS);
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 42fd8ba..ae1685e 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -15,6 +15,8 @@
 #define INVAL_H

 #include "access/htup.h"
+#include "access/xact.h"
+#include "storage/lock.h"
 #include "utils/relcache.h"


@@ -57,7 +59,20 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
                               Datum arg);

+extern void RelationAddRecoveryLock(TransactionId xid, Oid dbOid, Oid relOid);
+
 extern void inval_twophase_postcommit(TransactionId xid, uint16 info,
                           void *recdata, uint32 len);

+/* cancel modes for ResolveRecoveryConflictWithVirtualXIDs */
+#define CONFLICT_MODE_NOT_SET            0x00 /* XXX explanations here */
+#define CONFLICT_MODE_ERROR_DEFERRABLE    0x01
+#define CONFLICT_MODE_ERROR_IF_NOT_IDLE    0x02
+#define CONFLICT_MODE_ERROR                0x03
+#define CONFLICT_MODE_FATAL                0x04
+
+extern void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
+                                        char *reason, int cancel_mode,
+                                        XLogRecPtr conflict_LSN);
+
 #endif   /* INVAL_H */
diff --git a/src/include/utils/selfuncs.h b/src/include/utils/selfuncs.h
index 5f473f4..d9c5181 100644
--- a/src/include/utils/selfuncs.h
+++ b/src/include/utils/selfuncs.h
@@ -193,4 +193,7 @@ extern Datum hashcostestimate(PG_FUNCTION_ARGS);
 extern Datum gistcostestimate(PG_FUNCTION_ARGS);
 extern Datum gincostestimate(PG_FUNCTION_ARGS);

+/* Duplicated to avoid needing to include access/xlog.h */
+extern bool RecoveryInProgress(void);
+
 #endif   /* SELFUNCS_H */
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index df79e3a..72bbaea 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -15,6 +15,7 @@

 #include "access/htup.h"
 #include "storage/buf.h"
+#include "storage/sinval.h"


 typedef struct SnapshotData *Snapshot;
@@ -51,6 +52,7 @@ typedef struct SnapshotData
     /* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
     int32        subxcnt;        /* # of xact ids in subxip[], -1 if overflow */
     TransactionId *subxip;        /* array of subxact IDs in progress */
+    bool        takenDuringRecovery;    /* recovery-shaped snapshot? */

     /*
      * note: all ids in subxip[] are >= xmin, but we don't bother filtering
@@ -63,6 +65,53 @@ typedef struct SnapshotData
 } SnapshotData;

 /*
+ * Declarations for GetRunningTransactionData(). Similar to Snapshots, but
+ * not quite. This has nothing at all to do with visibility on this server,
+ * so this is completely separate from snapmgr.c and snapmgr.h
+ * This data is important for creating the initial snapshot state on a
+ * standby server. We need lots more information than a normal snapshot,
+ * hence we use a specific data structure for our needs. This data
+ * is written to WAL as a separate record immediately after each
+ * checkpoint. That means that wherever we start a standby from we will
+ * almost immediately see the data we need to begin executing queries.
+ */
+
+typedef struct RunningTransactionsData
+{
+    int                xcnt;                /* # of xact ids in xids[] */
+    int                numLocks;            /* # of loggable locks */
+    bool            lock_overflow;        /* too many locks held */
+    bool            subxid_overflow;    /* snapshot overflowed, subxids missing */
+    TransactionId     nextXid;            /* copy of ShmemVariableCache->nextXid */
+    TransactionId    oldestRunningXid;    /* *not* oldestXmin */
+
+    TransactionId  *xids;                /* array of (sub)xids still running */
+
+    xl_rel_lock        *loggableLocks;
+} RunningTransactionsData;
+
+typedef RunningTransactionsData *RunningTransactions;
+
+/*
+ * When we write running xact data to WAL, we use this structure.
+ */
+typedef struct xl_xact_running_xacts
+{
+    int                xcnt;                /* # of xact ids in xids[] */
+    int                numLocks;            /* # of loggable locks */
+    bool            lock_overflow;        /* too many locks held */
+    bool            subxid_overflow;    /* snapshot overflowed, subxids missing */
+    TransactionId    nextXid;            /* copy of ShmemVariableCache->nextXid */
+    TransactionId    oldestRunningXid;    /* *not* oldestXmin */
+
+    TransactionId    xids[1];        /* VARIABLE LENGTH ARRAY */
+
+    /* ARRAY OF LOGGABLE LOCKS FOLLOWS */
+} xl_xact_running_xacts;
+
+#define MinSizeOfXactRunningXacts offsetof(xl_xact_running_xacts, xids)
+
+/*
  * Result codes for HeapTupleSatisfiesUpdate.  This should really be in
  * tqual.h, but we want to avoid including that file elsewhere.
  */

pgsql-hackers by date:

Previous
From: Josh Berkus
Date:
Subject: Re: Parsing config files in a directory
Next
From: Alvaro Herrera
Date:
Subject: Re: "ERROR: could not read block 6 ...: read only 0 of 8192 bytes" after autovacuum cancelled