diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644
index 0000000..de00f50
--- /dev/null
+++ b/contrib/parallel_dummy/Makefile
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644
index 0000000..3c0ae7d
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql
@@ -0,0 +1,12 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_sleep(sleep_time pg_catalog.int4,
+							  nworkers pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+							  nworkers pg_catalog.int4)
+    RETURNS pg_catalog.int8 STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644
index 0000000..0a32ea8
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.c
@@ -0,0 +1,238 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ *		Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_sleep);
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define		PARALLEL_DUMMY_KEY			1
+
+typedef struct
+{
+	int32		sleep_time;
+} ParallelSleepInfo;
+
+typedef struct
+{
+	int32		relid;
+	slock_t		mutex;
+	BlockNumber	lastblock;
+	BlockNumber	currentblock;
+	int64		ntuples;
+} ParallelCountInfo;
+
+void		_PG_init(void);
+void		sleep_worker_main(dsm_segment *seg, shm_toc *toc);
+void		count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelCountInfo *info);
+
+Datum
+parallel_sleep(PG_FUNCTION_ARGS)
+{
+	int32		sleep_time = PG_GETARG_INT32(0);
+	int32		nworkers = PG_GETARG_INT32(1);
+	bool		already_in_parallel_mode = IsInParallelMode();
+	ParallelContext *pcxt;
+	ParallelSleepInfo *info;
+
+	if (nworkers < 0)
+		ereport(ERROR,
+				(errmsg("number of parallel workers must be non-negative")));
+
+	if (!already_in_parallel_mode)
+		EnterParallelMode();
+
+	pcxt = CreateParallelContextForExtension("parallel_dummy",
+											 "sleep_worker_main",
+											 nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelSleepInfo));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	InitializeParallelDSM(pcxt);
+	info = shm_toc_allocate(pcxt->toc, sizeof(ParallelSleepInfo));
+	info->sleep_time = sleep_time;
+	shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+	LaunchParallelWorkers(pcxt);
+
+	/* here's where we do the "real work" ... */
+	DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time));
+
+	WaitForParallelWorkersToFinish(pcxt);
+	DestroyParallelContext(pcxt);
+
+	if (!already_in_parallel_mode)
+		ExitParallelMode();
+
+	PG_RETURN_VOID();
+}
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int32		nworkers = PG_GETARG_INT32(1);
+	bool		already_in_parallel_mode = IsInParallelMode();
+	ParallelContext *pcxt;
+	ParallelCountInfo *info;
+	Relation	rel;
+	int64		result;
+
+	if (nworkers < 0)
+		ereport(ERROR,
+				(errmsg("number of parallel workers must be non-negative")));
+
+	rel = relation_open(relid, AccessShareLock);
+
+	if (!already_in_parallel_mode)
+		EnterParallelMode();
+
+	pcxt = CreateParallelContextForExtension("parallel_dummy",
+											 "count_worker_main",
+											 nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCountInfo));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	InitializeParallelDSM(pcxt);
+	info = shm_toc_allocate(pcxt->toc, sizeof(ParallelCountInfo));
+	info->relid = relid;
+	SpinLockInit(&info->mutex);
+	info->lastblock = RelationGetNumberOfBlocks(rel);
+	info->currentblock = 0;
+	info->ntuples = 0;
+	shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+	LaunchParallelWorkers(pcxt);
+
+	/* here's where we do the "real work" ... */
+	count_helper(rel, info);
+
+	WaitForParallelWorkersToFinish(pcxt);
+
+	result = info->ntuples;
+
+	DestroyParallelContext(pcxt);
+
+	relation_close(rel, AccessShareLock);
+
+	if (!already_in_parallel_mode)
+		ExitParallelMode();
+
+	PG_RETURN_INT64(result);
+}
+
+void
+sleep_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+	ParallelSleepInfo *info;
+
+	info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+	Assert(info != NULL);
+
+	/* here's where we do the "real work" ... */
+	DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time));
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+	ParallelCountInfo *info;
+	Relation	rel;
+
+	info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+	Assert(info != NULL);
+
+	rel = relation_open(info->relid, AccessShareLock);
+	count_helper(rel, info);
+	relation_close(rel, AccessShareLock);
+}
+
+static void
+count_helper(Relation rel, ParallelCountInfo *info)
+{
+	int64		ntuples = 0;
+	int64		mytuples = 0;
+	Oid			relid = info->relid;
+	Snapshot	snapshot = GetActiveSnapshot();
+
+	for (;;)
+	{
+		BlockNumber blkno;
+		Buffer		buffer;
+		Page		page;
+		int			lines;
+		OffsetNumber	lineoff;
+		ItemId		lpp;
+		bool		all_visible;
+		bool		done = false;
+
+		CHECK_FOR_INTERRUPTS();
+
+		SpinLockAcquire(&info->mutex);
+		if (info->currentblock >= info->lastblock)
+			done = true;
+		else
+			blkno = info->currentblock++;
+		info->ntuples += ntuples;
+		SpinLockRelease(&info->mutex);
+
+		mytuples += ntuples;
+		if (done)
+			break;
+
+		buffer = ReadBuffer(rel, blkno);
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buffer);
+		lines = PageGetMaxOffsetNumber(page);
+		ntuples = 0;
+
+		all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery;
+
+		for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(page, lineoff);
+			 lineoff <= lines;
+			 lineoff++, lpp++)
+		{
+			HeapTupleData	loctup;
+
+			if (!ItemIdIsNormal(lpp))
+				continue;
+			if (all_visible)
+			{
+				++ntuples;
+				continue;
+			}
+
+			loctup.t_tableOid = relid;
+			loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+			loctup.t_len = ItemIdGetLength(lpp);
+
+			if (HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer))
+				++ntuples;
+		}
+
+		UnlockReleaseBuffer(buffer);
+	}
+
+	elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, mytuples);
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644
index 0000000..90bae3f
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.control
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 61216e5..8e3ccf4 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -546,6 +546,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 			switch (event)
 			{
+				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
 					/* Commit all remote transactions during pre-commit */
 					do_sql_command(entry->conn, "COMMIT TRANSACTION");
@@ -588,11 +589,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 							 errmsg("cannot prepare a transaction that modified remote tables")));
 					break;
+				case XACT_EVENT_PARALLEL_COMMIT:
 				case XACT_EVENT_COMMIT:
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
 					break;
+				case XACT_EVENT_PARALLEL_ABORT:
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 24e300c..eee1ca0 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2234,6 +2234,17 @@ static HeapTuple
 heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 					CommandId cid, int options)
 {
+	/*
+	 * For now, parallel operations are required to be strictly read-only.
+	 * Unlike heap_update() and heap_delete(), an insert should never create
+	 * a combo CID, so it might be possible to relax this restrction, but
+	 * not without more thought and testing.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot insert tuples during a parallel operation")));
+
 	if (relation->rd_rel->relhasoids)
 	{
 #ifdef NOT_USED
@@ -2641,6 +2652,16 @@ heap_delete(Relation relation, ItemPointer tid,
 
 	Assert(ItemPointerIsValid(tid));
 
+	/*
+	 * Forbid this during a parallel operation, lest it allocate a combocid.
+	 * Other workers might need that combocid for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot delete tuples during a parallel operation")));
+
 	block = ItemPointerGetBlockNumber(tid);
 	buffer = ReadBuffer(relation, block);
 	page = BufferGetPage(buffer);
@@ -3079,6 +3100,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	Assert(ItemPointerIsValid(otid));
 
 	/*
+	 * Forbid this during a parallel operation, lest it allocate a combocid.
+	 * Other workers might need that combocid for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot update tuples during a parallel operation")));
+
+	/*
 	 * Fetch the list of attributes to be checked for HOT update.  This is
 	 * wasted effort if we fail to update or have to put the new tuple on a
 	 * different page.  But we must compute the list before obtaining buffer
@@ -5382,6 +5413,17 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 	uint32		oldlen;
 	uint32		newlen;
 
+	/*
+	 * For now, parallel operations are required to be strictly read-only.
+	 * Unlike a regular update, this should never create a combo CID, so it
+	 * might be possible to relax this restriction, but not without more
+	 * thought and testing.  It's not clear that it would be useful, anyway.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot update tuples during a parallel operation")));
+
 	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self)));
 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	page = (Page) BufferGetPage(buffer);
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 9d4d5db..94455b2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
+OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
 	xloginsert.o xlogreader.o xlogutils.o
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
new file mode 100644
index 0000000..401cfd6
--- /dev/null
+++ b/src/backend/access/transam/README.parallel
@@ -0,0 +1,206 @@
+Overview
+========
+
+PostgreSQL provides some simple facilities to make writing parallel algorithms
+easier.  Using a data structure called a ParallelContext, you can arrange to
+launch background worker processes, initialize their state to match that of
+the backend which initiated parallelism, communicate with them via dynamic
+shared memory, and write reasonably complex code that can run either in the
+user backend or in one of the parallel workers without needing to be aware of
+where it's running.
+
+The backend which starts a parallel operation (hereafter, the initiating
+backend) starts by creating a dynamic shared memory segment which will last
+for the lifetime of the parallel operation.  This dynamic shared memory segment
+will contain (1) a shm_mq that can be used to transport errors (and other
+messages reported via elog/ereport) from the worker back to the initiating
+backend; (2) serialized representations of the initiating backend's private
+state, so that the worker can synchronize its state with that of the initiating
+backend; and (3) any other data structures which a particular user of the
+ParallelContext data structure may wish to add for its own purposes.  Once
+the initiating backend has initialized the dynamic shared memory segment, it
+asks the postmaster to launch the appropriate number of parallel workers.
+These workers then connect to the dynamic shared memory segment, initialize
+their state, and then invoke the appropriate entrypoint, as further detailed
+below.
+
+Error Reporting
+===============
+
+When started, each parallel worker begins by attaching the dynamic shared
+memory segment and locating the shm_mq to be used for error reporting; it
+redirects all of its protocol messages to this shm_mq.  Prior to this point,
+any failure of the background worker will not be reported to the initiating
+backend; from the point of view of the initiating backend, the worker simply
+failed to start.  The initiating backend must anyway be prepared to cope
+with fewer parallel workers than it originally requested, so catering to
+this case imposes no additional burden.
+
+Whenever a new message (or partial message; very large messages may wrap) is
+sent to the error-reporting queue, PROCSIG_PARALLEL_MESSAGE is sent to the
+initiating backend.  This causes the next CHECK_FOR_INTERRUPTS() in the
+initiating backend to read and rethrow the message.  For the most part, this
+makes error reporting in parallel mode "just work".  Of course, to work
+properly, it is important that the code running in the initiating backend
+call CHECK_FOR_INTERRUPTS() regularly and avoid blocking interrupt processing for
+long periods of time, but those are good things to do anyway.
+
+(A currently-unsolved problem is that some messages may get written to the
+system log twice, once in the backend where the report was originally
+generated, and again when the initiating backend rethrows the message.  If
+we decide to suppress one of these reports, it should probably be the second one;
+otherwise, if the worker is for some reason unable to propagate the message
+back to the initiating backend, the message will be lost altogether.)
+
+State Sharing
+=============
+
+It's possible to write C code which works correctly without parallelism, but
+which fails when parallelism is used.  No parallel infrastructure can
+completely eliminate this problem, because any global variable is a risk.
+There's no general mechanism for ensuring that every global variable in the
+worker will have the same value that it does in the initiating backend; even
+if we could ensure that, some function we're calling could update the variable
+after each call, and only the backend where that update is performed will see
+the new value.  Similar problems can arise with any more-complex data
+structure we might choose to use.  For example, a pseudo-random number
+generator should, given a particular seed value, produce the same predictable
+series of values every time.  But it does this by relying on some private
+state which won't automatically be shared between cooperating backends.  A
+parallel-safe PRNG would need to store its state in dynamic shared memory, and
+would require locking.  The parallelism infrastructure has no way of knowing
+whether the user intends to call code that has this sort of problem, and can't
+do anything about it anyway.
+
+Instead, we take a more pragmatic approach: we try to make as many of the
+operations that are safe outside of parallel mode work correctly in parallel
+mode as well, and we try to prohibit the rest via suitable error checks.
+The error checks are engaged via EnterParallelMode(), which should be called
+before creating a parallel context, and disarmed via ExitParallelMode(),
+which should be called after all parallel contexts have been destroyed.
+The most significant restriction imposed by parallel mode is that all
+operations must be strictly read-only; we allow no writes to the database
+and no DDL.  We might try to relax these restrictions in the future.
+
+To make as many operations as possible safe in parallel mode, we try to copy
+the most important pieces of state from the initiating backend to each parallel
+worker.  This includes:
+
+  - The authenticated user ID and current database.  Each parallel worker
+    will connect to the same database as the initiating backend, using the
+    same user ID.
+
+  - The values of all GUCs.  Accordingly, permanent changes to the value of 
+    any GUC are forbidden while in parallel mode; but temporary changes,
+    such as entering a function with non-NULL proconfig, are potentially OK.
+
+  - The current subtransaction's XID, the top-level transaction's XID, and
+    the list of XIDs considered current (that is, they are in-progress or
+    subcommitted).  This information is needed to ensure that tuple visibility
+    checks return the same results in the worker as they do in the
+    initiating backend.  See also the section Transaction Integration, below.
+
+  - The combo CID mappings.  This is needed to ensure consistent answers to
+    tuple visibility checks.  The need to synchronize this data structure is
+    a major reason why we can't support writes in parallel mode: such writes
+    might create new combo CIDs, and we have no way to let other workers
+    (or the initiating backend) know about them.
+
+  - The transaction snapshot.
+
+  - The active snapshot, which might be different from the transaction
+    snapshot.
+
+  - The currently active user ID and security context.  Note that this is
+    the fourth user ID we restore: the initial step of binding to the correct
+    database also involves restoring the authenticated user ID.  When GUC
+    values are restored, this incidentally sets SessionUserId and OuterUserId
+    to the correct values.  This final step restores CurrentUserId.
+
+Transaction Integration
+=======================
+
+Regardless of what the TransactionState stack looks like in the master, the
+parallel backend ends up with a stack of depth 1.  This stack entry is
+marked with the special transaction block state TBLOCK_PARALLEL_INPROGRESS
+so that it's not confused with an ordinary toplevel transaction.  The
+XID of this TransactionState is set to the XID of the innermost
+currently-active subtransaction in the initiating backend.  The initiating
+backend's toplevel XID, and the XIDs of all current (in-progress or
+subcommitted) XIDs are stored separately from the TransactionState stack,
+but in such a way that GetTopTransactionId(), GetTopTransactionIdIfAny(),
+and TransactionIdIsCurrentTransactionId() return the same values that they
+would in the initiating backend.  We could copy the entire transaction state
+stack, but most of it would be useless: for example, you can't roll back to
+a savepoint from within a parallel worker, and there are no resources
+associated with the memory contexts or resource owners of intermediate
+subtransactions.
+
+No meaningful change to the transaction state can be made while in parallel
+mode.  No XIDs can be assigned, and no subtransactions can start or end,
+because we have no way of communicating these state changes to cooperating
+backends, or of synchronizing them.  It's clearly unworkable for the initiating
+backend to exit any transaction or subtransaction that was in progress when
+parallelism was started before all parallel workers have exited; and it's even
+more clearly crazy for a parallel worker to try to subcommit or subabort the
+current subtransaction and execute in some other transaction context that was
+present in the initiating backend.  It might be practical to allow internal
+sub-transactions (e.g. to implement a PL/pgsql EXCEPTION block) to be used in
+parallel mode, provided that they are XID-less, because other backends
+wouldn't really need to know about those transactions or do anything
+differently because of them.  Right now, we don't even allow that.
+
+Transaction commit or abort requires careful coordination between backends.
+Each backend has its own resource owners: buffer pins, catcache or relcache 
+reference counts, tuple descriptors, and so on are managed separately by each
+backend, and each backend is separately responsible for releasing such
+resources.  Generally, the commit or abort of a parallel worker is much like
+a top-transaction commit or abort, but there are a few exceptions.  Most
+importantly:
+
+  - No commit or abort record is written; the initiating backend is
+    responsible for this.
+
+  - End-of-transaction namespace processing is not done.  If a pg_temp
+    namespace needs to be cleaned up, the master is responsible for this.
+
+The master kills off all remaining workers as part of commit or abort
+processing.  It must not only kill the workers but wait for them to actually
+exit; otherwise, chaos can ensue.  For example, if the master is
+rolling back the transaction that created the relation being scanned by
+a worker, the relation could disappear while the worker is still busy
+scanning it.  That's not safe.
+
+Coding Conventions
+===================
+
+Before beginning any parallel operation, call EnterParallelMode(); after all
+parallel operations are completed, call ExitParallelMode().  To actually
+parallelize a particular operation, use a ParallelContext.  The basic coding
+pattern looks like this:
+
+	EnterParallelMode();		/* prohibit unsafe state changes */
+
+	pcxt = CreateParallelContext(entrypoint, nworkers);
+
+	/* Allow space for application-specific data here. */
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	InitializeParallelDSM(pcxt);	/* create DSM and copy state to it */
+
+	/* Store the data for which we reserved space. */
+	space = shm_toc_allocate(pcxt->toc, size);
+	shm_toc_insert(pcxt->toc, key, space);
+
+	LaunchParallelWorkers(pcxt);
+
+	/* do parallel stuff */
+
+	WaitForParallelWorkersToFinish(pcxt);
+
+	/* read any final results from dynamic shared memory */
+
+	DestroyParallelContext(pcxt);
+
+	ExitParallelMode();
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
new file mode 100644
index 0000000..9f3a01a
--- /dev/null
+++ b/src/backend/access/transam/parallel.c
@@ -0,0 +1,921 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ *	  Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/parallel.h"
+#include "commands/async.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "storage/spin.h"
+#include "utils/combocid.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/snapmgr.h"
+
+/*
+ * We don't want to waste a lot of memory on an error queue which, most of
+ * the time, will process only a handful of small messages.  However, it is
+ * desirable to make it large enough that a typical ErrorResponse can be sent
+ * without blocking.  That way, a worker that errors out can write the whole
+ * message into the queue and terminate without waiting for the user backend.
+ */
+#define	PARALLEL_ERROR_QUEUE_SIZE			16384
+
+/* Magic number for parallel context TOC. */
+#define PARALLEL_MAGIC						0x50477c7c
+
+/*
+ * Magic numbers for parallel state sharing.  Higher-level code should use
+ * smaller values, leaving these very large ones for use by this module.
+ */
+#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
+#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
+#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0003)
+#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0004)
+#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0005)
+#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0006)
+#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0007)
+#define PARALLEL_KEY_EXTENSION_TRAMPOLINE	UINT64CONST(0xFFFFFFFFFFFF0008)
+
+/* Fixed-size parallel state. */
+typedef struct FixedParallelState
+{
+	/* Fixed-size state that workers must restore. */
+	Oid			database_id;
+	Oid			authenticated_user_id;
+	Oid			current_user_id;
+	int			sec_context;
+	PGPROC	   *parallel_master_pgproc;
+	pid_t		parallel_master_pid;
+	BackendId	parallel_master_backend_id;
+
+	/* Entrypoint for parallel workers. */
+	parallel_worker_main_type	entrypoint;
+
+	/* Track whether workers have attached. */
+	slock_t		mutex;
+	int			workers_expected;
+	int			workers_attached;
+} FixedParallelState;
+
+/*
+ * Our parallel worker number.  We initialize this to -1, meaning that we are
+ * not a parallel worker.  In parallel workers, it will be set to a value >= 0
+ * and < the number of workers before any user code is invoked; each parallel
+ * worker will get a different parallel worker number.
+ */
+int ParallelWorkerNumber = -1;
+
+/* Is there a parallel message pending which we need to receive? */
+bool ParallelMessagePending = false;
+
+/* Are we in the midst of handling parallel messages? */
+static bool HandlingParallelMessages = false;
+
+/* List of active parallel contexts. */
+static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
+
+/* Private functions. */
+static void HandleParallelMessages(void);
+static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
+static void ParallelErrorContext(void *arg);
+static void ParallelMain(Datum main_arg);
+static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
+static void handle_sigterm(SIGNAL_ARGS);
+
+/*
+ * Establish a new parallel context.  This should be done after entering
+ * parallel mode, and (unless there is an error) the context should be
+ * destroyed before exiting the current subtransaction.
+ */
+ParallelContext *
+CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
+{
+	MemoryContext	oldcontext;
+	ParallelContext	*pcxt;
+
+	/* It is unsafe to create a parallel context if not in parallel mode. */
+	Assert(IsInParallelMode());
+
+	/* Number of workers should be non-negative. */
+	Assert(nworkers >= 0);
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Initialize a new ParallelContext. */
+	pcxt = palloc0(sizeof(ParallelContext));
+	pcxt->subid = GetCurrentSubTransactionId();
+	pcxt->nworkers = nworkers;
+	pcxt->entrypoint = entrypoint;
+	pcxt->error_context_stack = error_context_stack;
+	shm_toc_initialize_estimator(&pcxt->estimator);
+	dlist_push_head(&pcxt_list, &pcxt->node);
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+
+	return pcxt;
+}
+
+/*
+ * Establish a new parallel context that calls a function provided by an
+ * extension.  This works around the fact that the library might get mapped
+ * at a different address in each backend.
+ */
+ParallelContext *
+CreateParallelContextForExtension(char *library_name, char *function_name,
+								  int nworkers)
+{
+	MemoryContext	oldcontext;
+	ParallelContext *pcxt;
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Create the context. */
+	pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
+	pcxt->library_name = pstrdup(library_name);
+	pcxt->function_name = pstrdup(function_name);
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+
+	return pcxt;
+}
+
+/*
+ * Establish the dynamic shared memory segment for a parallel context and
+ * copy state and other bookkeeping information that will be needed by parallel
+ * workers into it.
+ */
+void
+InitializeParallelDSM(ParallelContext *pcxt)
+{
+	MemoryContext	oldcontext;
+	Size	guc_len;
+	Size	combocidlen;
+	Size	tsnaplen;
+	Size	asnaplen;
+	Size	tstatelen;
+	Size	segsize;
+	int		i;
+	FixedParallelState *fps;
+	char   *gucspace;
+	char   *combocidspace;
+	char   *tsnapspace;
+	char   *asnapspace;
+	char   *tstatespace;
+	char   *error_queue_space;
+	Snapshot	transaction_snapshot = GetTransactionSnapshot();
+	Snapshot	active_snapshot = GetActiveSnapshot();
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Allocate space for worker information. */
+	pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
+
+	/*
+	 * Estimate how much space we'll need for state sharing.
+	 *
+	 * If you add more chunks here, you probably need more keys, too.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
+	combocidlen = EstimateComboCIDStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
+	tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+	asnaplen = EstimateSnapshotSpace(active_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
+	tstatelen = EstimateTransactionStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
+	shm_toc_estimate_keys(&pcxt->estimator, 6);
+
+	/* Estimate how much space we'll need for error queues. */
+	StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
+		PARALLEL_ERROR_QUEUE_SIZE,
+		"parallel error queue size not buffer-aligned");
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate how much we'll need for extension entrypoint information. */
+	if (pcxt->library_name != NULL)
+	{
+		Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
+		Assert(pcxt->function_name != NULL);
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+							   + strlen(pcxt->function_name) + 2);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	/* Create DSM and initialize with new table of contents. */
+	segsize = shm_toc_estimate(&pcxt->estimator);
+	pcxt->seg = dsm_create(segsize);
+	pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
+							   dsm_segment_address(pcxt->seg),
+							   segsize);
+
+	/* Initialize fixed-size state in shared memory. */
+	fps = (FixedParallelState *)
+		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
+	fps->database_id = MyDatabaseId;
+	fps->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
+	fps->parallel_master_pgproc = MyProc;
+	fps->parallel_master_pid = MyProcPid;
+	fps->parallel_master_backend_id = MyBackendId;
+	fps->entrypoint = pcxt->entrypoint;
+	SpinLockInit(&fps->mutex);
+	fps->workers_expected = pcxt->nworkers;
+	fps->workers_attached = 0;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
+
+	/* Serialize GUC state to dynamic shared memory. */
+	gucspace = shm_toc_allocate(pcxt->toc, guc_len);
+	SerializeGUCState(guc_len, gucspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
+
+	/* Serialize combo CID state to dynamic shared memory. */
+	combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
+	SerializeComboCIDState(combocidlen, combocidspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
+
+	/* Serialize transaction snapshots to dynamic shared memory. */
+	tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+	SerializeSnapshot(transaction_snapshot, tsnaplen, tsnapspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, tsnapspace);
+	asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
+	SerializeSnapshot(active_snapshot, asnaplen, asnapspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+
+	/* Serialize transaction state to dynamic shared memory. */
+	tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
+	SerializeTransactionState(tstatelen, tstatespace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
+
+	/*
+	 * Establish error queues in dynamic shared memory.
+	 *
+	 * These queues should be used only for transmitting ErrorResponse,
+	 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data should
+	 * be transmitted via separate (possibly larger?) queue.
+	 */
+	error_queue_space =
+	   shm_toc_allocate(pcxt->toc, PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		shm_mq *mq;
+
+		mq = shm_mq_create(error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE,
+						   PARALLEL_ERROR_QUEUE_SIZE);
+		shm_mq_set_receiver(mq, MyProc);
+		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
+
+	/* Serialize extension entrypoint information to dynamic shared memory. */
+	if (pcxt->library_name != NULL)
+	{
+		Size	lnamelen = strlen(pcxt->library_name);
+		char *extensionstate;
+
+		extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
+										  + strlen(pcxt->function_name) + 2);
+		strcpy(extensionstate, pcxt->library_name);
+		strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
+					   extensionstate);
+	}
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Launch parallel workers.
+ */
+void
+LaunchParallelWorkers(ParallelContext *pcxt)
+{
+	MemoryContext	oldcontext;
+	BackgroundWorker	worker;
+	int		i;
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Configure a worker. */
+	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
+			 MyProcPid);
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = ParallelMain;
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	/*
+	 * Start workers.
+	 *
+	 * The caller must be able to tolerate ending up with fewer workers than
+	 * expected, so there is no need to throw an error here if registration
+	 * fails.  It wouldn't help much anyway, because registering the worker
+	 * in no way guarantees that it will start up and initialize successfully.
+	 */
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		if (RegisterDynamicBackgroundWorker(&worker,
+											&pcxt->worker[i].bgwhandle))
+			shm_mq_set_handle(pcxt->worker[i].error_mqh,
+							  pcxt->worker[i].bgwhandle);
+		else
+		{
+			pcxt->worker[i].bgwhandle = NULL;
+			pcxt->worker[i].error_mqh = NULL;
+		}
+	}
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Wait for all workers to exit cleanly.
+ */
+void
+WaitForParallelWorkersToFinish(ParallelContext *pcxt)
+{
+	for (;;)
+	{
+		bool	anyone_alive = false;
+		int		i;
+
+		/*
+		 * This will process any parallel messages that are pending, which
+		 * may change the outcome of the loop that follows.  It may also
+		 * throw an error propagated from a worker.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			if (pcxt->worker[i].error_mqh != NULL)
+			{
+				anyone_alive = true;
+				break;
+			}
+		}
+
+		if (!anyone_alive)
+			break;
+
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
+		ResetLatch(&MyProc->procLatch);
+	}
+}
+
+/*
+ * Destroy a parallel context.
+ *
+ * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
+ * first, before calling this function.  When this function is invoked, any
+ * remaining workers are forcibly killed; the dynamic shared memory segment
+ * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
+ */
+void
+DestroyParallelContext(ParallelContext *pcxt)
+{
+	int		i;
+
+	/*
+	 * Be careful about order of operations here!  We remove the parallel
+	 * context from the list before we do anything else; otherwise, if an
+	 * error occurs during a subsequent step, we might try to nuke it again
+	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
+	 */
+	dlist_delete(&pcxt->node);
+
+	/* Kill each worker in turn, and forget their error queues. */
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		if (pcxt->worker[i].bgwhandle != NULL)
+			TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+		if (pcxt->worker[i].error_mqh != NULL)
+		{
+			pfree(pcxt->worker[i].error_mqh);
+			pcxt->worker[i].error_mqh = NULL;
+		}
+	}
+
+	/*
+	 * If we have allocated a shared memory segment, detach it.  This will
+	 * implicitly detach the error queues, and any other shared memory queues,
+	 * stored there.
+	 */
+	if (pcxt->seg != NULL)
+		dsm_detach(pcxt->seg);
+
+	/* Wait until the workers actually die. */
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		BgwHandleStatus	status;
+
+		if (pcxt->worker[i].bgwhandle == NULL)
+			continue;
+
+		/*
+		 * We can't finish transaction commit or abort until all of the
+		 * workers are dead.  This means, in particular, that we can't respond
+		 * to interrupts at this stage.
+		 */
+		HOLD_INTERRUPTS();
+		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+		RESUME_INTERRUPTS();
+
+		/*
+		 * If the postmaster kicked the bucket, we have no chance of cleaning
+		 * up safely -- we won't be able to tell when our workers are actually
+		 * dead.  This doesn't necessitate a PANIC since they will all abort
+		 * eventually, but we can't safely continue this session.
+		 */
+		if (status == BGWH_POSTMASTER_DIED)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+				 errmsg("postmaster exited during a parallel transaction")));
+
+		/* Release memory. */
+		pfree(pcxt->worker[i].bgwhandle);
+		pcxt->worker[i].bgwhandle = NULL;
+	}
+
+	/* Free the worker array itself. */
+	pfree(pcxt->worker);
+	pcxt->worker = NULL;
+
+	/* Free memory. */
+	pfree(pcxt);
+}
+
+/*
+ * Are there any parallel contexts currently active?
+ */
+bool
+ParallelContextActive(void)
+{
+	return !dlist_is_empty(&pcxt_list);
+}
+
+/*
+ * Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * If signal_handler is true, we are being called from a signal handler and must
+ * be extremely cautious about what we do here!
+ */
+void
+HandleParallelMessageInterrupt(bool signal_handler)
+{
+	int			save_errno = errno;
+
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ParallelMessagePending = true;
+
+		/*
+		 * If it's safe to interrupt, service the interrupt immediately.
+		 * (We shouldn't be in parallel mode if waiting for the user to send
+		 * a new query, but we could be waiting for a lock.)
+		 */
+		if ((ImmediateInterruptOK || !signal_handler)
+			&& InterruptHoldoffCount == 0 && CritSectionCount == 0
+			&& !HandlingParallelMessages)
+		{
+			bool notify_enabled;
+			bool catchup_enabled;
+			bool save_ImmediateInterruptOK;
+
+			/*
+			 * Disable everything that might recursively interrupt us.
+			 *
+			 * If there were any possibility that disabling and re-enabling
+			 * interrupts or handling parallel messages might take a lock, we'd
+			 * need to HOLD_INTERRUPTS() as well, since taking a lock might
+			 * cause ImmediateInterruptOK to get temporarily reset to true.
+			 * But that shouldn't happen, so this is (hopefully) safe.  That's
+			 * good, because it lets us respond to query cancel and die
+			 * interrupts while we're in the midst of message-processing.
+			 */
+			save_ImmediateInterruptOK = ImmediateInterruptOK;
+			ImmediateInterruptOK = false;
+			notify_enabled = DisableNotifyInterrupt();
+			catchup_enabled = DisableCatchupInterrupt();
+			HandlingParallelMessages = true;
+
+			/* OK, do the work... */
+			HandleParallelMessages();
+
+			/* Now re-enable whatever was enabled before */
+			HandlingParallelMessages = false;
+			if (catchup_enabled)
+				EnableCatchupInterrupt();
+			if (notify_enabled)
+				EnableNotifyInterrupt();
+			ImmediateInterruptOK = save_ImmediateInterruptOK;
+		}
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Handle any queued protocol messages received from parallel workers.
+ */
+static void
+HandleParallelMessages(void)
+{
+	dlist_iter	iter;
+
+	ParallelMessagePending = false;
+
+	dlist_foreach(iter, &pcxt_list)
+	{
+		ParallelContext *pcxt;
+		int		i;
+		Size	nbytes;
+		void   *data;
+
+		pcxt = dlist_container(ParallelContext, node, iter.cur);
+		if (pcxt->worker == NULL)
+			continue;
+
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			/*
+			 * Read messages for as long as we have an error queue; if we
+			 * have hit (or hit while reading) ReadyForQuery, this will go to
+			 * NULL.
+			 */
+			while (pcxt->worker[i].error_mqh != NULL)
+			{
+				shm_mq_result	res;
+
+				CHECK_FOR_INTERRUPTS();
+
+				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
+									 &data, true);
+				if (res == SHM_MQ_SUCCESS)
+				{
+					StringInfoData	msg;
+
+					initStringInfo(&msg);
+					appendBinaryStringInfo(&msg, data, nbytes);
+					HandleParallelMessage(pcxt, i, &msg);
+					pfree(msg.data);
+				}
+				else if (res == SHM_MQ_DETACHED)
+					ereport(ERROR,
+							(errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
+							 errmsg("lost connection to parallel worker")));
+			}
+		}
+	}
+}
+
+/*
+ * Handle a single protocol message received from a single parallel worker.
+ */
+static void
+HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
+{
+	char	msgtype;
+
+	msgtype = pq_getmsgbyte(msg);
+
+	switch (msgtype)
+	{
+		case 'E':
+		case 'N':
+			{
+				ErrorData	edata;
+				ErrorContextCallback *save_error_context_stack;
+
+				/* Parse ErrorResponse or NoticeResponse. */
+				pq_parse_errornotice(msg, &edata);
+
+				/* Death of a worker isn't enough justification for suicide. */
+				edata.elevel = Min(edata.elevel, ERROR);
+
+				/*
+				 * Rethrow the error using the error context callbacks that
+				 * were in effect when the context was created, not the
+				 * current ones.
+				 */
+				save_error_context_stack = error_context_stack;
+				error_context_stack = pcxt->error_context_stack;
+				ThrowErrorData(&edata);
+				error_context_stack = save_error_context_stack;
+
+				break;
+			}
+
+		case 'A':
+			{
+				/* Propagate NotifyResponse. */
+				pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+				break;
+			}
+
+		case 'Z':
+			{
+				/* ReadyForQuery indicates that this worker exits cleanly. */
+				pfree(pcxt->worker[i].bgwhandle);
+				pfree(pcxt->worker[i].error_mqh);
+				pcxt->worker[i].bgwhandle = NULL;
+				pcxt->worker[i].error_mqh = NULL;
+				break;
+			}
+
+		default:
+			{
+				elog(ERROR, "unknown message type: %c (%d bytes)",
+					 msgtype, msg->len);
+			}
+	}
+}
+
+/*
+ * End-of-subtransaction cleanup for parallel contexts.
+ *
+ * Currently, it's forbidden to enter or leave a subtransaction while
+ * parallel mode is in effect, so we could just blow away everything.  But
+ * we may want to relax that restriction in the future, so this code
+ * contemplates that there may be multiple subtransaction IDs in pcxt_list.
+ */
+void
+AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
+{
+	HandlingParallelMessages = false;
+
+	while (!dlist_is_empty(&pcxt_list))
+	{
+		ParallelContext *pcxt;
+
+		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+		if (pcxt->subid != mySubId)
+			break;
+		if (isCommit)
+			elog(WARNING, "leaked parallel context");
+		DestroyParallelContext(pcxt);
+	}
+}
+
+/*
+ * End-of-transaction cleanup for parallel contexts.
+ */
+void
+AtEOXact_Parallel(bool isCommit)
+{
+	HandlingParallelMessages = false;
+
+	while (!dlist_is_empty(&pcxt_list))
+	{
+		ParallelContext *pcxt;
+
+		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+		if (isCommit)
+			elog(WARNING, "leaked parallel context");
+		DestroyParallelContext(pcxt);
+	}
+}
+
+/*
+ * Main entrypoint for parallel workers.
+ */
+static void
+ParallelMain(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc *toc;
+	FixedParallelState *fps;
+	char   *error_queue_space;
+	shm_mq *mq;
+	shm_mq_handle *mqh;
+	char   *gucspace;
+	char   *combocidspace;
+	char   *tsnapspace;
+	char   *asnapspace;
+	char   *tstatespace;
+	ErrorContextCallback errctx;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "parallel worker",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * Now that we have a resource owner, we can attach to the dynamic
+	 * shared memory segment and read the table of contents.
+	 */
+	seg = dsm_attach(DatumGetUInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Determine and set our worker number. */
+	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
+	Assert(fps != NULL);
+	Assert(ParallelWorkerNumber == -1);
+	SpinLockAcquire(&fps->mutex);
+	if (fps->workers_attached < fps->workers_expected)
+		ParallelWorkerNumber = fps->workers_attached++;
+	SpinLockRelease(&fps->mutex);
+	if (ParallelWorkerNumber < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("too many parallel workers already attached")));
+
+	/*
+	 * Now that we have a worker number, we can find and attach to the error
+	 * queue provided for us.  That's good, because until we do that, any
+	 * errors that happen here will not be reported back to the process that
+	 * requested that this worker be launched.
+	 */
+	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
+	mq = (shm_mq *) (error_queue_space +
+		ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
+	shm_mq_set_sender(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+	pq_redirect_to_shm_mq(mq, mqh);
+	pq_set_parallel_master(fps->parallel_master_pid,
+						   fps->parallel_master_backend_id);
+
+	/* Install an error-context callback. */
+	errctx.callback = ParallelErrorContext;
+	errctx.arg = NULL;
+	errctx.previous = error_context_stack;
+	error_context_stack = &errctx;
+
+	/*
+	 * Hooray! Primary initialization is complete.  Now, we need to set up
+	 * our backend-local state to match the original backend.
+	 */
+
+	/* Restore database connection. */
+	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
+											  fps->authenticated_user_id);
+
+	/* Restore GUC values from launching backend. */
+	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
+	Assert(gucspace != NULL);
+	StartTransactionCommand();
+	RestoreGUCState(gucspace);
+	CommitTransactionCommand();
+
+	/* Handle local_preload_libraries and session_preload_libraries. */
+	process_session_preload_libraries();
+
+	/* Crank up a transaction state appropriate to a parallel worker. */
+	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);
+	StartParallelWorkerTransaction(tstatespace);
+
+	/* Restore combo CID state. */
+	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
+	Assert(combocidspace != NULL);
+	RestoreComboCIDState(combocidspace);
+
+	/* Restore transaction snapshot. */
+	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
+	Assert(tsnapspace != NULL);
+	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
+							   fps->parallel_master_pgproc);
+
+	/* Restore active snapshot. */
+	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
+	Assert(asnapspace != NULL);
+	PushActiveSnapshot(RestoreSnapshot(asnapspace));
+
+	/* Restore user ID and security context. */
+	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
+
+	/*
+	 * We've initialized all of our state now; nothing should change hereafter.
+	 */
+	EnterParallelMode();
+
+	/*
+	 * Time to do the real work: invoke the caller-supplied code.
+	 *
+	 * If you get a crash at this line, see the comments for
+	 * ParallelExtensionTrampoline.
+	 */
+	fps->entrypoint(seg, toc);
+
+	/* Must exit parallel mode to pop active snapshot. */
+	ExitParallelMode();
+
+	/* Must pop active snapshot so resowner.c doesn't complain. */
+	PopActiveSnapshot();
+
+	/* Shut down the parallel-worker transaction. */
+	EndParallelWorkerTransaction();
+
+	/* Report success. */
+	ReadyForQuery(DestRemote);
+}
+
+/*
+ * It's unsafe for the entrypoint invoked by ParallelMain to be a function
+ * living in a dynamically loaded module, because the module might not be
+ * loaded in every process, or might be loaded but not at the same address.
+ * To work around that problem, CreateParallelContextForExtension() arranges
+ * to call this function rather than calling the extension-provided function
+ * directly; and this function then looks up the real entrypoint and calls it.
+ */
+static void
+ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
+{
+	char   *extensionstate;
+	char   *library_name;
+	char   *function_name;
+	parallel_worker_main_type entrypt;
+
+	extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
+	Assert(extensionstate != NULL);
+	library_name = extensionstate;
+	function_name = extensionstate + strlen(library_name) + 1;
+
+	entrypt = (parallel_worker_main_type)
+		load_external_function(library_name, function_name, true, NULL);
+	entrypt(seg, toc);
+}
+
+/*
+ * Error-context callback for errors and notices raised inside a parallel
+ * worker: annotates the message with the worker's PID so that, when it is
+ * propagated back to the launching backend, its origin is identifiable.
+ */
+static void
+ParallelErrorContext(void *arg)
+{
+	errcontext("parallel worker, pid %d", MyProcPid);
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int		save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index c541156..92f657e 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -50,6 +50,13 @@ GetNewTransactionId(bool isSubXact)
 	TransactionId xid;
 
 	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new XIDs after that point.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+
+	/*
 	 * During bootstrap initialization, we return the special bootstrap
 	 * transaction id.
 	 */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8b2f714..d27d184 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -22,6 +22,7 @@
 
 #include "access/commit_ts.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -49,6 +50,7 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/smgr.h"
+#include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
@@ -76,6 +78,32 @@ bool		XactDeferrable;
 int			synchronous_commit = SYNCHRONOUS_COMMIT_ON;
 
 /*
+ * Only a single TransactionStateData is placed on the parallel worker's
+ * state stack, and the XID reflected there will be that of the *innermost*
+ * currently-active subtransaction in the backend that initiated parallelism.
+ * However, GetTopTransactionId() and TransactionIdIsCurrentTransactionId()
+ * need to return the same answers in the parallel worker as they would have
+ * in the user backend, so we need some additional bookkeeping.
+ *
+ * XactTopTransactionId stores the XID of our toplevel transaction, which
+ * will be the same as TopTransactionState.transactionId in an ordinary
+ * backend; but in a parallel backend, which does not have the entire
+ * transaction state, it will instead be copied from the backend that started
+ * the parallel operation.
+ *
+ * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary
+ * backend, but in a parallel backend, nParallelCurrentXids will contain the
+ * number of XIDs that need to be considered current, and ParallelCurrentXids
+ * will contain the XIDs themselves.  This includes all XIDs that were current
+ * or sub-committed in the parent at the time the parallel operation began.
+ * The XIDs are stored sorted in numerical order (not logical order) to make
+ * lookups as fast as possible.
+ */
+TransactionId	XactTopTransactionId = InvalidTransactionId;
+int				nParallelCurrentXids = 0;
+TransactionId  *ParallelCurrentXids;
+
+/*
  * MyXactAccessedTempRel is set when a temporary relation is accessed.
  * We don't allow PREPARE TRANSACTION in that case.  (This is global
  * so that it can be set from heapam.c.)
@@ -111,6 +139,7 @@ typedef enum TBlockState
 	/* transaction block states */
 	TBLOCK_BEGIN,				/* starting transaction block */
 	TBLOCK_INPROGRESS,			/* live transaction */
+	TBLOCK_PARALLEL_INPROGRESS,	/* live transaction inside parallel worker */
 	TBLOCK_END,					/* COMMIT received */
 	TBLOCK_ABORT,				/* failed xact, awaiting ROLLBACK */
 	TBLOCK_ABORT_END,			/* failed xact, ROLLBACK received */
@@ -152,6 +181,7 @@ typedef struct TransactionStateData
 	bool		prevXactReadOnly;		/* entry-time xact r/o state */
 	bool		startedInRecovery;		/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
+	bool		parallelMode;	/* current transaction in parallel operation? */
 	struct TransactionStateData *parent;		/* back link to parent */
 } TransactionStateData;
 
@@ -182,6 +212,7 @@ static TransactionStateData TopTransactionStateData = {
 	false,						/* entry-time xact r/o state */
 	false,						/* startedInRecovery */
 	false,						/* didLogXid */
+	false,						/* parallelMode */
 	NULL						/* link to parent state block */
 };
 
@@ -351,9 +382,9 @@ IsAbortedTransactionBlockState(void)
 TransactionId
 GetTopTransactionId(void)
 {
-	if (!TransactionIdIsValid(TopTransactionStateData.transactionId))
+	if (!TransactionIdIsValid(XactTopTransactionId))
 		AssignTransactionId(&TopTransactionStateData);
-	return TopTransactionStateData.transactionId;
+	return XactTopTransactionId;
 }
 
 /*
@@ -366,7 +397,7 @@ GetTopTransactionId(void)
 TransactionId
 GetTopTransactionIdIfAny(void)
 {
-	return TopTransactionStateData.transactionId;
+	return XactTopTransactionId;
 }
 
 /*
@@ -460,6 +491,13 @@ AssignTransactionId(TransactionState s)
 	Assert(s->state == TRANS_INPROGRESS);
 
 	/*
+	 * Workers synchronize transaction state at the beginning of each
+	 * parallel operation, so we can't account for new XIDs at this point.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot assign XIDs during a parallel operation");
+
+	/*
 	 * Ensure parent(s) have XIDs, so that a child always has an XID later
 	 * than its parent.  Musn't recurse here, or we might get a stack overflow
 	 * if we're at the bottom of a huge stack of subtransactions none of which
@@ -511,6 +549,8 @@ AssignTransactionId(TransactionState s)
 	 * the Xid as "running".  See GetNewTransactionId.
 	 */
 	s->transactionId = GetNewTransactionId(isSubXact);
+	if (!isSubXact)
+		XactTopTransactionId = s->transactionId;
 
 	if (isSubXact)
 		SubTransSetParent(s->transactionId, s->parent->transactionId, false);
@@ -642,7 +682,16 @@ GetCurrentCommandId(bool used)
 {
 	/* this is global to a transaction, not subtransaction-local */
 	if (used)
+	{
+		/*
+		 * Forbid setting currentCommandIdUsed in parallel mode, because we
+		 * have no provision for communicating this back to the master.  We
+		 * could relax this restriction when currentCommandIdUsed was already
+		 * true at the start of the parallel operation.
+		 */
+		Assert(!CurrentTransactionState->parallelMode);
 		currentCommandIdUsed = true;
+	}
 	return currentCommandId;
 }
 
@@ -736,6 +785,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		return false;
 
 	/*
+	 * In parallel workers, the XIDs we must consider as current are stored
+	 * in ParallelCurrentXids rather than the transaction-state stack.  Note
+	 * that the XIDs in this array are sorted numerically rather than
+	 * according to transactionIdPrecedes order.
+	 */
+	if (nParallelCurrentXids > 0)
+	{
+		int			low,
+					high;
+
+		low = 0;
+		high = nParallelCurrentXids - 1;
+		while (low <= high)
+		{
+			int			middle;
+			TransactionId probe;
+
+			middle = low + (high - low) / 2;
+			probe = ParallelCurrentXids[middle];
+			if (probe == xid)
+				return true;
+			else if (probe < xid)
+				low = middle + 1;
+			else
+				high = middle - 1;
+		}
+		return false;
+	}
+
+	/*
 	 * We will return true for the Xid of the current subtransaction, any of
 	 * its subcommitted children, any of its parents, or any of their
 	 * previously subcommitted children.  However, a transaction being aborted
@@ -789,6 +868,53 @@ TransactionStartedDuringRecovery(void)
 }
 
 /*
+ *	EnterParallelMode
+ */
+void
+EnterParallelMode(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	/*
+	 * Workers synchronize transaction state at the beginning of each
+	 * parallel operation, so we can't let the transaction state be changed
+	 * after that point.  That includes the parallel mode flag itself.
+	 */
+	Assert(!s->parallelMode);
+
+	s->parallelMode = true;
+}
+
+/*
+ *	ExitParallelMode
+ */
+void
+ExitParallelMode(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	Assert(s->parallelMode);
+	Assert(!ParallelContextActive());
+
+	s->parallelMode = false;
+}
+
+/*
+ *	IsInParallelMode
+ *
+ * Are we in a parallel operation, as either the master or a worker?  Check
+ * this to prohibit operations that change backend-local state expected to
+ * match across all workers.  Mere caches usually don't require such a
+ * restriction.  State modified in a strict push/pop fashion, such as the
+ * active snapshot stack, is often fine.
+ */
+bool
+IsInParallelMode(void)
+{
+	return CurrentTransactionState->parallelMode;
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
@@ -802,6 +928,14 @@ CommandCounterIncrement(void)
 	 */
 	if (currentCommandIdUsed)
 	{
+		/*
+		 * Workers synchronize transaction state at the beginning of each
+		 * parallel operation, so we can't account for new commands after that
+		 * point.
+		 */
+		if (IsInParallelMode())
+			elog(ERROR, "cannot start commands during a parallel operation");
+
 		currentCommandId += 1;
 		if (currentCommandId == InvalidCommandId)
 		{
@@ -1705,6 +1839,8 @@ StartTransaction(void)
 	s = &TopTransactionStateData;
 	CurrentTransactionState = s;
 
+	Assert(XactTopTransactionId == InvalidTransactionId);
+
 	/*
 	 * check the current transaction state
 	 */
@@ -1834,6 +1970,9 @@ CommitTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
+	bool		parallel;
+
+	parallel = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
 
 	ShowTransactionState("CommitTransaction");
 
@@ -1845,6 +1984,10 @@ CommitTransaction(void)
 			 TransStateAsString(s->state));
 	Assert(s->parent == NULL);
 
+	/* If we might have parallel workers, clean them up now. */
+	if (IsInParallelMode())
+		AtEOXact_Parallel(true);
+
 	/*
 	 * Do pre-commit processing that involves calling user-defined code, such
 	 * as triggers.  Since closing cursors could queue trigger actions,
@@ -1867,7 +2010,8 @@ CommitTransaction(void)
 			break;
 	}
 
-	CallXactCallbacks(XACT_EVENT_PRE_COMMIT);
+	CallXactCallbacks(parallel ? XACT_EVENT_PARALLEL_PRE_COMMIT
+					  : XACT_EVENT_PRE_COMMIT);
 
 	/*
 	 * The remaining actions cannot call any user-defined code, so it's safe
@@ -1915,9 +2059,13 @@ CommitTransaction(void)
 	s->state = TRANS_COMMIT;
 
 	/*
-	 * Here is where we really truly commit.
+	 * Unless we're in parallel mode, we need to mark our XIDs as committed
+	 * in pg_clog.  This is where we durably commit.
 	 */
-	latestXid = RecordTransactionCommit();
+	if (parallel)
+		latestXid = InvalidTransactionId;
+	else
+		latestXid = RecordTransactionCommit();
 
 	TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
 
@@ -1944,7 +2092,8 @@ CommitTransaction(void)
 	 * state.
 	 */
 
-	CallXactCallbacks(XACT_EVENT_COMMIT);
+	CallXactCallbacks(parallel ? XACT_EVENT_PARALLEL_COMMIT
+					  : XACT_EVENT_COMMIT);
 
 	ResourceOwnerRelease(TopTransactionResourceOwner,
 						 RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -1992,7 +2141,8 @@ CommitTransaction(void)
 	AtEOXact_GUC(true, 1);
 	AtEOXact_SPI(true);
 	AtEOXact_on_commit_actions(true);
-	AtEOXact_Namespace(true);
+	if (!parallel)
+		AtEOXact_Namespace(true);
 	AtEOXact_SMgr();
 	AtEOXact_Files();
 	AtEOXact_ComboCid();
@@ -2017,6 +2167,9 @@ CommitTransaction(void)
 	s->nChildXids = 0;
 	s->maxChildXids = 0;
 
+	XactTopTransactionId = InvalidTransactionId;
+	nParallelCurrentXids = 0;
+
 	/*
 	 * done with commit processing, set current transaction state back to
 	 * default
@@ -2040,6 +2193,8 @@ PrepareTransaction(void)
 	GlobalTransaction gxact;
 	TimestampTz prepared_at;
 
+	Assert(!IsInParallelMode());
+
 	ShowTransactionState("PrepareTransaction");
 
 	/*
@@ -2284,6 +2439,9 @@ PrepareTransaction(void)
 	s->nChildXids = 0;
 	s->maxChildXids = 0;
 
+	XactTopTransactionId = InvalidTransactionId;
+	nParallelCurrentXids = 0;
+
 	/*
 	 * done with 1st phase commit processing, set current transaction state
 	 * back to default
@@ -2302,6 +2460,7 @@ AbortTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
+	bool	parallel;
 
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
@@ -2350,12 +2509,20 @@ AbortTransaction(void)
 	/*
 	 * check the current transaction state
 	 */
+	parallel = (s->blockState == TBLOCK_PARALLEL_INPROGRESS);
 	if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
 		elog(WARNING, "AbortTransaction while in %s state",
 			 TransStateAsString(s->state));
 	Assert(s->parent == NULL);
 
 	/*
+	 * If we might have parallel workers, send them all termination signals,
+	 * and wait for them to die.
+	 */
+	if (IsInParallelMode())
+		AtEOXact_Parallel(false);
+
+	/*
 	 * set the current transaction state information appropriately during the
 	 * abort processing
 	 */
@@ -2385,9 +2552,14 @@ AbortTransaction(void)
 
 	/*
 	 * Advertise the fact that we aborted in pg_clog (assuming that we got as
-	 * far as assigning an XID to advertise).
+	 * far as assigning an XID to advertise).  But if we're inside a parallel
+	 * worker, skip this; the user backend must be the one to write the abort
+	 * record.
 	 */
-	latestXid = RecordTransactionAbort(false);
+	if (parallel)
+		latestXid = InvalidTransactionId;
+	else
+		latestXid = RecordTransactionAbort(false);
 
 	TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
 
@@ -2405,7 +2577,10 @@ AbortTransaction(void)
 	 */
 	if (TopTransactionResourceOwner != NULL)
 	{
-		CallXactCallbacks(XACT_EVENT_ABORT);
+		if (parallel)
+			CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT);
+		else
+			CallXactCallbacks(XACT_EVENT_ABORT);
 
 		ResourceOwnerRelease(TopTransactionResourceOwner,
 							 RESOURCE_RELEASE_BEFORE_LOCKS,
@@ -2426,7 +2601,8 @@ AbortTransaction(void)
 		AtEOXact_GUC(false, 1);
 		AtEOXact_SPI(false);
 		AtEOXact_on_commit_actions(false);
-		AtEOXact_Namespace(false);
+		if (!parallel)
+			AtEOXact_Namespace(false);
 		AtEOXact_SMgr();
 		AtEOXact_Files();
 		AtEOXact_ComboCid();
@@ -2478,6 +2654,10 @@ CleanupTransaction(void)
 	s->childXids = NULL;
 	s->nChildXids = 0;
 	s->maxChildXids = 0;
+	s->parallelMode = false;
+
+	XactTopTransactionId = InvalidTransactionId;
+	nParallelCurrentXids = 0;
 
 	/*
 	 * done with abort processing, set current transaction state back to
@@ -2531,6 +2711,7 @@ StartTransactionCommand(void)
 			/* These cases are invalid. */
 		case TBLOCK_STARTED:
 		case TBLOCK_BEGIN:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_END:
 		case TBLOCK_SUBRELEASE:
@@ -2566,11 +2747,13 @@ CommitTransactionCommand(void)
 	switch (s->blockState)
 	{
 			/*
-			 * This shouldn't happen, because it means the previous
+			 * These shouldn't happen.  TBLOCK_DEFAULT means the previous
 			 * StartTransactionCommand didn't set the STARTED state
-			 * appropriately.
+			 * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended
+			 * by EndParallelWorkerTransaction(), not this function.
 			 */
 		case TBLOCK_DEFAULT:
+		case TBLOCK_PARALLEL_INPROGRESS:
 			elog(FATAL, "CommitTransactionCommand: unexpected state %s",
 				 BlockStateAsString(s->blockState));
 			break;
@@ -2852,6 +3035,7 @@ AbortCurrentTransaction(void)
 			 * ABORT state.  We will stay in ABORT until we get a ROLLBACK.
 			 */
 		case TBLOCK_INPROGRESS:
+		case TBLOCK_PARALLEL_INPROGRESS:
 			AbortTransaction();
 			s->blockState = TBLOCK_ABORT;
 			/* CleanupTransaction happens when we exit TBLOCK_ABORT_END */
@@ -3241,6 +3425,7 @@ BeginTransactionBlock(void)
 			 * Already a transaction block in progress.
 			 */
 		case TBLOCK_INPROGRESS:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBINPROGRESS:
 		case TBLOCK_ABORT:
 		case TBLOCK_SUBABORT:
@@ -3418,6 +3603,16 @@ EndTransactionBlock(void)
 			result = true;
 			break;
 
+			/*
+			 * The user issued a COMMIT that somehow ran inside a parallel
+			 * worker.  We can't cope with that.
+			 */
+		case TBLOCK_PARALLEL_INPROGRESS:
+			ereport(FATAL,
+					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+					 errmsg("cannot commit during a parallel operation")));
+			break;
+
 			/* These cases are invalid. */
 		case TBLOCK_DEFAULT:
 		case TBLOCK_BEGIN:
@@ -3511,6 +3706,16 @@ UserAbortTransactionBlock(void)
 			s->blockState = TBLOCK_ABORT_PENDING;
 			break;
 
+			/*
+			 * The user issued an ABORT that somehow ran inside a parallel
+			 * worker.  We can't cope with that.
+			 */
+		case TBLOCK_PARALLEL_INPROGRESS:
+			ereport(FATAL,
+					(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+					 errmsg("cannot abort during a parallel operation")));
+			break;
+
 			/* These cases are invalid. */
 		case TBLOCK_DEFAULT:
 		case TBLOCK_BEGIN:
@@ -3540,6 +3745,18 @@ DefineSavepoint(char *name)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new subtransactions after that
+	 * point.  (Note that this check will certainly error out if s->blockState
+	 * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+	 * below.)
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot define savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 		case TBLOCK_INPROGRESS:
@@ -3560,6 +3777,7 @@ DefineSavepoint(char *name)
 		case TBLOCK_DEFAULT:
 		case TBLOCK_STARTED:
 		case TBLOCK_BEGIN:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_END:
 		case TBLOCK_SUBRELEASE:
@@ -3594,6 +3812,18 @@ ReleaseSavepoint(List *options)
 	ListCell   *cell;
 	char	   *name = NULL;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for transaction state change after that
+	 * point.  (Note that this check will certainly error out if s->blockState
+	 * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+	 * below.)
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot release savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 			/*
@@ -3617,6 +3847,7 @@ ReleaseSavepoint(List *options)
 		case TBLOCK_DEFAULT:
 		case TBLOCK_STARTED:
 		case TBLOCK_BEGIN:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_END:
 		case TBLOCK_SUBRELEASE:
@@ -3694,6 +3925,18 @@ RollbackToSavepoint(List *options)
 	ListCell   *cell;
 	char	   *name = NULL;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for transaction state change after that
+	 * point.  (Note that this check will certainly error out if s->blockState
+	 * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case
+	 * below.)
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot rollback to savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 			/*
@@ -3718,6 +3961,7 @@ RollbackToSavepoint(List *options)
 		case TBLOCK_DEFAULT:
 		case TBLOCK_STARTED:
 		case TBLOCK_BEGIN:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_END:
 		case TBLOCK_SUBRELEASE:
@@ -3806,6 +4050,20 @@ BeginInternalSubTransaction(char *name)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new subtransactions after that point.
+	 * We might be able to make an exception for the type of subtransaction
+	 * established by this function, which is typically used in contexts where
+	 * we're going to release or roll back the subtransaction before proceeding
+	 * further, so that no enduring change to the transaction state occurs.
+	 * For now, however, we prohibit this case along with all the others.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot start subtransactions during a parallel operation")));
+
 	switch (s->blockState)
 	{
 		case TBLOCK_STARTED:
@@ -3828,6 +4086,7 @@ BeginInternalSubTransaction(char *name)
 			/* These cases are invalid. */
 		case TBLOCK_DEFAULT:
 		case TBLOCK_BEGIN:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_SUBRELEASE:
 		case TBLOCK_SUBCOMMIT:
@@ -3860,6 +4119,18 @@ ReleaseCurrentSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for commit of subtransactions after that
+	 * point.  This should not happen anyway.  Code calling this would
+	 * typically have called BeginInternalSubTransaction() first, failing
+	 * there.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot commit subtransactions during a parallel operation")));
+
 	if (s->blockState != TBLOCK_SUBINPROGRESS)
 		elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
 			 BlockStateAsString(s->blockState));
@@ -3882,6 +4153,14 @@ RollbackAndReleaseCurrentSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted
+	 * during parallel operations.  That's because we may be in the master,
+	 * recovering from an error thrown while we were in parallel mode.  We
+	 * won't reach here in a worker, because BeginInternalSubTransaction()
+	 * will have failed.
+	 */
+
 	switch (s->blockState)
 	{
 			/* Must be in a subtransaction */
@@ -3893,6 +4172,7 @@ RollbackAndReleaseCurrentSubTransaction(void)
 		case TBLOCK_DEFAULT:
 		case TBLOCK_STARTED:
 		case TBLOCK_BEGIN:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_INPROGRESS:
 		case TBLOCK_END:
@@ -3968,6 +4248,7 @@ AbortOutOfAnyTransaction(void)
 			case TBLOCK_STARTED:
 			case TBLOCK_BEGIN:
 			case TBLOCK_INPROGRESS:
+			case TBLOCK_PARALLEL_INPROGRESS:
 			case TBLOCK_END:
 			case TBLOCK_ABORT_PENDING:
 			case TBLOCK_PREPARE:
@@ -4059,6 +4340,7 @@ TransactionBlockStatusCode(void)
 		case TBLOCK_BEGIN:
 		case TBLOCK_SUBBEGIN:
 		case TBLOCK_INPROGRESS:
+		case TBLOCK_PARALLEL_INPROGRESS:
 		case TBLOCK_SUBINPROGRESS:
 		case TBLOCK_END:
 		case TBLOCK_SUBRELEASE:
@@ -4157,6 +4439,13 @@ CommitSubTransaction(void)
 		elog(WARNING, "CommitSubTransaction while in %s state",
 			 TransStateAsString(s->state));
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOSubXact_Parallel(true, s->subTransactionId);
+		s->parallelMode = false;
+	}
+
 	/* Pre-commit processing goes here */
 
 	CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId,
@@ -4315,6 +4604,13 @@ AbortSubTransaction(void)
 	 */
 	SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOSubXact_Parallel(false, s->subTransactionId);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * We can skip all this stuff if the subxact failed before creating a
 	 * ResourceOwner...
@@ -4455,6 +4751,7 @@ PushTransaction(void)
 	s->blockState = TBLOCK_SUBBEGIN;
 	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
 	s->prevXactReadOnly = XactReadOnly;
+	s->parallelMode = false;
 
 	CurrentTransactionState = s;
 
@@ -4502,6 +4799,134 @@ PopTransaction(void)
 }
 
 /*
+ * EstimateTransactionStateSpace
+ *		Estimate the amount of space that will be needed by
+ *		SerializeTransactionState.  It would be OK to overestimate slightly,
+ *		but it's simple for us to work out the precise value, so we do.
+ */
+Size
+EstimateTransactionStateSpace(void)
+{
+	TransactionState s;
+	Size	nxids = 3;	/* top XID, current XID, count of XIDs */
+
+	for (s = CurrentTransactionState; s != NULL; s = s->parent)
+	{
+		if (TransactionIdIsValid(s->transactionId))
+			nxids = add_size(nxids, 1);
+		nxids = add_size(nxids, s->nChildXids);
+	}
+
+	nxids = add_size(nxids, nParallelCurrentXids);
+	return mul_size(nxids, sizeof(TransactionId));
+}
+
+/*
+ * SerializeTransactionState
+ *		Write out relevant details of our transaction state that will be
+ *		needed by a parallel worker.
+ *
+ * Currently, the only information we attempt to save and restore here is
+ * the XIDs associated with this transaction.  The first eight bytes of the
+ * result contain the XID of the top-level transaction and the XID of the
+ * current transaction (or, in each case, InvalidTransactionId if none).
+ * The next 4 bytes contain a count of how many additional XIDs follow;
+ * this is followed by all of those XIDs one after another.  We emit the XIDs
+ * in sorted order for the convenience of the receiving process.
+ */
+void
+SerializeTransactionState(Size maxsize, char *start_address)
+{
+	TransactionState s;
+	Size	nxids = 0;
+	Size	i = 0;
+	TransactionId *workspace;
+	TransactionId *result = (TransactionId *) start_address;
+
+	Assert(maxsize >= 3 * sizeof(TransactionId));
+	result[0] = XactTopTransactionId;
+	result[1] = CurrentTransactionState->transactionId;
+
+	/*
+	 * If we're running in a parallel worker and launching a parallel worker
+	 * of our own, we can just pass along the information that was passed to
+	 * us.
+	 */
+	if (nParallelCurrentXids > 0)
+	{
+		Assert(maxsize > (nParallelCurrentXids + 2) * sizeof(TransactionId));
+		result[2] = nParallelCurrentXids;
+		memcpy(&result[3], ParallelCurrentXids,
+			   nParallelCurrentXids * sizeof(TransactionId));
+		return;
+	}
+
+	/*
+	 * OK, we need to generate a sorted list of XIDs that our workers
+	 * should view as current.  First, figure out how many there are.
+	 */
+	for (s = CurrentTransactionState; s != NULL; s = s->parent)
+	{
+		if (TransactionIdIsValid(s->transactionId))
+			nxids = add_size(nxids, 1);
+		nxids = add_size(nxids, s->nChildXids);
+	}
+	Assert(nxids * sizeof(TransactionId) < maxsize);
+
+	/* Copy them to our scratch space. */
+	workspace = palloc(nxids * sizeof(TransactionId));
+	for (s = CurrentTransactionState; s != NULL; s = s->parent)
+	{
+		if (TransactionIdIsValid(s->transactionId))
+			workspace[i++] = s->transactionId;
+		memcpy(&workspace[i], s->childXids,
+			   s->nChildXids * sizeof(TransactionId));
+		i += s->nChildXids;
+	}
+	Assert(i == nxids);
+
+	/* Sort them. */
+	qsort(workspace, nxids, sizeof(TransactionId), xidComparator);
+
+	/* Copy data into output area. */
+	result[2] = (TransactionId) nxids;
+	memcpy(&result[3], workspace, nxids * sizeof(TransactionId));
+}
+
+/*
+ * StartParallelWorkerTransaction
+ *		Start a parallel worker transaction, restoring the relevant
+ *		transaction state serialized by SerializeTransactionState.
+ */
+void
+StartParallelWorkerTransaction(char *tstatespace)
+{
+	TransactionId *tstate = (TransactionId *) tstatespace;
+
+	Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT);
+	StartTransaction();
+
+	XactTopTransactionId = tstate[0];
+	CurrentTransactionState->transactionId = tstate[1];
+	nParallelCurrentXids = (int) tstate[2];
+	ParallelCurrentXids = &tstate[3];
+
+	CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS;
+}
+
+/*
+ * EndParallelWorkerTransaction
+ *		End a parallel worker transaction.
+ */
+void
+EndParallelWorkerTransaction(void)
+{
+	Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS);
+	CommitTransaction();
+	CurrentTransactionState->blockState = TBLOCK_DEFAULT;
+}
+
+/*
  * ShowTransactionState
  *		Debug support
  */
@@ -4571,6 +4996,8 @@ BlockStateAsString(TBlockState blockState)
 			return "BEGIN";
 		case TBLOCK_INPROGRESS:
 			return "INPROGRESS";
+		case TBLOCK_PARALLEL_INPROGRESS:
+			return "PARALLEL_INPROGRESS";
 		case TBLOCK_END:
 			return "END";
 		case TBLOCK_ABORT:
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index ed2b05a..5398b70 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -476,7 +476,7 @@ BootstrapModeMain(void)
 	 */
 	InitProcess();
 
-	InitPostgres(NULL, InvalidOid, NULL, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
 
 	/* Initialize stuff for bootstrap-file processing */
 	for (i = 0; i < MAXATTR; i++)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8b1c727..0cc35b4 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -914,9 +914,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
 	{
 		Assert(rel);
 
-		/* check read-only transaction */
+		/* check read-only transaction and parallel mode */
 		if (XactReadOnly && !rel->rd_islocaltemp)
 			PreventCommandIfReadOnly("COPY FROM");
+		PreventCommandIfParallelMode("COPY FROM");
 
 		cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
 							   stmt->attlist, stmt->options);
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 811e1d4..d9b0aed 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -551,6 +551,13 @@ nextval_internal(Oid relid)
 	if (!seqrel->rd_islocaltemp)
 		PreventCommandIfReadOnly("nextval()");
 
+	/*
+	 * Forbid this during parallel operation because, to make it work,
+	 * the cooperating backends would need to share the backend-local cached
+	 * sequence information.  Currently, we don't support that.
+	 */
+	PreventCommandIfParallelMode("nextval()");
+
 	if (elm->last != elm->cached)		/* some numbers were cached */
 	{
 		Assert(elm->last_valid);
@@ -838,6 +845,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
 	if (!seqrel->rd_islocaltemp)
 		PreventCommandIfReadOnly("setval()");
 
+	/*
+	 * Forbid this during parallel operation because, to make it work,
+	 * the cooperating backends would need to share the backend-local cached
+	 * sequence information.  Currently, we don't support that.
+	 */
+	PreventCommandIfParallelMode("setval()");
+
 	/* lock page' buffer and read tuple */
 	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 8c799d3..9223f5e 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -135,8 +135,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	/*
 	 * If the transaction is read-only, we need to check if any writes are
 	 * planned to non-temporary tables.  EXPLAIN is considered read-only.
+	 *
+	 * Don't allow writes in parallel mode.  Supporting UPDATE and DELETE would
+	 * require (a) storing the combocid hash in shared memory, rather than
+	 * synchronizing it just once at the start of parallelism, and (b) an
+	 * alternative to heap_update()'s reliance on xmax for mutual exclusion.
+	 * INSERT may have no such troubles, but we forbid it to simplify the
+	 * checks.
+	 *
+	 * We have lower-level defenses in CommandCounterIncrement and elsewhere
+	 * against performing unsafe operations in parallel mode, but this gives
+	 * a more user-friendly error message.
 	 */
-	if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
+	if ((XactReadOnly || IsInParallelMode()) &&
+		!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
 		ExecCheckXactReadOnly(queryDesc->plannedstmt);
 
 	/*
@@ -679,18 +691,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte)
 }
 
 /*
- * Check that the query does not imply any writes to non-temp tables.
+ * Check that the query does not imply any writes to non-temp tables;
+ * unless we're in parallel mode, in which case don't even allow writes
+ * to temp tables.
  *
  * Note: in a Hot Standby slave this would need to reject writes to temp
- * tables as well; but an HS slave can't have created any temp tables
- * in the first place, so no need to check that.
+ * tables just as we do in parallel mode; but an HS slave can't have created
+ * any temp tables in the first place, so no need to check that.
  */
 static void
 ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 {
 	ListCell   *l;
 
-	/* Fail if write permissions are requested on any non-temp table */
+	/*
+	 * Fail if write permissions are requested in parallel mode for any
+	 * table (temp or non-temp); otherwise fail only for non-temp tables.
+	 */
 	foreach(l, plannedstmt->rtable)
 	{
 		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
@@ -701,6 +718,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 		if ((rte->requiredPerms & (~ACL_SELECT)) == 0)
 			continue;
 
+		PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt));
+
 		if (isTempNamespace(get_rel_namespace(rte->relid)))
 			continue;
 
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 4d11260..62b615a 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list,
 					   errmsg("%s is not allowed in a non-volatile function",
 							  CreateCommandTag(stmt))));
 
+			if (IsInParallelMode() && !CommandIsReadOnly(stmt))
+				PreventCommandIfParallelMode(CreateCommandTag(stmt));
+
 			/* OK, build the execution_state for this query */
 			newes = (execution_state *) palloc(sizeof(execution_state));
 			if (preves)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index cfa4a24..72d55c7 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -23,6 +23,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/spi_priv.h"
+#include "miscadmin.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 	}
 
 	/*
-	 * If told to be read-only, we'd better check for read-only queries. This
-	 * can't be done earlier because we need to look at the finished, planned
-	 * queries.  (In particular, we don't want to do it between GetCachedPlan
-	 * and PortalDefineQuery, because throwing an error between those steps
-	 * would result in leaking our plancache refcount.)
+	 * If told to be read-only, or in parallel mode, verify that this query
+	 * is in fact read-only.  This can't be done earlier because we need to
+	 * look at the finished, planned queries.  (In particular, we don't want
+	 * to do it between GetCachedPlan and PortalDefineQuery, because throwing
+	 * an error between those steps would result in leaking our plancache
+	 * refcount.)
 	 */
-	if (read_only)
+	if (read_only || IsInParallelMode())
 	{
 		ListCell   *lc;
 
@@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 			Node	   *pstmt = (Node *) lfirst(lc);
 
 			if (!CommandIsReadOnly(pstmt))
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				/* translator: %s is a SQL statement name */
-					   errmsg("%s is not allowed in a non-volatile function",
-							  CreateCommandTag(pstmt))));
+			{
+				if (read_only)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					/* translator: %s is a SQL statement name */
+							 errmsg("%s is not allowed in a non-volatile function",
+									CreateCommandTag(pstmt))));
+				else
+					PreventCommandIfParallelMode(CreateCommandTag(pstmt));
+			}
 		}
 	}
 
@@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 					   errmsg("%s is not allowed in a non-volatile function",
 							  CreateCommandTag(stmt))));
 
+			if (IsInParallelMode() && !CommandIsReadOnly(stmt))
+				PreventCommandIfParallelMode(CreateCommandTag(stmt));
+
 			/*
 			 * If not read-only mode, advance the command counter before each
 			 * command and update the snapshot.
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 6e6b429..b07978c 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -16,12 +16,15 @@
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
+#include "miscadmin.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 
 static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
+static pid_t pq_mq_parallel_master_pid = 0;
+static BackendId pq_mq_parallel_master_backend_id = InvalidBackendId;
 
 static void mq_comm_reset(void);
 static int	mq_flush(void);
@@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 }
 
+/*
+ * Arrange to SendProcSignal() to the parallel master each time we transmit
+ * message data via the shm_mq.
+ */
+void
+pq_set_parallel_master(pid_t pid, BackendId backend_id)
+{
+	Assert(PqCommMethods == &PqCommMqMethods);
+	pq_mq_parallel_master_pid = pid;
+	pq_mq_parallel_master_backend_id = backend_id;
+}
+
 static void
 mq_comm_reset(void)
 {
@@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 	iov[1].len = len;
 
 	Assert(pq_mq_handle != NULL);
-	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	for (;;)
+	{
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+
+		if (pq_mq_parallel_master_pid != 0)
+			SendProcSignal(pq_mq_parallel_master_pid,
+						   PROCSIG_PARALLEL_MESSAGE,
+						   pq_mq_parallel_master_backend_id);
+
+		if (result != SHM_MQ_WOULD_BLOCK)
+			break;
+
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
 
 	pq_mq_busy = false;
 
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 675f985..637749a 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -471,7 +471,7 @@ AutoVacLauncherMain(int argc, char *argv[])
 	InitProcess();
 #endif
 
-	InitPostgres(NULL, InvalidOid, NULL, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
 
 	SetProcessingMode(NormalProcessing);
 
@@ -1664,7 +1664,7 @@ AutoVacWorkerMain(int argc, char *argv[])
 		 * Note: if we have selected a just-deleted database (due to using
 		 * stale stats info), we'll fail and exit here.
 		 */
-		InitPostgres(NULL, dbid, NULL, dbname);
+		InitPostgres(NULL, dbid, NULL, InvalidOid, dbname);
 		SetProcessingMode(NormalProcessing);
 		set_ps_display(dbname, false);
 		ereport(DEBUG1,
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 85a3b3a..b5a70d7 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -984,6 +984,56 @@ WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
 }
 
 /*
+ * Wait for a background worker to stop.
+ *
+ * If the worker hasn't yet started, or is running, we wait for it to stop
+ * and then return BGWH_STOPPED.  However, if the postmaster has died, we give
+ * up and return BGWH_POSTMASTER_DIED, because it's the postmaster that
+ * notifies us when a worker's state changes.
+ */
+BgwHandleStatus
+WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
+{
+	BgwHandleStatus status;
+	int			rc;
+	bool		save_set_latch_on_sigusr1;
+
+	save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
+	set_latch_on_sigusr1 = true;
+
+	PG_TRY();
+	{
+		for (;;)
+		{
+			pid_t		pid;
+
+			CHECK_FOR_INTERRUPTS();
+
+			status = GetBackgroundWorkerPid(handle, &pid);
+			if (status == BGWH_STOPPED)
+				return status;
+
+			rc = WaitLatch(&MyProc->procLatch,
+						   WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+
+			if (rc & WL_POSTMASTER_DEATH)
+				return BGWH_POSTMASTER_DIED;
+
+			ResetLatch(&MyProc->procLatch);
+		}
+	}
+	PG_CATCH();
+	{
+		set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
+	return status;
+}
+
+/*
  * Instruct the postmaster to terminate a background worker.
  *
  * Note that it's safe to do this without regard to whether the worker is
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5106f52..451f814 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5305,7 +5305,30 @@ BackgroundWorkerInitializeConnection(char *dbname, char *username)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("database connection requirement not indicated during registration")));
 
-	InitPostgres(dbname, InvalidOid, username, NULL);
+	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL);
+
+	/* it had better not gotten out of "init" mode yet */
+	if (!IsInitProcessingMode())
+		ereport(ERROR,
+				(errmsg("invalid processing mode in background worker")));
+	SetProcessingMode(NormalProcessing);
+}
+
+/*
+ * Connect background worker to a database using OIDs.
+ */
+void
+BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
+{
+	BackgroundWorker *worker = MyBgworkerEntry;
+
+	/* XXX is this the right errcode? */
+	if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION))
+		ereport(FATAL,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("database connection requirement not indicated during registration")));
+
+	InitPostgres(NULL, dboid, NULL, useroid, NULL);
 
 	/* it had better not gotten out of "init" mode yet */
 	if (!IsInitProcessingMode())
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d953545..7b7b57a 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1686,6 +1686,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
 }
 
 /*
+ * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
+ *
+ * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
+ * PGPROC of the transaction from which we imported the snapshot, rather than
+ * an XID.
+ *
+ * Returns TRUE if successful, FALSE if source xact is no longer running.
+ */
+bool
+ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
+{
+	bool		result = false;
+	TransactionId xid;
+	volatile PGXACT *pgxact;
+
+	Assert(TransactionIdIsNormal(xmin));
+	Assert(proc != NULL);
+
+	/* Get lock so source xact can't end while we're doing this */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+	pgxact = &allPgXact[proc->pgprocno];
+
+	/*
+	 * Be certain that the referenced PGPROC has an advertised xmin which
+	 * is no later than the one we're installing, so that the system-wide
+	 * xmin can't go backwards.  Also, make sure it's running in the same
+	 * database, so that the per-database xmin cannot go backwards.
+	 */
+	xid = pgxact->xmin;		/* fetch just once */
+	if (proc->databaseId == MyDatabaseId &&
+		TransactionIdIsNormal(xid) &&
+		TransactionIdPrecedesOrEquals(xid, xmin))
+	{
+		MyPgXact->xmin = TransactionXmin = xmin;
+		result = true;
+	}
+
+	LWLockRelease(ProcArrayLock);
+
+	return result;
+}
+
+/*
  * GetRunningTransactionData -- returns information about running transactions.
  *
  * Similar to GetSnapshotData but returns more information. We include
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index cd9a287..0678678 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -17,6 +17,7 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include "access/parallel.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "storage/latch.h"
@@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
 		HandleNotifyInterrupt();
 
+	if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
+		HandleParallelMessageInterrupt(true);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index e783955..b35cbbd 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -1653,6 +1653,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 
 	Assert(!RecoveryInProgress());
 
+	/*
+	 * Since all parts of a serializable transaction must use the same
+	 * snapshot, it is too late to establish one after a parallel operation
+	 * has begun.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot establish serializable snapshot during a parallel operation");
+
 	proc = MyProc;
 	Assert(proc != NULL);
 	GET_VXID_FROM_PGPROC(vxid, *proc);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index cc62b2c..4285748 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -37,6 +37,7 @@
 #include "rusagestub.h"
 #endif
 
+#include "access/parallel.h"
 #include "access/printtup.h"
 #include "access/xact.h"
 #include "catalog/pg_type.h"
@@ -2968,7 +2969,8 @@ ProcessInterrupts(void)
 					 errmsg("canceling statement due to user request")));
 		}
 	}
-	/* If we get here, do nothing (probably, QueryCancelPending was reset) */
+	if (ParallelMessagePending)
+		HandleParallelMessageInterrupt(false);
 }
 
 
@@ -3704,7 +3706,7 @@ PostgresMain(int argc, char *argv[],
 	 * it inside InitPostgres() instead.  In particular, anything that
 	 * involves database access should be there, not here.
 	 */
-	InitPostgres(dbname, InvalidOid, username, NULL);
+	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL);
 
 	/*
 	 * If the PostmasterContext is still around, recycle the space; we don't
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 71580e8..623f985 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree)
 static void
 check_xact_readonly(Node *parsetree)
 {
-	if (!XactReadOnly)
+	/* Only perform the check if we have a reason to do so. */
+	if (!XactReadOnly && !IsInParallelMode())
 		return;
 
 	/*
 	 * Note: Commands that need to do more complicated checking are handled
 	 * elsewhere, in particular COPY and plannable statements do their own
-	 * checking.  However they should all call PreventCommandIfReadOnly to
-	 * actually throw the error.
+	 * checking.  However they should all call PreventCommandIfReadOnly
+	 * or PreventCommandIfParallelMode to actually throw the error.
 	 */
 
 	switch (nodeTag(parsetree))
@@ -207,6 +208,7 @@ check_xact_readonly(Node *parsetree)
 		case T_ImportForeignSchemaStmt:
 		case T_SecLabelStmt:
 			PreventCommandIfReadOnly(CreateCommandTag(parsetree));
+			PreventCommandIfParallelMode(CreateCommandTag(parsetree));
 			break;
 		default:
 			/* do nothing */
@@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname)
 }
 
 /*
+ * PreventCommandIfParallelMode: throw error if current (sub)transaction is
+ * in parallel mode.
+ *
+ * This is useful mainly to ensure consistency of the error message wording;
+ * most callers have checked IsInParallelMode() for themselves.
+ */
+void
+PreventCommandIfParallelMode(const char *cmdname)
+{
+	/* Nothing to do unless a parallel operation is in progress. */
+	if (!IsInParallelMode())
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+	/* translator: %s is name of a SQL command, eg CREATE */
+			 errmsg("cannot execute %s during a parallel operation",
+					cmdname)));
+}
+
+/*
  * PreventCommandDuringRecovery: throw error if RecoveryInProgress
  *
  * The majority of operations that are unsafe in a Hot Standby slave
@@ -630,6 +650,7 @@ standard_ProcessUtility(Node *parsetree,
 		case T_ClusterStmt:
 			/* we choose to allow this during "read only" transactions */
 			PreventCommandDuringRecovery("CLUSTER");
+			/* forbidden in parallel mode due to CommandIsReadOnly */
 			cluster((ClusterStmt *) parsetree, isTopLevel);
 			break;
 
@@ -640,6 +661,7 @@ standard_ProcessUtility(Node *parsetree,
 				/* we choose to allow this during "read only" transactions */
 				PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
 											 "VACUUM" : "ANALYZE");
+				/* forbidden in parallel mode due to CommandIsReadOnly */
 				vacuum(stmt, InvalidOid, true, NULL, false, isTopLevel);
 			}
 			break;
@@ -716,6 +738,7 @@ standard_ProcessUtility(Node *parsetree,
 			 * outside a transaction block is presumed to be user error.
 			 */
 			RequireTransactionChain(isTopLevel, "LOCK TABLE");
+			/* forbidden in parallel mode due to CommandIsReadOnly */
 			LockTableCommand((LockStmt *) parsetree);
 			break;
 
@@ -747,6 +770,7 @@ standard_ProcessUtility(Node *parsetree,
 
 				/* we choose to allow this during "read only" transactions */
 				PreventCommandDuringRecovery("REINDEX");
+				/* forbidden in parallel mode due to CommandIsReadOnly */
 				switch (stmt->kind)
 				{
 					case REINDEX_OBJECT_INDEX:
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 8fccb4c..a045062 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -350,11 +350,10 @@ has_rolreplication(Oid roleid)
  * Initialize user identity during normal backend startup
  */
 void
-InitializeSessionUserId(const char *rolename)
+InitializeSessionUserId(const char *rolename, Oid roleid)
 {
 	HeapTuple	roleTup;
 	Form_pg_authid rform;
-	Oid			roleid;
 
 	/*
 	 * Don't do scans if we're bootstrapping, none of the system catalogs
@@ -365,7 +364,10 @@ InitializeSessionUserId(const char *rolename)
 	/* call only once */
 	AssertState(!OidIsValid(AuthenticatedUserId));
 
-	roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
+	if (rolename != NULL)
+		roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
+	else
+		roleTup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleid));
 	if (!HeapTupleIsValid(roleTup))
 		ereport(FATAL,
 				(errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION),
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index c348034..8f6d517 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -523,6 +523,9 @@ BaseInit(void)
  * name can be returned to the caller in out_dbname.  If out_dbname isn't
  * NULL, it must point to a buffer of size NAMEDATALEN.
  *
+ * Similarly, the username can be passed by name, using the username parameter,
+ * or by OID using the useroid parameter.
+ *
  * In bootstrap mode no parameters are used.  The autovacuum launcher process
  * doesn't use any parameters either, because it only goes far enough to be
  * able to read pg_database; it doesn't connect to any particular database.
@@ -537,7 +540,7 @@ BaseInit(void)
  */
 void
 InitPostgres(const char *in_dbname, Oid dboid, const char *username,
-			 char *out_dbname)
+			 Oid useroid, char *out_dbname)
 {
 	bool		bootstrap = IsBootstrapProcessingMode();
 	bool		am_superuser;
@@ -692,18 +695,18 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("no roles are defined in this database system"),
 					 errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.",
-							 username)));
+							 username != NULL ? username : "postgres")));
 	}
 	else if (IsBackgroundWorker)
 	{
-		if (username == NULL)
+		if (username == NULL && !OidIsValid(useroid))
 		{
 			InitializeSessionUserIdStandalone();
 			am_superuser = true;
 		}
 		else
 		{
-			InitializeSessionUserId(username);
+			InitializeSessionUserId(username, useroid);
 			am_superuser = superuser();
 		}
 	}
@@ -712,7 +715,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 		/* normal multiuser case */
 		Assert(MyProcPort != NULL);
 		PerformAuthentication(MyProcPort);
-		InitializeSessionUserId(username);
+		InitializeSessionUserId(username, useroid);
 		am_superuser = superuser();
 	}
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 77c3494..6f2a571 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -5656,6 +5656,20 @@ set_config_option(const char *name, const char *value,
 			elevel = ERROR;
 	}
 
+	/*
+	 * GUC_ACTION_SAVE changes are acceptable during a parallel operation,
+	 * because the current worker will also pop the change.  We're probably
+	 * dealing with a function having a proconfig entry.  Only the function's
+	 * body should observe the change, and peer workers do not share in the
+	 * execution of a function call started by this worker.
+	 *
+	 * Other changes might need to affect other workers, so forbid them.
+	 */
+	if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE)
+		ereport(elevel,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot set parameters during a parallel operation")));
+
 	record = find_option(name, true, elevel);
 	if (record == NULL)
 	{
@@ -6929,6 +6943,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
 {
 	GucAction	action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET;
 
+	/*
+	 * Workers synchronize these parameters at the start of the parallel
+	 * operation; then, we block SET during the operation.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot set parameters during a parallel operation")));
+
 	switch (stmt->kind)
 	{
 		case VAR_SET_VALUE:
diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c
index ea7a905..6c8056e 100644
--- a/src/backend/utils/time/combocid.c
+++ b/src/backend/utils/time/combocid.c
@@ -44,6 +44,7 @@
 #include "miscadmin.h"
 #include "access/htup_details.h"
 #include "access/xact.h"
+#include "storage/shmem.h"
 #include "utils/combocid.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
@@ -286,3 +287,76 @@ GetRealCmax(CommandId combocid)
 	Assert(combocid < usedComboCids);
 	return comboCids[combocid].cmax;
 }
+
+/*
+ * Estimate the amount of space required to serialize the current ComboCID
+ * state.
+ */
+Size
+EstimateComboCIDStateSpace(void)
+{
+	/*
+	 * The serialized representation is an int holding the number of
+	 * ComboCIDs currently in use, followed by one ComboCidKeyData entry
+	 * per ComboCID.
+	 */
+	return add_size(sizeof(int),
+					mul_size(sizeof(ComboCidKeyData), usedComboCids));
+}
+
+/*
+ * Serialize the ComboCID state into the memory, beginning at start_address.
+ * maxsize should be at least as large as the value returned by
+ * EstimateComboCIDStateSpace.
+ */
+void
+SerializeComboCIDState(Size maxsize, char *start_address)
+{
+	char	   *endptr;
+
+	/*
+	 * Validate the buffer size before writing anything.  The previous
+	 * coding stored the count first and checked afterwards, which could
+	 * overrun a buffer smaller than sizeof(int).  The endptr <
+	 * start_address test guards against pointer-arithmetic wraparound.
+	 */
+	endptr = start_address + sizeof(int) +
+		(sizeof(ComboCidKeyData) * usedComboCids);
+	if (endptr < start_address || endptr > start_address + maxsize)
+		elog(ERROR, "not enough space to serialize ComboCID state");
+
+	/* First, we store the number of currently-existing ComboCIDs. */
+	*(int *) start_address = usedComboCids;
+
+	/* Now, copy the actual cmin/cmax pairs. */
+	memcpy(start_address + sizeof(int), comboCids,
+		   (sizeof(ComboCidKeyData) * usedComboCids));
+}
+
+/*
+ * Read the ComboCID state at the specified address and initialize this
+ * backend with the same ComboCIDs.  This is only valid in a backend that
+ * currently has no ComboCIDs (and only makes sense if the transaction state
+ * is serialized and restored as well).
+ */
+void
+RestoreComboCIDState(char *comboCIDstate)
+{
+	int			num_elements;
+	ComboCidKeyData *keydata;
+	int			i;
+
+	Assert(!comboCids && !comboHash);
+
+	/* The serialized form begins with the count of saved ComboCIDs. */
+	num_elements = *(int *) comboCIDstate;
+	keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
+
+	/*
+	 * Re-create each ComboCID through GetComboCommandId.  Because this
+	 * backend started with none, the assigned IDs must come back as
+	 * 0, 1, 2, ... in order; anything else indicates corruption.
+	 */
+	for (i = 0; i < num_elements; i++)
+	{
+		CommandId	cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
+
+		if (cid != i)
+			elog(ERROR, "unexpected command ID while restoring combo CIDs");
+	}
+}
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index d601efe..11ec825 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -155,6 +155,22 @@ static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
 
+/*
+ * Snapshot fields to be serialized.
+ *
+ * Only these fields need to be sent to the cooperating backend; the
+ * remaining ones can (and must) be set by the receiver upon restore.
+ */
+typedef struct SerializedSnapshotData
+{
+	TransactionId xmin;			/* copied from Snapshot->xmin */
+	TransactionId xmax;			/* copied from Snapshot->xmax */
+	uint32		xcnt;			/* # of XIDs appended after this struct */
+	int32		subxcnt;		/* # of subxact XIDs after the XID array;
+								 * forced to 0 by SerializeSnapshot when the
+								 * array overflowed (non-recovery case) */
+	bool		suboverflowed;	/* did the source's subxip array overflow? */
+	bool		takenDuringRecovery;	/* snapshot taken during recovery? */
+	CommandId	curcid;			/* copied from Snapshot->curcid */
+} SerializedSnapshotData;
 
 /*
  * GetTransactionSnapshot
@@ -186,6 +202,10 @@ GetTransactionSnapshot(void)
 		Assert(RegisteredSnapshots == 0);
 		Assert(FirstXactSnapshot == NULL);
 
+		if (IsInParallelMode())
+			elog(ERROR,
+				 "cannot take query snapshot during a parallel operation");
+
 		/*
 		 * In transaction-snapshot mode, the first snapshot must live until
 		 * end of xact regardless of what the caller does with it, so we must
@@ -237,6 +257,14 @@ Snapshot
 GetLatestSnapshot(void)
 {
 	/*
+	 * We might be able to relax this, but nothing that could otherwise work
+	 * needs it.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR,
+			 "cannot update SecondarySnapshot during a parallel operation");
+
+	/*
 	 * So far there are no cases requiring support for GetLatestSnapshot()
 	 * during logical decoding, but it wouldn't be hard to add if required.
 	 */
@@ -345,7 +373,8 @@ SnapshotSetCommandId(CommandId curcid)
  * in GetTransactionSnapshot.
  */
 static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
+					   PGPROC *sourceproc)
 {
 	/* Caller should have checked this already */
 	Assert(!FirstSnapshotSet);
@@ -392,7 +421,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
 	 * doesn't seem worth contorting the logic here to avoid two calls,
 	 * especially since it's not clear that predicate.c *must* do this.
 	 */
-	if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+	if (sourceproc != NULL)
+	{
+		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not import the requested snapshot"),
+			   errdetail("The source transaction is not running anymore.")));
+	}
+	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("could not import the requested snapshot"),
@@ -548,11 +585,24 @@ PushCopiedSnapshot(Snapshot snapshot)
 void
 UpdateActiveSnapshotCommandId(void)
 {
+	CommandId	save_curcid, curcid;
 	Assert(ActiveSnapshot != NULL);
 	Assert(ActiveSnapshot->as_snap->active_count == 1);
 	Assert(ActiveSnapshot->as_snap->regd_count == 0);
 
-	ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
+	/*
+	 * Don't allow modification of the active snapshot during parallel
+	 * operation.  We share the snapshot with worker backends at the beginning
+	 * of a parallel operation, so any change to the snapshot can lead to
+	 * inconsistencies.  We have other defenses against
+	 * CommandCounterIncrement, but there are a few places that call this
+	 * directly, so we put an additional guard here.
+	 */
+	save_curcid = ActiveSnapshot->as_snap->curcid;
+	curcid = GetCurrentCommandId(false);
+	if (IsInParallelMode() && save_curcid != curcid)
+		elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+	ActiveSnapshot->as_snap->curcid = curcid;
 }
 
 /*
@@ -1247,7 +1297,7 @@ ImportSnapshot(const char *idstr)
 			  errmsg("cannot import a snapshot from a different database")));
 
 	/* OK, install the snapshot */
-	SetTransactionSnapshot(&snapshot, src_xid);
+	SetTransactionSnapshot(&snapshot, src_xid, NULL);
 }
 
 /*
@@ -1350,3 +1400,159 @@ HistoricSnapshotGetTupleCids(void)
 	Assert(HistoricSnapshotActive());
 	return tuplecid_data;
 }
+
+/*
+ * EstimateSnapshotSpace
+ *		Returns the size needed to store the given snapshot.
+ *
+ * We are exporting only required fields from the Snapshot, stored in
+ * SerializedSnapshotData.
+ */
+Size
+EstimateSnapshotSpace(Snapshot snap)
+{
+	Size		needed;
+
+	Assert(snap != InvalidSnapshot);
+	Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
+
+	/* Fixed-size header, plus the in-progress XID array right after it. */
+	needed = add_size(sizeof(SerializedSnapshotData),
+					  mul_size(snap->xcnt, sizeof(TransactionId)));
+
+	/*
+	 * Reserve room for the subxact XID array only when it will actually be
+	 * serialized: it is skipped when it overflowed, except for snapshots
+	 * taken during recovery, whose subxip holds top-level XIDs as well.
+	 */
+	if (snap->subxcnt > 0 &&
+		(!snap->suboverflowed || snap->takenDuringRecovery))
+		needed = add_size(needed,
+						  mul_size(snap->subxcnt, sizeof(TransactionId)));
+
+	return needed;
+}
+
+/*
+ * SerializeSnapshot
+ * 		Dumps the serialized snapshot (extracted from given snapshot) onto the
+ * 		memory location at start_address.
+ */
+void
+SerializeSnapshot(Snapshot snapshot, Size maxsize, char *start_address)
+{
+	SerializedSnapshotData *serialized_snapshot;
+
+	/* If the size is small, throw an error */
+	if (maxsize < EstimateSnapshotSpace(snapshot))
+		elog(ERROR, "not enough space to serialize given snapshot");
+
+	Assert(snapshot->xcnt >= 0);
+	Assert(snapshot->subxcnt >= 0);
+
+	serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+	/* Copy all required fields */
+	serialized_snapshot->xmin = snapshot->xmin;
+	serialized_snapshot->xmax = snapshot->xmax;
+	serialized_snapshot->xcnt = snapshot->xcnt;
+	serialized_snapshot->subxcnt = snapshot->subxcnt;
+	serialized_snapshot->suboverflowed = snapshot->suboverflowed;
+	serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery;
+	serialized_snapshot->curcid = snapshot->curcid;
+
+	/*
+	 * Ignore the SubXID array if it has overflowed, unless the snapshot
+	 * was taken during recovery - in that case, top-level XIDs are in subxip
+	 * as well, and we mustn't lose them.
+	 */
+	if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery)
+		serialized_snapshot->subxcnt = 0;
+
+	/* Copy XID array */
+	if (snapshot->xcnt > 0)
+		memcpy((TransactionId *) (serialized_snapshot + 1),
+			   snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+
+	/*
+	 * Copy SubXID array.  Decide based on the *serialized* count, not
+	 * snapshot->subxcnt: when the count was zeroed out just above
+	 * (overflowed, and not a recovery snapshot), EstimateSnapshotSpace did
+	 * not reserve room for subxip, so copying snapshot->subxcnt entries
+	 * would overrun the caller's buffer.
+	 */
+	if (serialized_snapshot->subxcnt > 0)
+	{
+		Size subxipoff = sizeof(SerializedSnapshotData) +
+			snapshot->xcnt * sizeof(TransactionId);
+
+		memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff),
+			   snapshot->subxip,
+			   serialized_snapshot->subxcnt * sizeof(TransactionId));
+	}
+}
+
+/*
+ * RestoreSnapshot
+ *		Restore a serialized snapshot from the specified address.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0.  The returned snapshot has the copied flag set.
+ */
+Snapshot
+RestoreSnapshot(char *start_address)
+{
+	SerializedSnapshotData *serialized_snapshot;
+	Size		size;
+	Snapshot	snapshot;
+	TransactionId *serialized_xids;
+
+	serialized_snapshot = (SerializedSnapshotData *) start_address;
+	serialized_xids = (TransactionId *)
+		(start_address + sizeof(SerializedSnapshotData));
+
+	/* We allocate any XID arrays needed in the same palloc block. */
+	size = sizeof(SnapshotData)
+		+ serialized_snapshot->xcnt * sizeof(TransactionId)
+		+ serialized_snapshot->subxcnt * sizeof(TransactionId);
+
+	/* Copy all required fields */
+	snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+	snapshot->satisfies = HeapTupleSatisfiesMVCC;
+	snapshot->xmin = serialized_snapshot->xmin;
+	snapshot->xmax = serialized_snapshot->xmax;
+	snapshot->xip = NULL;
+	snapshot->xcnt = serialized_snapshot->xcnt;
+	snapshot->subxip = NULL;
+	snapshot->subxcnt = serialized_snapshot->subxcnt;
+	snapshot->suboverflowed = serialized_snapshot->suboverflowed;
+	snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery;
+	snapshot->curcid = serialized_snapshot->curcid;
+
+	/* Copy XIDs, if present. */
+	if (serialized_snapshot->xcnt > 0)
+	{
+		snapshot->xip = (TransactionId *) (snapshot + 1);
+		memcpy(snapshot->xip, serialized_xids,
+			   serialized_snapshot->xcnt * sizeof(TransactionId));
+	}
+
+	/*
+	 * Copy SubXIDs, if present.  Compute the destination from the start of
+	 * the trailing XID space, not from snapshot->xip: xip is left NULL when
+	 * xcnt == 0 (as in recovery snapshots, which keep all XIDs in subxip),
+	 * and deriving subxip from it would then memcpy through a null pointer.
+	 */
+	if (serialized_snapshot->subxcnt > 0)
+	{
+		snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
+			serialized_snapshot->xcnt;
+		memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt,
+			   serialized_snapshot->subxcnt * sizeof(TransactionId));
+	}
+
+	/* Set the copied flag so that the caller will set refcounts correctly. */
+	snapshot->regd_count = 0;
+	snapshot->active_count = 0;
+	snapshot->copied = true;
+
+	return snapshot;
+}
+
+/*
+ * Install a restored snapshot as the transaction snapshot.
+ *
+ * The second argument is of type void * so that snapmgr.h need not include
+ * the declaration for PGPROC.
+ */
+void
+RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
+{
+	/*
+	 * master_pgproc is really the initiating backend's PGPROC, passed as
+	 * void * only so that snapmgr.h need not declare PGPROC.  Passing
+	 * InvalidTransactionId makes SetTransactionSnapshot take the
+	 * source-PGPROC code path rather than the import-by-XID path.
+	 */
+	SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
new file mode 100644
index 0000000..761ba1f
--- /dev/null
+++ b/src/include/access/parallel.h
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.h
+ *	  Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PARALLEL_H
+#define PARALLEL_H
+
+#include "lib/ilist.h"
+#include "postmaster/bgworker.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "utils/elog.h"
+
+/* Signature of the function a parallel worker runs as its entry point. */
+typedef void (*parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc);
+
+/* Per-worker bookkeeping kept by the backend that launched it. */
+typedef struct ParallelWorkerInfo
+{
+	BackgroundWorkerHandle *bgwhandle;	/* handle from worker registration */
+	shm_mq_handle *error_mqh;	/* queue carrying the worker's errors/notices */
+} ParallelWorkerInfo;
+
+/* State for one parallel operation, created by CreateParallelContext. */
+typedef struct ParallelContext
+{
+	dlist_node node;			/* links contexts together; presumably a list
+								 * of live contexts, cf. ParallelContextActive */
+	SubTransactionId subid;		/* subtransaction that created the context */
+	int nworkers;				/* number of workers requested */
+	parallel_worker_main_type entrypoint;	/* function each worker runs */
+	char *library_name;			/* for the ...ForExtension variant */
+	char *function_name;		/* for the ...ForExtension variant */
+	ErrorContextCallback *error_context_stack;	/* saved error context stack */
+	shm_toc_estimator estimator;	/* space estimator for the shared TOC */
+	dsm_segment *seg;			/* dynamic shared memory segment */
+	shm_toc *toc;				/* table of contents within seg */
+	ParallelWorkerInfo *worker;	/* per-worker info; NOTE(review): appears to
+								 * be an array of nworkers entries -- confirm */
+} ParallelContext;
+
+extern bool ParallelMessagePending;
+
+extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
+extern ParallelContext *CreateParallelContextForExtension(char *library_name,
+								  char *function_name, int nworkers);
+extern void InitializeParallelDSM(ParallelContext *);
+extern void LaunchParallelWorkers(ParallelContext *);
+extern void WaitForParallelWorkersToFinish(ParallelContext *);
+extern void DestroyParallelContext(ParallelContext *);
+extern bool ParallelContextActive(void);
+
+extern void HandleParallelMessageInterrupt(bool signal_handler);
+extern void AtEOXact_Parallel(bool isCommit);
+extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
+
+#endif   /* PARALLEL_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b018aa4..91d9d73 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -77,9 +77,12 @@ extern bool MyXactAccessedTempRel;
 typedef enum
 {
 	XACT_EVENT_COMMIT,
+	XACT_EVENT_PARALLEL_COMMIT,
 	XACT_EVENT_ABORT,
+	XACT_EVENT_PARALLEL_ABORT,
 	XACT_EVENT_PREPARE,
 	XACT_EVENT_PRE_COMMIT,
+	XACT_EVENT_PARALLEL_PRE_COMMIT,
 	XACT_EVENT_PRE_PREPARE
 } XactEvent;
 
@@ -241,6 +244,10 @@ extern void BeginInternalSubTransaction(char *name);
 extern void ReleaseCurrentSubTransaction(void);
 extern void RollbackAndReleaseCurrentSubTransaction(void);
 extern bool IsSubTransaction(void);
+extern Size EstimateTransactionStateSpace(void);
+extern void SerializeTransactionState(Size maxsize, char *start_address);
+extern void StartParallelWorkerTransaction(char *tstatespace);
+extern void EndParallelWorkerTransaction(void);
 extern bool IsTransactionBlock(void);
 extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
@@ -260,4 +267,8 @@ extern void xact_redo(XLogReaderState *record);
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
 
+extern void EnterParallelMode(void);
+extern void ExitParallelMode(void);
+extern bool IsInParallelMode(void);
+
 #endif   /* XACT_H */
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index f8e68c9..78583b3 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
 
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1558a75..fb7754f 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -258,6 +258,7 @@ extern void check_stack_depth(void);
 
 /* in tcop/utility.c */
 extern void PreventCommandIfReadOnly(const char *cmdname);
+extern void PreventCommandIfParallelMode(const char *cmdname);
 extern void PreventCommandDuringRecovery(const char *cmdname);
 
 /* in utils/misc/guc.c */
@@ -290,7 +291,7 @@ extern bool InLocalUserIdChange(void);
 extern bool InSecurityRestrictedOperation(void);
 extern void GetUserIdAndContext(Oid *userid, bool *sec_def_context);
 extern void SetUserIdAndContext(Oid userid, bool sec_def_context);
-extern void InitializeSessionUserId(const char *rolename);
+extern void InitializeSessionUserId(const char *rolename, Oid useroid);
 extern void InitializeSessionUserIdStandalone(void);
 extern void SetSessionAuthorization(Oid userid, bool is_superuser);
 extern Oid	GetCurrentRoleId(void);
@@ -391,7 +392,7 @@ extern AuxProcType MyAuxProcType;
 extern void pg_split_opts(char **argv, int *argcp, char *optstr);
 extern void InitializeMaxBackends(void);
 extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
-			 char *out_dbname);
+			 Oid useroid, char *out_dbname);
 extern void BaseInit(void);
 
 /* in utils/init/miscinit.c */
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index a3b3d5f..a270da4 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -112,6 +112,8 @@ extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle,
 extern BgwHandleStatus
 WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *
 							   handle, pid_t *pid);
+extern BgwHandleStatus
+WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *);
 
 /* Terminate a bgworker */
 extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);
@@ -130,6 +132,9 @@ extern PGDLLIMPORT BackgroundWorker *MyBgworkerEntry;
  */
 extern void BackgroundWorkerInitializeConnection(char *dbname, char *username);
 
+/* Just like the above, but specifying database and user by OID. */
+extern void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid);
+
 /* Block/unblock signals in a background worker process */
 extern void BackgroundWorkerBlockSignals(void);
 extern void BackgroundWorkerUnblockSignals(void);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 0c4611b..9f4e4b9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot);
 
 extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
 							 TransactionId sourcexid);
+extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
 
 extern RunningTransactions GetRunningTransactionData(void);
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index c625562..3046e52 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -31,6 +31,7 @@ typedef enum
 {
 	PROCSIG_CATCHUP_INTERRUPT,	/* sinval catchup interrupt */
 	PROCSIG_NOTIFY_INTERRUPT,	/* listen/notify interrupt */
+	PROCSIG_PARALLEL_MESSAGE,	/* message from cooperating parallel backend */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_DATABASE,
diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h
index 9e482ff..232f729 100644
--- a/src/include/utils/combocid.h
+++ b/src/include/utils/combocid.h
@@ -21,5 +21,8 @@
  */
 
 extern void AtEOXact_ComboCid(void);
+extern void RestoreComboCIDState(char *comboCIDstate);
+extern void SerializeComboCIDState(Size maxsize, char *start_address);
+extern Size EstimateComboCIDStateSpace(void);
 
 #endif   /* COMBOCID_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index abe7016..7efd427 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -64,4 +64,10 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids)
 extern void TeardownHistoricSnapshot(bool is_error);
 extern bool HistoricSnapshotActive(void);
 
+extern Size EstimateSnapshotSpace(Snapshot snapshot);
+extern void SerializeSnapshot(Snapshot snapshot, Size maxsize,
+							  char *start_address);
+extern Snapshot RestoreSnapshot(char *start_address);
+extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc);
+
 #endif   /* SNAPMGR_H */
