diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08e..a93c81b 100644
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 137,143 ****
--- 137,145 ----
  #include "utils/builtins.h"
  #include "utils/memutils.h"
  #include "utils/ps_status.h"
+ #include "utils/snapmgr.h"
  #include "utils/timestamp.h"
+ #include "utils/tqual.h"
  
  
  /*
*************** static bool SignalBackends(void);
*** 387,393 ****
  static void asyncQueueReadAllNotifications(void);
  static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
  							 QueuePosition stop,
! 							 char *page_buffer);
  static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
  static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
--- 389,396 ----
  static void asyncQueueReadAllNotifications(void);
  static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
  							 QueuePosition stop,
! 							 char *page_buffer,
! 							 Snapshot snapshot);
  static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
  static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
*************** PreCommit_Notify(void)
*** 798,804 ****
  		}
  	}
  
! 	/* Queue any pending notifies */
  	if (pendingNotifies)
  	{
  		ListCell   *nextNotify;
--- 801,807 ----
  		}
  	}
  
! 	/* Queue any pending notifies (must happen after the above) */
  	if (pendingNotifies)
  	{
  		ListCell   *nextNotify;
*************** Exec_ListenPreCommit(void)
*** 987,993 ****
  	 * have already committed before we started to LISTEN.
  	 *
  	 * Note that we are not yet listening on anything, so we won't deliver any
! 	 * notification to the frontend.
  	 *
  	 * This will also advance the global tail pointer if possible.
  	 */
--- 990,998 ----
  	 * have already committed before we started to LISTEN.
  	 *
  	 * Note that we are not yet listening on anything, so we won't deliver any
! 	 * notification to the frontend.  Also, although our transaction might
! 	 * have executed NOTIFY, those message(s) aren't queued yet so we can't
! 	 * see them in the queue.
  	 *
  	 * This will also advance the global tail pointer if possible.
  	 */
*************** asyncQueueReadAllNotifications(void)
*** 1744,1749 ****
--- 1749,1755 ----
  	volatile QueuePosition pos;
  	QueuePosition oldpos;
  	QueuePosition head;
+ 	Snapshot	snapshot;
  	bool		advanceTail;
  
  	/* page_buffer must be adequately aligned, so use a union */
*************** asyncQueueReadAllNotifications(void)
*** 1767,1772 ****
--- 1773,1781 ----
  		return;
  	}
  
+ 	/* Get snapshot we'll use to decide which xacts are still in progress */
+ 	snapshot = RegisterSnapshot(GetLatestSnapshot());
+ 
  	/*----------
  	 * Note that we deliver everything that we see in the queue and that
  	 * matches our _current_ listening state.
*************** asyncQueueReadAllNotifications(void)
*** 1854,1860 ****
  			 * while sending the notifications to the frontend.
  			 */
  			reachedStop = asyncQueueProcessPageEntries(&pos, head,
! 													   page_buffer.buf);
  		} while (!reachedStop);
  	}
  	PG_CATCH();
--- 1863,1870 ----
  			 * while sending the notifications to the frontend.
  			 */
  			reachedStop = asyncQueueProcessPageEntries(&pos, head,
! 													   page_buffer.buf,
! 													   snapshot);
  		} while (!reachedStop);
  	}
  	PG_CATCH();
*************** asyncQueueReadAllNotifications(void)
*** 1882,1887 ****
--- 1892,1900 ----
  	/* If we were the laziest backend, try to advance the tail pointer */
  	if (advanceTail)
  		asyncQueueAdvanceTail();
+ 
+ 	/* Done with snapshot */
+ 	UnregisterSnapshot(snapshot);
  }
  
  /*
*************** asyncQueueReadAllNotifications(void)
*** 1903,1909 ****
  static bool
  asyncQueueProcessPageEntries(volatile QueuePosition *current,
  							 QueuePosition stop,
! 							 char *page_buffer)
  {
  	bool		reachedStop = false;
  	bool		reachedEndOfPage;
--- 1916,1923 ----
  static bool
  asyncQueueProcessPageEntries(volatile QueuePosition *current,
  							 QueuePosition stop,
! 							 char *page_buffer,
! 							 Snapshot snapshot)
  {
  	bool		reachedStop = false;
  	bool		reachedEndOfPage;
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1928,1934 ****
  		/* Ignore messages destined for other databases */
  		if (qe->dboid == MyDatabaseId)
  		{
! 			if (TransactionIdIsInProgress(qe->xid))
  			{
  				/*
  				 * The source transaction is still in progress, so we can't
--- 1942,1948 ----
  		/* Ignore messages destined for other databases */
  		if (qe->dboid == MyDatabaseId)
  		{
! 			if (XidInMVCCSnapshot(qe->xid, snapshot))
  			{
  				/*
  				 * The source transaction is still in progress, so we can't
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1939,1948 ****
  				 * this advance-then-back-up behavior when dealing with an
  				 * uncommitted message.)
  				 *
! 				 * Note that we must test TransactionIdIsInProgress before we
! 				 * test TransactionIdDidCommit, else we might return a message
! 				 * from a transaction that is not yet visible to snapshots;
! 				 * compare the comments at the head of tqual.c.
  				 */
  				*current = thisentry;
  				reachedStop = true;
--- 1953,1967 ----
  				 * this advance-then-back-up behavior when dealing with an
  				 * uncommitted message.)
  				 *
! 				 * Note that we must test XidInMVCCSnapshot before we test
! 				 * TransactionIdDidCommit, else we might return a message from
! 				 * a transaction that is not yet visible to snapshots; compare
! 				 * the comments at the head of tqual.c.
! 				 *
! 				 * Also, while our own xact won't be listed in the snapshot,
! 				 * we need not check for TransactionIdIsCurrentTransactionId
! 				 * because our transaction cannot (yet) have queued any
! 				 * messages.
  				 */
  				*current = thisentry;
  				reachedStop = true;
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index bbac408..28060c5 100644
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 81,88 ****
  SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
  SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
  
- /* local functions */
- static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
  
  /*
   * SetHintBits()
--- 81,86 ----
*************** HeapTupleIsSurelyDead(HeapTuple htup, Tr
*** 1482,1488 ****
   * TransactionIdIsCurrentTransactionId first, except for known-committed
   * XIDs which could not be ours anyway.
   */
! static bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
  	uint32		i;
--- 1480,1486 ----
   * TransactionIdIsCurrentTransactionId first, except for known-committed
   * XIDs which could not be ours anyway.
   */
! bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
  	uint32		i;
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index 9a3b56e..96eaf01 100644
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
*************** extern HTSV_Result HeapTupleSatisfiesVac
*** 78,83 ****
--- 78,84 ----
  						 TransactionId OldestXmin, Buffer buffer);
  extern bool HeapTupleIsSurelyDead(HeapTuple htup,
  					  TransactionId OldestXmin);
+ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
  
  extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
  					 uint16 infomask, TransactionId xid);
