diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644
index 0000000..de00f50
--- /dev/null
+++ b/contrib/parallel_dummy/Makefile
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644
index 0000000..2a7251c
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_dummy(sleep_time pg_catalog.int4,
+							  nworkers pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644
index 0000000..99b830f
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.c
@@ -0,0 +1,82 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ *		Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/parallel.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_dummy);
+
+#define		PARALLEL_DUMMY_KEY			1
+
+typedef struct {
+	int32		sleep_time;
+} ParallelDummyInfo;
+
+void		_PG_init(void);
+void		worker_main(shm_toc *toc);
+
+Datum
+parallel_dummy(PG_FUNCTION_ARGS)
+{
+	int32		sleep_time = PG_GETARG_INT32(0);
+	int32		nworkers = PG_GETARG_INT32(1);
+	bool		already_in_parallel_mode = IsInParallelMode();
+	ParallelContext *pcxt;
+	ParallelDummyInfo *info;
+
+	if (nworkers < 1)
+		ereport(ERROR,
+				(errmsg("number of parallel workers must be positive")));
+
+	if (!already_in_parallel_mode)
+		EnterParallelMode();
+
+	pcxt = CreateParallelContextForExtension("parallel_dummy", "worker_main",
+											 nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelDummyInfo));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	InitializeParallelDSM(pcxt);
+	info = shm_toc_allocate(pcxt->toc, sizeof(ParallelDummyInfo));
+	info->sleep_time = sleep_time;
+	shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+	LaunchParallelWorkers(pcxt);
+
+	/* here's where we do the "real work" ... */
+	DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time));
+
+	DestroyParallelContext(pcxt);
+
+	if (!already_in_parallel_mode)
+		ExitParallelMode();
+
+	PG_RETURN_VOID();
+}
+
+void
+worker_main(shm_toc *toc)
+{
+	ParallelDummyInfo *info;
+
+	info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+	Assert(info != NULL);
+
+	/* here's where we do the "real work" ... */
+	DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time));
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644
index 0000000..90bae3f
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.control
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index df4853b..31efa37 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2636,6 +2636,16 @@ heap_delete(Relation relation, ItemPointer tid,
 
 	Assert(ItemPointerIsValid(tid));
 
+	/*
+	 * Forbid this during a parallel operation, lest it allocate a combocid.
+	 * Other workers might need that combocid for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot delete tuples during a parallel operation")));
+
 	block = ItemPointerGetBlockNumber(tid);
 	buffer = ReadBuffer(relation, block);
 	page = BufferGetPage(buffer);
@@ -3077,6 +3087,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	Assert(ItemPointerIsValid(otid));
 
 	/*
+	 * Forbid this during a parallel operation, lest it allocate a combocid.
+	 * Other workers might need that combocid for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot update tuples during a parallel operation")));
+
+	/*
 	 * Fetch the list of attributes to be checked for HOT update.  This is
 	 * wasted effort if we fail to update or have to put the new tuple on a
 	 * different page.  But we must compute the list before obtaining buffer
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 9d4d5db..94455b2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
+OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
 	xloginsert.o xlogreader.o xlogutils.o
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
new file mode 100644
index 0000000..e427275
--- /dev/null
+++ b/src/backend/access/transam/README.parallel
@@ -0,0 +1,36 @@
+Overview
+========
+
+Before beginning any parallel operation, call EnterParallelMode(); after all
+parallel operations are completed, call ExitParallelMode().  These functions
+don't launch any workers or directly enable parallelism, but they put in place
+a variety of prohibitions required to make parallelism safe.
+
+To actually parallelize a particular operation, use a ParallelContext.  This
+establishes a dynamic shared memory segment and registers dynamic background
+workers which will attach to that segment.  We arrange to synchronize various
+pieces of state - such as, most simply, the database and user OIDs - from the
+backend that is initiating parallelism to all of the background workers
+launched via a ParallelContext.  The basic coding pattern looks like this:
+
+	EnterParallelMode();		/* prohibit unsafe state changes */
+
+	pcxt = CreateParallelContext(entrypoint, nworkers);
+
+	/* Allow space for application-specific data here. */
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	InitializeParallelDSM(pcxt);	/* create DSM and copy state to it */
+
+	/* Store the data for which we reserved space. */
+	space = shm_toc_allocate(pcxt->toc, size);
+	shm_toc_insert(pcxt->toc, key, space);
+
+	LaunchParallelWorkers(pcxt);
+
+	/* do parallel stuff */
+
+	DestroyParallelContext(pcxt);
+
+	ExitParallelMode();
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
new file mode 100644
index 0000000..eeae591
--- /dev/null
+++ b/src/backend/access/transam/parallel.c
@@ -0,0 +1,787 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ *	  Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/parallel.h"
+#include "commands/async.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "storage/spin.h"
+#include "utils/combocid.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/snapmgr.h"
+
+/*
+ * We don't want to waste a lot of memory on an error queue which, most of
+ * the time, will process only a handful of small messages.  However, it is
+ * desirable to make it large enough that a typical ErrorResponse can be sent
+ * without blocking.  That way, a worker that errors out can write the whole
+ * message into the queue and terminate without waiting for the user backend.
+ */
+#define	PARALLEL_ERROR_QUEUE_SIZE			16384
+
+/* Magic number for parallel context TOC. */
+#define PARALLEL_MAGIC						0x50477c7c
+
+/*
+ * Magic numbers for parallel state sharing.  Higher-level code should use
+ * smaller values, leaving these very large ones for use by this module.
+ */
+#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
+#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
+#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0003)
+#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0004)
+#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0005)
+#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0006)
+#define PARALLEL_KEY_EXTENSION_TRAMPOLINE	UINT64CONST(0xFFFFFFFFFFFF0007)
+
+/* Fixed-size parallel state. */
+typedef struct FixedParallelState
+{
+	/* Fixed-size state that workers must restore. */
+	Oid			database_id;
+	Oid			authenticated_user_id;
+	Oid			current_user_id;
+	int			sec_context;
+	PGPROC	   *parallel_master_pgproc;
+	pid_t		parallel_master_pid;
+	BackendId	parallel_master_backend_id;
+
+	/* Entrypoint for parallel workers. */
+	parallel_worker_main_type	entrypoint;
+
+	/* Track whether workers have attached. */
+	slock_t		mutex;
+	int			workers_expected;
+	int			workers_attached;
+} FixedParallelState;
+
+/*
+ * Our parallel worker number.  We initialize this to -1, meaning that we are
+ * not a parallel worker.  In parallel workers, it will be set to a value >= 0
+ * and < the number of workers before any user code is invoked; each parallel
+ * worker will get a different parallel worker number.
+ */
+int ParallelWorkerNumber = -1;
+
+/* Is there a parallel message pending which we need to receive? */
+bool ParallelMessagePending = false;
+
+/* List of active parallel contexts. */
+static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
+
+/* Private functions. */
+static void HandleParallelMessages(void);
+static void HandleParallelMessage(StringInfo msg);
+static void ParallelMain(Datum main_arg);
+static void ParallelExtensionTrampoline(shm_toc *toc);
+static void handle_sigterm(SIGNAL_ARGS);
+
+/*
+ * Establish a new parallel context.  This should be done after entering
+ * parallel mode, and (unless there is an error) the context should be
+ * destroyed before exiting the current subtransaction.
+ */
+ParallelContext *
+CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
+{
+	MemoryContext	oldcontext;
+	ParallelContext	*pcxt;
+
+	/* It is unsafe to create a parallel context if not in parallel mode. */
+	Assert(IsInParallelMode());
+
+	/* Number of workers should be positive. */
+	Assert(nworkers > 0);
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Initialize a new ParallelContext. */
+	pcxt = palloc0(sizeof(ParallelContext));
+	pcxt->subid = GetCurrentSubTransactionId();
+	pcxt->nworkers = nworkers;
+	pcxt->entrypoint = entrypoint;
+	shm_toc_initialize_estimator(&pcxt->estimator);
+	dlist_push_head(&pcxt_list, &pcxt->node);
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+
+	return pcxt;
+}
+
+/*
+ * Establish a new parallel context that calls a function provided by an
+ * extension.  This works around the fact that the library might get mapped
+ * at a different address in each backend.
+ */
+ParallelContext *
+CreateParallelContextForExtension(char *library_name, char *function_name,
+								  int nworkers)
+{
+	MemoryContext	oldcontext;
+	ParallelContext *pcxt;
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Create the context. */
+	pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
+	pcxt->library_name = pstrdup(library_name);
+	pcxt->function_name = pstrdup(function_name);
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+
+	return pcxt;
+}
+
+/*
+ * Establish the dynamic shared memory segment for a parallel context and
+ * copied state and other bookkeeping information that will need by parallel
+ * workers into it.
+ */
+void
+InitializeParallelDSM(ParallelContext *pcxt)
+{
+	MemoryContext	oldcontext;
+	Size	guc_len;
+	Size	combocidlen;
+	Size	tsnaplen;
+	Size	asnaplen;
+	Size	segsize;
+	int		i;
+	FixedParallelState *fps;
+	char   *gucspace;
+	char   *combocidspace;
+	char   *tsnapspace;
+	char   *asnapspace;
+	char   *error_queue_space;
+	Snapshot	transaction_snapshot = GetTransactionSnapshot();
+	Snapshot	active_snapshot = GetActiveSnapshot();
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Allocate space for worker information. */
+	pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
+
+	/*
+	 * Estimate how much space we'll need for state sharing.
+	 *
+	 * If you add more chunks here, you probably need more keys, too.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
+	combocidlen = EstimateComboCIDStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
+	tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+	asnaplen = EstimateSnapshotSpace(active_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
+	shm_toc_estimate_keys(&pcxt->estimator, 5);
+
+	/* Estimate how much space we'll need for error queues. */
+	StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
+		PARALLEL_ERROR_QUEUE_SIZE,
+		"parallel error queue size not buffer-aligned");
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate how much we'll need for extension entrypoint information. */
+	if (pcxt->library_name != NULL)
+	{
+		Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
+		Assert(pcxt->function_name != NULL);
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+							   + strlen(pcxt->function_name) + 2);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	/* Create DSM and initialize with new table of contents. */
+	segsize = shm_toc_estimate(&pcxt->estimator);
+	pcxt->seg = dsm_create(segsize);
+	pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
+							   dsm_segment_address(pcxt->seg),
+							   segsize);
+
+	/* Initialize fixed-size state in shared memory. */
+	fps = (FixedParallelState *)
+		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
+	fps->database_id = MyDatabaseId;
+	fps->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
+	fps->parallel_master_pgproc = MyProc;
+	fps->parallel_master_pid = MyProcPid;
+	fps->parallel_master_backend_id = MyBackendId;
+	fps->entrypoint = pcxt->entrypoint;
+	SpinLockInit(&fps->mutex);
+	fps->workers_expected = pcxt->nworkers;
+	fps->workers_attached = 0;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
+
+	/* Serialize GUC state to dynamic shared memory. */
+	gucspace = shm_toc_allocate(pcxt->toc, guc_len);
+	SerializeGUCState(guc_len, gucspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
+
+	/* Serialize combo CID state to dynamic shared memory. */
+	combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
+	SerializeComboCIDState(combocidlen, combocidspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
+
+	/* Serialize transaction snapshots to dynamic shared memory. */
+	tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+	SerializeSnapshot(transaction_snapshot, tsnaplen, tsnapspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, tsnapspace);
+	asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
+	SerializeSnapshot(active_snapshot, asnaplen, asnapspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+
+	/*
+	 * Establish error queues in dynamic shared memory.
+	 *
+	 * These queues should be used only for transmitting ErrorResponse,
+	 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data should
+	 * be transmitted via separate (possibly larger?) queue.
+	 */
+	error_queue_space =
+	   shm_toc_allocate(pcxt->toc, PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		shm_mq *mq;
+
+		mq = shm_mq_create(error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE,
+						   PARALLEL_ERROR_QUEUE_SIZE);
+		shm_mq_set_receiver(mq, MyProc);
+		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
+
+	/* Serialize extension entrypoint information to dynamic shared memory. */
+	if (pcxt->library_name != NULL)
+	{
+		Size	lnamelen = strlen(pcxt->library_name);
+		char *extensionstate;
+
+		extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
+										  + strlen(pcxt->function_name) + 2);
+		strcpy(extensionstate, pcxt->library_name);
+		strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
+					   extensionstate);
+	}
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Launch parallel workers.
+ */
+void
+LaunchParallelWorkers(ParallelContext *pcxt)
+{
+	MemoryContext	oldcontext;
+	BackgroundWorker	worker;
+	int		i;
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Configure a worker. */
+	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
+			 MyProcPid);
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = ParallelMain;
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	/*
+	 * Start workers.
+	 *
+	 * The caller must be able to tolerate ending up with fewer workers than
+	 * expected, so there is no need to throw an error here if registration
+	 * fails.  It wouldn't help much anyway, because registering the worker
+	 * in no way guarantees that it will start up and initialize successfully.
+	 * We do, however, give up on registering any more workers once
+	 * registration fails the first time; no sense beating our head against
+	 * a brick wall.
+	 */
+	for (i = 0; i < pcxt->nworkers; ++i)
+		if (!RegisterDynamicBackgroundWorker(&worker,
+											 &pcxt->worker[i].bgwhandle))
+			break;
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Destroy a parallel context.
+ */
+void
+DestroyParallelContext(ParallelContext *pcxt)
+{
+	int		i;
+
+	/*
+	 * Be careful about order of operations here!  We remove the parallel
+	 * context from the list before we do anything else; otherwise, if an
+	 * error occurs during a subsequent step, we might try to nuke it again
+	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
+	 */
+	dlist_delete(&pcxt->node);
+
+	/*
+	 * If any background workers have been started, terminate them all.
+	 * To avoid leaking memory, release the background worker and the message
+	 * queue handle.  This doesn't actually detach the message queue; we'll
+	 * take care of that by detaching the dynamic shared memory segment itself.
+	 */
+	if (pcxt->worker != NULL)
+	{
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			if (pcxt->worker[i].bgwhandle != NULL)
+			{
+				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+				pfree(pcxt->worker[i].bgwhandle);
+			}
+			pfree(pcxt->worker[i].error_mqh);
+		}
+	}
+
+	/*
+	 * If we have allocated a shared memory segment, detach it.  This will
+	 * implicitly detach the error queues, and any other shared memory queues,
+	 * stored there.
+	 */
+	if (pcxt->seg != NULL)
+		dsm_detach(pcxt->seg);
+
+	/* Free memory. */
+	pfree(pcxt->worker);
+	pfree(pcxt);
+}
+
+/*
+ * Are there any parallel contexts currently active?
+ */
+bool
+ParallelContextActive(void)
+{
+	return !dlist_is_empty(&pcxt_list);
+}
+
+/*
+ * Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * If signal_handler is true, we are being called from a signal handler and must
+ * be extremely cautious about what we do here!
+ */
+void
+HandleParallelMessageInterrupt(bool signal_handler)
+{
+	int			save_errno = errno;
+
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ParallelMessagePending = true;
+
+		/*
+		 * If it's safe to interrupt, service the interrupt immediately.
+		 * (We shouldn't be in parallel mode if waiting for the user to send
+		 * a new query, but we could be waiting for a lock.)
+		 */
+		if ((ImmediateInterruptOK || !signal_handler) && InterruptHoldoffCount == 0
+			&& CritSectionCount == 0)
+		{
+			bool notify_enabled;
+			bool catchup_enabled;
+			bool save_ImmediateInterruptOK;
+
+			/*
+			 * Disable everything that might recursively interrupt us.
+			 *
+			 * If there were any possibility that disabling and re-enabling
+			 * interrupts or handling parallel messages might take a lock, we'd
+			 * need to HOLD_INTERRUPTS() as well, since taking a lock might
+			 * cause ImmediateInterruptOK to get temporarily reset to true.
+			 * But that shouldn't happen, so this is (hopefully) safe.  That's
+			 * good, because it lets us respond to query cancel and die
+			 * interrupts while we're in the midst of message-processing.
+			 */
+			save_ImmediateInterruptOK = ImmediateInterruptOK;
+			ImmediateInterruptOK = false;
+			notify_enabled = DisableNotifyInterrupt();
+			catchup_enabled = DisableCatchupInterrupt();
+
+			/* OK, do the work... */
+			HandleParallelMessages();
+
+			/* Now re-enable whatever was enabled before */
+			if (catchup_enabled)
+				EnableCatchupInterrupt();
+			if (notify_enabled)
+				EnableNotifyInterrupt();
+			ImmediateInterruptOK = save_ImmediateInterruptOK;
+		}
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Handle any queued protocol messages received from parallel workers.
+ */
+static void
+HandleParallelMessages(void)
+{
+	dlist_iter	iter;
+
+	ParallelMessagePending = false;
+
+	dlist_foreach(iter, &pcxt_list)
+	{
+		ParallelContext *pcxt;
+		int		i;
+		Size	nbytes;
+		void   *data;
+
+		pcxt = dlist_container(ParallelContext, node, iter.cur);
+		if (pcxt->worker == NULL)
+			continue;
+
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			if (pcxt->worker[i].error_mqh == NULL)
+				continue;
+
+			for (;;)
+			{
+				shm_mq_result	res;
+
+				CHECK_FOR_INTERRUPTS();
+
+				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
+									 &data, true);
+				if (res == SHM_MQ_SUCCESS)
+				{
+					StringInfoData	msg;
+
+					initStringInfo(&msg);
+					appendBinaryStringInfo(&msg, data, nbytes);
+					HandleParallelMessage(&msg);
+					pfree(msg.data);
+				}
+				if (res == SHM_MQ_DETACHED)
+					ereport(ERROR,
+							(errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
+							 errmsg("lost connection to parallel worker")));
+			}
+		}
+	}
+}
+
+/*
+ * Handle a single protocol message received from a single parallel worker.
+ */
+static void
+HandleParallelMessage(StringInfo msg)
+{
+	char	msgtype;
+
+	msgtype = pq_getmsgbyte(msg);
+
+	switch (msgtype)
+	{
+		case 'E':
+		case 'N':
+			{
+				ErrorData	edata;
+
+				/* Parse ErrorReponse or NoticeResponse. */
+				pq_parse_errornotice(msg, &edata);
+
+				/* Death of a worker is insufficient justification for suicide. */
+				edata.elevel = Min(edata.elevel, ERROR);
+
+				/*
+				 * XXX.  We should probably use the error context callbacks in
+				 * effect at the time the parallel context was created.
+				 */
+
+				ThrowErrorData(&edata);
+
+				break;
+			}
+
+		case 'A':
+			{
+				/* Propagate NotifyResponse. */
+				pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+				break;
+			}
+
+		default:
+			{
+				elog(ERROR, "unknown message type: %c (%d bytes)",
+					 msgtype, msg->len);
+			}
+	}
+}
+
+/*
+ * End-of-subtransaction cleanup for parallel contexts.
+ *
+ * Currently, it's forbidden to enter or leave a subtransaction while
+ * parallel mode is in effect, so we could just blow away everything.  But
+ * we may want to relax that restriction in the future, so this code
+ * contemplates that there may be multiple subtransaction IDs in pcxt_list.
+ */
+void
+AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
+{
+	while (!dlist_is_empty(&pcxt_list))
+	{
+		ParallelContext *pcxt;
+
+		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+		if (pcxt->subid != mySubId)
+			break;
+		if (isCommit)
+			elog(WARNING, "leaked parallel context");
+		DestroyParallelContext(pcxt);
+	}
+}
+
+/*
+ * End-of-transaction cleanup for parallel contexts.
+ */
+void
+AtEOXact_Parallel(bool isCommit)
+{
+	while (!dlist_is_empty(&pcxt_list))
+	{
+		ParallelContext *pcxt;
+
+		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+		if (isCommit)
+			elog(WARNING, "leaked parallel context");
+		DestroyParallelContext(pcxt);
+	}
+}
+
+/*
+ * Main entrypoint for parallel workers.
+ */
+static void
+ParallelMain(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc *toc;
+	FixedParallelState *fps;
+	char   *error_queue_space;
+	shm_mq *mq;
+	shm_mq_handle *mqh;
+	char   *gucspace;
+	char   *tsnapspace;
+	char   *asnapspace;
+	char   *combocidspace;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "parallel worker",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * Now that we have a resource owner, we can attach to the dynamic
+	 * shared memory segment and read the table of contents.
+	 */
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Determine and set our worker number. */
+	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
+	Assert(fps != NULL);
+	Assert(ParallelWorkerNumber == -1);
+	SpinLockAcquire(&fps->mutex);
+	if (fps->workers_attached < fps->workers_expected)
+		ParallelWorkerNumber = fps->workers_attached++;
+	SpinLockRelease(&fps->mutex);
+	if (ParallelWorkerNumber < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("too many parallel workers already attached")));
+
+	/*
+	 * Now that we have a worker number, we can find and attach to the error
+	 * queue provided for us.  That's good, because until we do that, any
+	 * errors that happen here will not be reported back to the process that
+	 * requested that this worker be launched.
+	 */
+	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
+	mq = (shm_mq *) (error_queue_space +
+		ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
+	shm_mq_set_sender(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+	pq_redirect_to_shm_mq(mq, mqh);
+	pq_set_parallel_master(fps->parallel_master_pid,
+						   fps->parallel_master_backend_id);
+
+	/*
+	 * Hooray! Primary initialization is complete.  Now, we need to set up
+	 * our backend-local state to match the original backend.
+	 */
+
+	/* Restore database connection. */
+	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
+											  fps->authenticated_user_id);
+
+	/* Restore GUC values from launching backend. */
+	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
+	Assert(gucspace != NULL);
+	StartTransactionCommand();
+	RestoreGUCState(gucspace);
+	CommitTransactionCommand();
+
+	/* Handle local_preload_libraries and session_preload_libraries. */
+	process_session_preload_libraries();
+
+	/*
+	 * XXX.  At this point, we should restore the transaction state as it
+	 * exists in the master.  But since we don't have code for that yet, just
+	 * start a new transaction.
+	 */
+	StartTransactionCommand();
+
+	/* Restore combo CID state. */
+	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
+	Assert(combocidspace != NULL);
+	RestoreComboCIDState(combocidspace);
+
+	/* Restore transaction snapshot. */
+	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
+	Assert(tsnapspace != NULL);
+	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
+							   fps->parallel_master_pgproc);
+
+	/* Restore active snapshot. */
+	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
+	Assert(asnapspace != NULL);
+	PushActiveSnapshot(RestoreSnapshot(asnapspace));
+
+	/* Restore user ID and security context. */
+	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
+
+	/*
+	 * We've initialized all of our state now; nothing should change hereafter.
+	 */
+	EnterParallelMode();
+
+	/*
+	 * Time to do the real work: invoke the caller-supplied code.
+	 *
+	 * Note that it's unsafe for this entrypoint to be a function living in a
+	 * dynamically loaded module, because it might not be loaded at the same
+	 * address in every process.  Use CreateParallelContextForExtension()
+	 * rather than CreateParallelContext() to handle that case.
+	 */
+	fps->entrypoint(toc);
+
+	/*
+	 * XXX. There's probably some end-of-parallel-phase cleanup that needs to
+	 * happen here; we shouldn't just exit in the midde of a transaction.
+	 * But I don't know what's needed yet.
+	 */
+}
+
+/*
+ * In-core trampoline to invoke extension entrypints.
+ *
+ * See comments in ParallelMain() and StartBackgroundWorker() for why we
+ * need this.
+ */
+static void
+ParallelExtensionTrampoline(shm_toc *toc)
+{
+	char   *extensionstate;
+	char   *library_name;
+	char   *function_name;
+	parallel_worker_main_type entrypt;
+
+	extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
+	Assert(extensionstate != NULL);
+	library_name = extensionstate;
+	function_name = extensionstate + strlen(library_name) + 1;
+
+	entrypt = (parallel_worker_main_type)
+		load_external_function(library_name, function_name, true, NULL);
+	entrypt(toc);
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int		save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index c541156..92f657e 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -50,6 +50,13 @@ GetNewTransactionId(bool isSubXact)
 	TransactionId xid;
 
 	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new XIDs after that point.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+
+	/*
 	 * During bootstrap initialization, we return the special bootstrap
 	 * transaction id.
 	 */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8b2f714..2fa2c11 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -22,6 +22,7 @@
 
 #include "access/commit_ts.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -152,6 +153,7 @@ typedef struct TransactionStateData
 	bool		prevXactReadOnly;		/* entry-time xact r/o state */
 	bool		startedInRecovery;		/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
+	bool		parallelMode;	/* current transaction in parallel operation? */
 	struct TransactionStateData *parent;		/* back link to parent */
 } TransactionStateData;
 
@@ -182,6 +184,7 @@ static TransactionStateData TopTransactionStateData = {
 	false,						/* entry-time xact r/o state */
 	false,						/* startedInRecovery */
 	false,						/* didLogXid */
+	false,						/* parallelMode */
 	NULL						/* link to parent state block */
 };
 
@@ -642,7 +645,16 @@ GetCurrentCommandId(bool used)
 {
 	/* this is global to a transaction, not subtransaction-local */
 	if (used)
+	{
+		/*
+		 * Forbid setting currentCommandIdUsed in parallel mode, because we
+		 * have no provision for communicating this back to the master.  We
+		 * could relax this restriction when currentCommandIdUsed was already
+		 * true at the start of the parallel operation.
+		 */
+		Assert(!CurrentTransactionState->parallelMode);
 		currentCommandIdUsed = true;
+	}
 	return currentCommandId;
 }
 
@@ -789,6 +801,53 @@ TransactionStartedDuringRecovery(void)
 }
 
 /*
+ *	EnterParallelMode
+ */
+void
+EnterParallelMode(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	/*
+	 * Workers synchronize transaction state at the beginning of each
+	 * parallel operation, so we can't let the transaction state be changed
+	 * after that point.  That includes the parallel mode flag itself.
+	 */
+	Assert(!s->parallelMode);
+
+	s->parallelMode = true;
+}
+
+/*
+ *	ExitParallelMode
+ */
+void
+ExitParallelMode(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	Assert(s->parallelMode);
+	Assert(!ParallelContextActive());
+
+	s->parallelMode = false;
+}
+
+/*
+ *	IsInParallelMode
+ *
+ * Are we in a parallel operation, as either the master or a worker?  Check
+ * this to prohibit operations that change backend-local state expected to
+ * match across all workers.  Mere caches usually don't require such a
+ * restriction.  State modified in a strict push/pop fashion, such as the
+ * active snapshot stack, is often fine.
+ */
+bool
+IsInParallelMode(void)
+{
+	return CurrentTransactionState->parallelMode;
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
@@ -802,6 +861,14 @@ CommandCounterIncrement(void)
 	 */
 	if (currentCommandIdUsed)
 	{
+		/*
+		 * Workers synchronize transaction state at the beginning of each
+		 * parallel operation, so we can't account for new commands after that
+		 * point.
+		 */
+		if (IsInParallelMode())
+			elog(ERROR, "cannot start commands during a parallel operation");
+
 		currentCommandId += 1;
 		if (currentCommandId == InvalidCommandId)
 		{
@@ -1705,6 +1772,8 @@ StartTransaction(void)
 	s = &TopTransactionStateData;
 	CurrentTransactionState = s;
 
+	Assert(!IsInParallelMode());
+
 	/*
 	 * check the current transaction state
 	 */
@@ -1835,6 +1904,8 @@ CommitTransaction(void)
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
 
+	Assert(!IsInParallelMode());
+
 	ShowTransactionState("CommitTransaction");
 
 	/*
@@ -1921,6 +1992,13 @@ CommitTransaction(void)
 
 	TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOXact_Parallel(true);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * Let others know about no transaction in progress by me. Note that this
 	 * must be done _before_ releasing locks we hold and _after_
@@ -2040,6 +2118,8 @@ PrepareTransaction(void)
 	GlobalTransaction gxact;
 	TimestampTz prepared_at;
 
+	Assert(!IsInParallelMode());
+
 	ShowTransactionState("PrepareTransaction");
 
 	/*
@@ -2391,6 +2471,13 @@ AbortTransaction(void)
 
 	TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOXact_Parallel(false);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * Let others know about no transaction in progress by me. Note that this
 	 * must be done _before_ releasing locks we hold and _after_
@@ -3540,6 +3627,16 @@ DefineSavepoint(char *name)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new subtransactions after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot define savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 		case TBLOCK_INPROGRESS:
@@ -3594,6 +3691,16 @@ ReleaseSavepoint(List *options)
 	ListCell   *cell;
 	char	   *name = NULL;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for transaction state change after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot release savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 			/*
@@ -3694,6 +3801,16 @@ RollbackToSavepoint(List *options)
 	ListCell   *cell;
 	char	   *name = NULL;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for transaction state change after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot rollback to savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 			/*
@@ -3806,6 +3923,16 @@ BeginInternalSubTransaction(char *name)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new subtransactions after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot start subtransactions during a parallel operation")));
+
 	switch (s->blockState)
 	{
 		case TBLOCK_STARTED:
@@ -3860,6 +3987,18 @@ ReleaseCurrentSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for commit of subtransactions after that
+	 * point.  This should not happen anyway.  Code calling this would
+	 * typically have called BeginInternalSubTransaction() first, failing
+	 * there.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot commit subtransactions during a parallel operation")));
+
 	if (s->blockState != TBLOCK_SUBINPROGRESS)
 		elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
 			 BlockStateAsString(s->blockState));
@@ -3882,6 +4021,14 @@ RollbackAndReleaseCurrentSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted
+	 * during parallel operations.  That's because we may be in the master,
+	 * recovering from an error thrown while we were in parallel mode.  We
+	 * won't reach here in a worker, because BeginInternalSubTransaction()
+	 * will have failed.
+	 */
+
 	switch (s->blockState)
 	{
 			/* Must be in a subtransaction */
@@ -4157,6 +4304,13 @@ CommitSubTransaction(void)
 		elog(WARNING, "CommitSubTransaction while in %s state",
 			 TransStateAsString(s->state));
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOSubXact_Parallel(true, s->subTransactionId);
+		s->parallelMode = false;
+	}
+
 	/* Pre-commit processing goes here */
 
 	CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId,
@@ -4315,6 +4469,13 @@ AbortSubTransaction(void)
 	 */
 	SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOSubXact_Parallel(false, s->subTransactionId);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * We can skip all this stuff if the subxact failed before creating a
 	 * ResourceOwner...
@@ -4455,6 +4616,7 @@ PushTransaction(void)
 	s->blockState = TBLOCK_SUBBEGIN;
 	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
 	s->prevXactReadOnly = XactReadOnly;
+	s->parallelMode = false;
 
 	CurrentTransactionState = s;
 
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 4a542e6..04ecbbb 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -476,7 +476,7 @@ BootstrapModeMain(void)
 	 */
 	InitProcess();
 
-	InitPostgres(NULL, InvalidOid, NULL, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
 
 	/* Initialize stuff for bootstrap-file processing */
 	for (i = 0; i < MAXATTR; i++)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 08abe14..4790df5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -914,9 +914,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
 	{
 		Assert(rel);
 
-		/* check read-only transaction */
+		/* check read-only transaction and parallel mode */
 		if (XactReadOnly && !rel->rd_islocaltemp)
 			PreventCommandIfReadOnly("COPY FROM");
+		PreventCommandIfParallelMode("COPY FROM");
 
 		cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
 							   stmt->attlist, stmt->options);
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index ba5b938..0b422dc 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -551,6 +551,13 @@ nextval_internal(Oid relid)
 	if (!seqrel->rd_islocaltemp)
 		PreventCommandIfReadOnly("nextval()");
 
+	/*
+	 * Forbid this during parallel operation because, to make it work,
+	 * the cooperating backends would need to share the backend-local cached
+	 * sequence information.  Currently, we don't support that.
+	 */
+	PreventCommandIfParallelMode("nextval()");
+
 	if (elm->last != elm->cached)		/* some numbers were cached */
 	{
 		Assert(elm->last_valid);
@@ -838,6 +845,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
 	if (!seqrel->rd_islocaltemp)
 		PreventCommandIfReadOnly("setval()");
 
+	/*
+	 * Forbid this during parallel operation because, to make it work,
+	 * the cooperating backends would need to share the backend-local cached
+	 * sequence information.  Currently, we don't support that.
+	 */
+	PreventCommandIfParallelMode("setval()");
+
 	/* lock page' buffer and read tuple */
 	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 8c799d3..9223f5e 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -135,8 +135,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	/*
 	 * If the transaction is read-only, we need to check if any writes are
 	 * planned to non-temporary tables.  EXPLAIN is considered read-only.
+	 *
+	 * Don't allow writes in parallel mode.  Supporting UPDATE and DELETE would
+	 * require (a) storing the combocid hash in shared memory, rather than
+	 * synchronizing it just once at the start of parallelism, and (b) an
+	 * alternative to heap_update()'s reliance on xmax for mutual exclusion.
+	 * INSERT may have no such troubles, but we forbid it to simplify the
+	 * checks.
+	 *
+	 * We have lower-level defenses in CommandCounterIncrement and elsewhere
+	 * against performing unsafe operations in parallel mode, but this gives
+	 * a more user-friendly error message.
 	 */
-	if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
+	if ((XactReadOnly || IsInParallelMode()) &&
+		!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
 		ExecCheckXactReadOnly(queryDesc->plannedstmt);
 
 	/*
@@ -679,18 +691,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte)
 }
 
 /*
- * Check that the query does not imply any writes to non-temp tables.
+ * Check that the query does not imply any writes to non-temp tables;
+ * unless we're in parallel mode, in which case don't even allow writes
+ * to temp tables.
  *
  * Note: in a Hot Standby slave this would need to reject writes to temp
- * tables as well; but an HS slave can't have created any temp tables
- * in the first place, so no need to check that.
+ * tables just as we do in parallel mode; but an HS slave can't have created
+ * any temp tables in the first place, so no need to check that.
  */
 static void
 ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 {
 	ListCell   *l;
 
-	/* Fail if write permissions are requested on any non-temp table */
+	/*
+	 * Fail if write permissions are requested in parallel mode for
+	 * table (temp or non-temp), otherwise fail for any non-temp table.
+	 */
 	foreach(l, plannedstmt->rtable)
 	{
 		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
@@ -701,6 +718,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 		if ((rte->requiredPerms & (~ACL_SELECT)) == 0)
 			continue;
 
+		PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt));
+
 		if (isTempNamespace(get_rel_namespace(rte->relid)))
 			continue;
 
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 4d11260..62b615a 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list,
 					   errmsg("%s is not allowed in a non-volatile function",
 							  CreateCommandTag(stmt))));
 
+			if (IsInParallelMode() && !CommandIsReadOnly(stmt))
+				PreventCommandIfParallelMode(CreateCommandTag(stmt));
+
 			/* OK, build the execution_state for this query */
 			newes = (execution_state *) palloc(sizeof(execution_state));
 			if (preves)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index cfa4a24..72d55c7 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -23,6 +23,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/spi_priv.h"
+#include "miscadmin.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 	}
 
 	/*
-	 * If told to be read-only, we'd better check for read-only queries. This
-	 * can't be done earlier because we need to look at the finished, planned
-	 * queries.  (In particular, we don't want to do it between GetCachedPlan
-	 * and PortalDefineQuery, because throwing an error between those steps
-	 * would result in leaking our plancache refcount.)
+	 * If told to be read-only, or in parallel mode, verify that this query
+	 * is in fact read-only.  This can't be done earlier because we need to
+	 * look at the finished, planned queries.  (In particular, we don't want
+	 * to do it between GetCachedPlan and PortalDefineQuery, because throwing
+	 * an error between those steps would result in leaking our plancache
+	 * refcount.)
 	 */
-	if (read_only)
+	if (read_only || IsInParallelMode())
 	{
 		ListCell   *lc;
 
@@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 			Node	   *pstmt = (Node *) lfirst(lc);
 
 			if (!CommandIsReadOnly(pstmt))
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				/* translator: %s is a SQL statement name */
-					   errmsg("%s is not allowed in a non-volatile function",
-							  CreateCommandTag(pstmt))));
+			{
+				if (read_only)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					/* translator: %s is a SQL statement name */
+							 errmsg("%s is not allowed in a non-volatile function",
+									CreateCommandTag(pstmt))));
+				else
+					PreventCommandIfParallelMode(CreateCommandTag(pstmt));
+			}
 		}
 	}
 
@@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 					   errmsg("%s is not allowed in a non-volatile function",
 							  CreateCommandTag(stmt))));
 
+			if (IsInParallelMode() && !CommandIsReadOnly(stmt))
+				PreventCommandIfParallelMode(CreateCommandTag(stmt));
+
 			/*
 			 * If not read-only mode, advance the command counter before each
 			 * command and update the snapshot.
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 6e6b429..b07978c 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -16,12 +16,15 @@
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
+#include "miscadmin.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 
 static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
+static pid_t pq_mq_parallel_master_pid = 0;
+static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
 
 static void mq_comm_reset(void);
 static int	mq_flush(void);
@@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 }
 
+/*
+ * Arrange to SendProcSignal() to the parallel master each time we transmit
+ * message data via the shm_mq.
+ */
+void
+pq_set_parallel_master(pid_t pid, BackendId backend_id)
+{
+	Assert(PqCommMethods == &PqCommMqMethods);
+	pq_mq_parallel_master_pid = pid;
+	pq_mq_parallel_master_backend_id = backend_id;
+}
+
 static void
 mq_comm_reset(void)
 {
@@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 	iov[1].len = len;
 
 	Assert(pq_mq_handle != NULL);
-	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	for (;;)
+	{
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+
+		if (pq_mq_parallel_master_pid != 0)
+			SendProcSignal(pq_mq_parallel_master_pid,
+						   PROCSIG_PARALLEL_MESSAGE,
+						   pq_mq_parallel_master_backend_id);
+
+		if (result != SHM_MQ_WOULD_BLOCK)
+			break;
+
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
 
 	pq_mq_busy = false;
 
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 1d6e3f3..8fb39d3 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -471,7 +471,7 @@ AutoVacLauncherMain(int argc, char *argv[])
 	InitProcess();
 #endif
 
-	InitPostgres(NULL, InvalidOid, NULL, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
 
 	SetProcessingMode(NormalProcessing);
 
@@ -1665,7 +1665,7 @@ AutoVacWorkerMain(int argc, char *argv[])
 		 * Note: if we have selected a just-deleted database (due to using
 		 * stale stats info), we'll fail and exit here.
 		 */
-		InitPostgres(NULL, dbid, NULL, dbname);
+		InitPostgres(NULL, dbid, NULL, InvalidOid, dbname);
 		SetProcessingMode(NormalProcessing);
 		set_ps_display(dbname, false);
 		ereport(DEBUG1,
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5106f52..451f814 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5305,7 +5305,30 @@ BackgroundWorkerInitializeConnection(char *dbname, char *username)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("database connection requirement not indicated during registration")));
 
-	InitPostgres(dbname, InvalidOid, username, NULL);
+	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL);
+
+	/* it had better not gotten out of "init" mode yet */
+	if (!IsInitProcessingMode())
+		ereport(ERROR,
+				(errmsg("invalid processing mode in background worker")));
+	SetProcessingMode(NormalProcessing);
+}
+
+/*
+ * Connect background worker to a database using OIDs.
+ */
+void
+BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
+{
+	BackgroundWorker *worker = MyBgworkerEntry;
+
+	/* XXX is this the right errcode? */
+	if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION))
+		ereport(FATAL,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("database connection requirement not indicated during registration")));
+
+	InitPostgres(NULL, dboid, NULL, useroid, NULL);
 
 	/* it had better not gotten out of "init" mode yet */
 	if (!IsInitProcessingMode())
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d953545..7b7b57a 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1686,6 +1686,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
 }
 
 /*
+ * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
+ *
+ * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
+ * PGPROC of the transaction from which we imported the snapshot, rather than
+ * an XID.
+ *
+ * Returns TRUE if successful, FALSE if source xact is no longer running.
+ */
+bool
+ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
+{
+	bool		result = false;
+	TransactionId xid;
+	volatile PGXACT *pgxact;
+
+	Assert(TransactionIdIsNormal(xmin));
+	Assert(proc != NULL);
+
+	/* Get lock so source xact can't end while we're doing this */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+	pgxact = &allPgXact[proc->pgprocno];
+
+	/*
+	 * Be certain that the referenced PGPROC has an advertised xmin which
+	 * is no later than the one we're installing, so that the system-wide
+	 * xmin can't go backwards.  Also, make sure it's running in the same
+	 * database, so that the per-database xmin cannot go backwards.
+	 */
+	xid = pgxact->xmin;		/* fetch just once */
+	if (proc->databaseId == MyDatabaseId &&
+		TransactionIdIsNormal(xid) &&
+		TransactionIdPrecedesOrEquals(xid, xmin))
+	{
+		MyPgXact->xmin = TransactionXmin = xmin;
+		result = true;
+	}
+
+	LWLockRelease(ProcArrayLock);
+
+	return result;
+}
+
+/*
  * GetRunningTransactionData -- returns information about running transactions.
  *
  * Similar to GetSnapshotData but returns more information. We include
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index cd9a287..0678678 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -17,6 +17,7 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include "access/parallel.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "storage/latch.h"
@@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
 		HandleNotifyInterrupt();
 
+	if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
+		HandleParallelMessageInterrupt(true);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index f126118..dd53426 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -1656,6 +1656,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 
 	Assert(!RecoveryInProgress());
 
+	/*
+	 * Since all parts of a serializable transaction must use the same
+	 * snapshot, it is too late to establish one after a parallel operation
+	 * has begun.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot establish serializable snapshot during a parallel operation");
+
 	proc = MyProc;
 	Assert(proc != NULL);
 	GET_VXID_FROM_PGPROC(vxid, *proc);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index cc62b2c..4285748 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -37,6 +37,7 @@
 #include "rusagestub.h"
 #endif
 
+#include "access/parallel.h"
 #include "access/printtup.h"
 #include "access/xact.h"
 #include "catalog/pg_type.h"
@@ -2968,7 +2969,8 @@ ProcessInterrupts(void)
 					 errmsg("canceling statement due to user request")));
 		}
 	}
-	/* If we get here, do nothing (probably, QueryCancelPending was reset) */
+	if (ParallelMessagePending)
+		HandleParallelMessageInterrupt(false);
 }
 
 
@@ -3704,7 +3706,7 @@ PostgresMain(int argc, char *argv[],
 	 * it inside InitPostgres() instead.  In particular, anything that
 	 * involves database access should be there, not here.
 	 */
-	InitPostgres(dbname, InvalidOid, username, NULL);
+	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL);
 
 	/*
 	 * If the PostmasterContext is still around, recycle the space; we don't
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index aa8fe88..c4862a9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree)
 static void
 check_xact_readonly(Node *parsetree)
 {
-	if (!XactReadOnly)
+	/* Only perform the check if we have a reason to do so. */
+	if (!XactReadOnly && !IsInParallelMode())
 		return;
 
 	/*
 	 * Note: Commands that need to do more complicated checking are handled
 	 * elsewhere, in particular COPY and plannable statements do their own
-	 * checking.  However they should all call PreventCommandIfReadOnly to
-	 * actually throw the error.
+	 * checking.  However they should all call PreventCommandIfReadOnly
+	 * or PreventCommandIfParallelMode to actually throw the error.
 	 */
 
 	switch (nodeTag(parsetree))
@@ -207,6 +208,7 @@ check_xact_readonly(Node *parsetree)
 		case T_ImportForeignSchemaStmt:
 		case T_SecLabelStmt:
 			PreventCommandIfReadOnly(CreateCommandTag(parsetree));
+			PreventCommandIfParallelMode(CreateCommandTag(parsetree));
 			break;
 		default:
 			/* do nothing */
@@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname)
 }
 
 /*
+ * PreventCommandIfParallelMode: throw error if current (sub)transaction is
+ * in parallel mode.
+ *
+ * This is useful mainly to ensure consistency of the error message wording;
+ * most callers have checked IsInParallelMode() for themselves.
+ */
+void
+PreventCommandIfParallelMode(const char *cmdname)
+{
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+		/* translator: %s is name of a SQL command, eg CREATE */
+				 errmsg("cannot execute %s during a parallel operation",
+						cmdname)));
+}
+
+/*
  * PreventCommandDuringRecovery: throw error if RecoveryInProgress
  *
  * The majority of operations that are unsafe in a Hot Standby slave
@@ -630,6 +650,7 @@ standard_ProcessUtility(Node *parsetree,
 		case T_ClusterStmt:
 			/* we choose to allow this during "read only" transactions */
 			PreventCommandDuringRecovery("CLUSTER");
+			/* forbidden in parallel mode due to CommandIsReadOnly */
 			cluster((ClusterStmt *) parsetree, isTopLevel);
 			break;
 
@@ -640,6 +661,7 @@ standard_ProcessUtility(Node *parsetree,
 				/* we choose to allow this during "read only" transactions */
 				PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
 											 "VACUUM" : "ANALYZE");
+				/* forbidden in parallel mode due to CommandIsReadOnly */
 				vacuum(stmt, InvalidOid, true, NULL, false, isTopLevel);
 			}
 			break;
@@ -716,6 +738,7 @@ standard_ProcessUtility(Node *parsetree,
 			 * outside a transaction block is presumed to be user error.
 			 */
 			RequireTransactionChain(isTopLevel, "LOCK TABLE");
+			/* forbidden in parallel mode due to CommandIsReadOnly */
 			LockTableCommand((LockStmt *) parsetree);
 			break;
 
@@ -747,6 +770,7 @@ standard_ProcessUtility(Node *parsetree,
 
 				/* we choose to allow this during "read only" transactions */
 				PreventCommandDuringRecovery("REINDEX");
+				/* forbidden in parallel mode due to CommandIsReadOnly */
 				switch (stmt->kind)
 				{
 					case REINDEX_OBJECT_INDEX:
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 8fccb4c..a045062 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -350,11 +350,10 @@ has_rolreplication(Oid roleid)
  * Initialize user identity during normal backend startup
  */
 void
-InitializeSessionUserId(const char *rolename)
+InitializeSessionUserId(const char *rolename, Oid roleid)
 {
 	HeapTuple	roleTup;
 	Form_pg_authid rform;
-	Oid			roleid;
 
 	/*
 	 * Don't do scans if we're bootstrapping, none of the system catalogs
@@ -365,7 +364,10 @@ InitializeSessionUserId(const char *rolename)
 	/* call only once */
 	AssertState(!OidIsValid(AuthenticatedUserId));
 
-	roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
+	if (rolename != NULL)
+		roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
+	else
+		roleTup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleid));
 	if (!HeapTupleIsValid(roleTup))
 		ereport(FATAL,
 				(errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION),
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index c348034..8f6d517 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -523,6 +523,9 @@ BaseInit(void)
  * name can be returned to the caller in out_dbname.  If out_dbname isn't
  * NULL, it must point to a buffer of size NAMEDATALEN.
  *
+ * Similarly, the username can be passed by name, using the username parameter,
+ * or by OID using the useroid parameter.
+ *
  * In bootstrap mode no parameters are used.  The autovacuum launcher process
  * doesn't use any parameters either, because it only goes far enough to be
  * able to read pg_database; it doesn't connect to any particular database.
@@ -537,7 +540,7 @@ BaseInit(void)
  */
 void
 InitPostgres(const char *in_dbname, Oid dboid, const char *username,
-			 char *out_dbname)
+			 Oid useroid, char *out_dbname)
 {
 	bool		bootstrap = IsBootstrapProcessingMode();
 	bool		am_superuser;
@@ -692,18 +695,18 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("no roles are defined in this database system"),
 					 errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.",
-							 username)));
+							 username != NULL ? username : "postgres")));
 	}
 	else if (IsBackgroundWorker)
 	{
-		if (username == NULL)
+		if (username == NULL && !OidIsValid(useroid))
 		{
 			InitializeSessionUserIdStandalone();
 			am_superuser = true;
 		}
 		else
 		{
-			InitializeSessionUserId(username);
+			InitializeSessionUserId(username, useroid);
 			am_superuser = superuser();
 		}
 	}
@@ -712,7 +715,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 		/* normal multiuser case */
 		Assert(MyProcPort != NULL);
 		PerformAuthentication(MyProcPort);
-		InitializeSessionUserId(username);
+		InitializeSessionUserId(username, useroid);
 		am_superuser = superuser();
 	}
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b1bff7f..a30d809 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -5656,6 +5656,20 @@ set_config_option(const char *name, const char *value,
 			elevel = ERROR;
 	}
 
+	/*
+	 * GUC_ACTION_SAVE changes are acceptable during a parallel operation,
+	 * because the current worker will also pop the change.  We're probably
+	 * dealing with a function having a proconfig entry.  Only the function's
+	 * body should observe the change, and peer workers do not share in the
+	 * execution of a function call started by this worker.
+	 *
+	 * Other changes might need to affect other workers, so forbid them.
+	 */
+	if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE)
+		ereport(elevel,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot set parameters during a parallel operation")));
+
 	record = find_option(name, true, elevel);
 	if (record == NULL)
 	{
@@ -6929,6 +6943,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
 {
 	GucAction	action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET;
 
+	/*
+	 * Workers synchronize these parameters at the start of the parallel
+	 * operation; then, we block SET during the operation.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot set parameters during a parallel operation")));
+
 	switch (stmt->kind)
 	{
 		case VAR_SET_VALUE:
diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c
index a70c754..d2f23c0 100644
--- a/src/backend/utils/time/combocid.c
+++ b/src/backend/utils/time/combocid.c
@@ -44,6 +44,7 @@
 #include "miscadmin.h"
 #include "access/htup_details.h"
 #include "access/xact.h"
+#include "storage/shmem.h"
 #include "utils/combocid.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
@@ -287,3 +288,76 @@ GetRealCmax(CommandId combocid)
 	Assert(combocid < usedComboCids);
 	return comboCids[combocid].cmax;
 }
+
+/*
+ * Estimate the amount of space required to serialize the current ComboCID
+ * state.
+ */
+Size
+EstimateComboCIDStateSpace(void)
+{
+	Size		size;
+
+	/* Add space required for saving usedComboCids */
+	size = sizeof(int);
+
+	/* Add space required for saving the combocids key */
+	size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids));
+
+	return size;
+}
+
+/*
+ * Serialize the ComboCID state into the memory, beginning at start_address.
+ * maxsize should be at least as large as the value returned by
+ * EstimateComboCIDStateSpace.
+ */
+void
+SerializeComboCIDState(Size maxsize, char *start_address)
+{
+	char	   *endptr;
+
+	/* First, we store the number of currently-existing ComboCIDs. */
+	* (int *) start_address = usedComboCids;
+
+	/* If maxsize is too small, throw an error. */
+	endptr = start_address + sizeof(int) +
+		(sizeof(ComboCidKeyData) * usedComboCids);
+	if (endptr < start_address || endptr > start_address + maxsize)
+		elog(ERROR, "not enough space to serialize ComboCID state");
+
+	/* Now, copy the actual cmin/cmax pairs. */
+	memcpy(start_address + sizeof(int), comboCids,
+		   (sizeof(ComboCidKeyData) * usedComboCids));
+}
+
+/*
+ * Read the ComboCID state at the specified address and initialize this
+ * backend with the same ComboCIDs.  This is only valid in a backend that
+ * currently has no ComboCIDs (and only makes sense if the transaction state
+ * is serialized and restored as well).
+ */
+void
+RestoreComboCIDState(char *comboCIDstate)
+{
+	int			num_elements;
+	ComboCidKeyData *keydata;
+	int			i;
+	CommandId	cid;
+
+	Assert(!comboCids || !comboHash);
+
+	/* First, we retrieve the number of ComboCIDs that were serialized. */
+	num_elements = * (int *) comboCIDstate;
+	keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
+
+	/* Use GetComboCommandId to restore each ComboCID. */
+	for (i = 0; i < num_elements; i++)
+	{
+		cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
+
+		/* Verify that we got the expected answer. */
+		if (cid != i)
+			elog(ERROR, "unexpected command ID while restoring combo CIDs");
+	}
+}
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index d601efe..b823827 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -155,6 +155,22 @@ static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
 
+/*
+ * Snapshot fields to be serialized.
+ *
+ * Only these fields need to be sent to the cooperating backend; the
+ * remaining ones can (and must) set by the receiver upon restore.
+ */
+typedef struct SerializedSnapshotData
+{
+	TransactionId xmin;
+	TransactionId xmax;
+	uint32		xcnt;
+	int32		subxcnt;
+	bool		suboverflowed;
+	bool		takenDuringRecovery;
+	CommandId	curcid;
+} SerializedSnapshotData;
 
 /*
  * GetTransactionSnapshot
@@ -186,6 +202,10 @@ GetTransactionSnapshot(void)
 		Assert(RegisteredSnapshots == 0);
 		Assert(FirstXactSnapshot == NULL);
 
+		if (IsInParallelMode())
+			elog(ERROR,
+				 "cannot take query snapshot during a parallel operation");
+
 		/*
 		 * In transaction-snapshot mode, the first snapshot must live until
 		 * end of xact regardless of what the caller does with it, so we must
@@ -237,6 +257,13 @@ Snapshot
 GetLatestSnapshot(void)
 {
 	/*
+	 * We might be able to relax this, but nothing that could otherwise work
+	 * needs it.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot update SecondarySnapshpt during a parallel operation");
+
+	/*
 	 * So far there are no cases requiring support for GetLatestSnapshot()
 	 * during logical decoding, but it wouldn't be hard to add if required.
 	 */
@@ -345,7 +372,8 @@ SnapshotSetCommandId(CommandId curcid)
  * in GetTransactionSnapshot.
  */
 static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
+					   PGPROC *sourceproc)
 {
 	/* Caller should have checked this already */
 	Assert(!FirstSnapshotSet);
@@ -392,7 +420,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
 	 * doesn't seem worth contorting the logic here to avoid two calls,
 	 * especially since it's not clear that predicate.c *must* do this.
 	 */
-	if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+	if (sourceproc != NULL)
+	{
+		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not import the requested snapshot"),
+			   errdetail("The source transaction is not running anymore.")));
+	}
+	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("could not import the requested snapshot"),
@@ -548,11 +584,24 @@ PushCopiedSnapshot(Snapshot snapshot)
 void
 UpdateActiveSnapshotCommandId(void)
 {
+	CommandId	save_curcid, curcid;
 	Assert(ActiveSnapshot != NULL);
 	Assert(ActiveSnapshot->as_snap->active_count == 1);
 	Assert(ActiveSnapshot->as_snap->regd_count == 0);
 
-	ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
+	/*
+	 * Don't allow modification of the active snapshot during parallel
+	 * operation.  We share the snapshot to worker backends at beginning of
+	 * parallel operation, so any change to snapshot can lead to
+	 * inconsistencies.  We have other defenses against
+	 * CommandCounterIncrement, but there are a few places that call this
+	 * directly, so we put an additional guard here.
+	 */
+	save_curcid = ActiveSnapshot->as_snap->curcid;
+	curcid = GetCurrentCommandId(false);
+	if (IsInParallelMode() && save_curcid != curcid)
+		elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+	ActiveSnapshot->as_snap->curcid = curcid;
 }
 
 /*
@@ -1247,7 +1296,7 @@ ImportSnapshot(const char *idstr)
 			  errmsg("cannot import a snapshot from a different database")));
 
 	/* OK, install the snapshot */
-	SetTransactionSnapshot(&snapshot, src_xid);
+	SetTransactionSnapshot(&snapshot, src_xid, NULL);
 }
 
 /*
@@ -1350,3 +1399,152 @@ HistoricSnapshotGetTupleCids(void)
 	Assert(HistoricSnapshotActive());
 	return tuplecid_data;
 }
+
+/*
+ * EstimateSnapshotSpace
+ *		Returns the size need to store the given snapshot.
+ *
+ * We are exporting only required fields from the Snapshot, stored in
+ * SerializedSnapshotData.
+ */
+Size
+EstimateSnapshotSpace(Snapshot snap)
+{
+	Size		size;
+
+	Assert(snap != InvalidSnapshot);
+	Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
+
+	/* We allocate any XID arrays needed in the same palloc block. */
+	size = add_size(sizeof(SerializedSnapshotData),
+					mul_size(snap->xcnt, sizeof(TransactionId)));
+	if (snap->subxcnt > 0 &&
+		(!snap->suboverflowed || snap->takenDuringRecovery))
+		size = add_size(size,
+						mul_size(snap->subxcnt, sizeof(TransactionId)));
+
+	return size;
+}
+
+/*
+ * SerializeSnapshot
+ * 		Dumps the serialized snapshot (extracted from given snapshot) onto the
+ * 		memory location at start_address.
+ */
+void
+SerializeSnapshot(Snapshot snapshot, Size maxsize, char *start_address)
+{
+	SerializedSnapshotData *serialized_snapshot;
+
+	/* If the size is small, throw an error */
+	if (maxsize < EstimateSnapshotSpace(snapshot))
+		elog(ERROR, "not enough space to serialize given snapshot");
+
+	Assert(snapshot->xcnt >= 0);
+	Assert(snapshot->subxcnt >= 0);
+
+	serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+	/* Copy all required fields */
+	serialized_snapshot->xmin = snapshot->xmin;
+	serialized_snapshot->xmax = snapshot->xmax;
+	serialized_snapshot->xcnt = snapshot->xcnt;
+	serialized_snapshot->subxcnt = snapshot->subxcnt;
+	serialized_snapshot->suboverflowed = snapshot->suboverflowed;
+	serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery;
+	serialized_snapshot->curcid = snapshot->curcid;
+
+	/*
+	 * Ignore the SubXID array if it has overflowed, unless the snapshot
+	 * was taken during recovey - in that case, top-level XIDs are in subxip
+	 * as well, and we mustn't lose them.
+	 */
+	if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery)
+		serialized_snapshot->subxcnt = 0;
+
+	/* Copy XID array */
+	if (snapshot->xcnt > 0)
+		memcpy((TransactionId *) (serialized_snapshot + 1),
+			   snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+
+	/*
+	 * Copy SubXID array. Don't bother to copy it if it had overflowed,
+	 * though, because it's not used anywhere in that case. Except if it's a
+	 * snapshot taken during recovery; all the top-level XIDs are in subxip as
+	 * well in that case, so we mustn't lose them.
+	 */
+	if (snapshot->subxcnt > 0)
+	{
+		Size subxipoff = sizeof(SerializedSnapshotData) +
+			snapshot->xcnt * sizeof(TransactionId);
+
+		memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff),
+			   snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
+	}
+}
+
+/*
+ * RestoreSnapshot
+ *		Restore a serialized snapshot from the specified address.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0.  The returned snapshot has the copied flag set.
+ *
+ * If set_transaction_snapshot is true, the snapshot is additionally installed
+ * as the transaction snapshot.
+ */
+Snapshot
+RestoreSnapshot(char *start_address)
+{
+	SerializedSnapshotData *serialized_snapshot;
+	Size		size;
+	Size		subxipoff;
+	Snapshot	snapshot;
+
+	serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+	/* We allocate any XID arrays needed in the same palloc block. */
+	size = subxipoff = sizeof(SnapshotData) +
+		serialized_snapshot->xcnt * sizeof(TransactionId);
+	size += serialized_snapshot->subxcnt * sizeof(TransactionId);
+
+	/* Copy all required fields */
+	snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+	snapshot->xmin = serialized_snapshot->xmin;
+	snapshot->xmax = serialized_snapshot->xmax;
+	snapshot->xcnt = serialized_snapshot->xcnt;
+	snapshot->subxcnt = serialized_snapshot->subxcnt;
+	snapshot->suboverflowed = serialized_snapshot->suboverflowed;
+	snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery;
+	snapshot->curcid = serialized_snapshot->curcid;
+
+	/* Copy XIDs, if present. */
+	if (serialized_snapshot->xcnt > 0)
+		memcpy(snapshot->xip, (TransactionId *) (serialized_snapshot + 1),
+			   serialized_snapshot->xcnt * sizeof(TransactionId));
+
+	/* Copy SubXIDs, if present. */
+	if (serialized_snapshot->subxcnt > 0)
+		memcpy(snapshot->subxip,
+			   (TransactionId *) ((char *) serialized_snapshot + subxipoff),
+			   serialized_snapshot->subxcnt * sizeof(TransactionId));
+
+	/* Set the copied flag so that the caller will set refcounts correctly. */
+	snapshot->regd_count = 0;
+	snapshot->active_count = 0;
+	snapshot->copied = true;
+
+	return snapshot;
+}
+
+/*
+ * Install a restored snapshot as the transaction snapshot.
+ *
+ * The second argument is of type void * so that snapmgr.h need not include
+ * the declaration for PGPROC.
+ */
+void
+RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
+{
+	SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
new file mode 100644
index 0000000..9fe2df8
--- /dev/null
+++ b/src/include/access/parallel.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.h
+ *	  Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PARALLEL_H
+#define PARALLEL_H
+
+#include "lib/ilist.h"
+#include "postmaster/bgworker.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+
+typedef void (*parallel_worker_main_type)(shm_toc *toc);
+
+typedef struct ParallelWorkerInfo
+{
+	BackgroundWorkerHandle *bgwhandle;
+	shm_mq_handle *error_mqh;
+} ParallelWorkerInfo;
+
+typedef struct ParallelContext
+{
+	dlist_node node;
+	SubTransactionId subid;
+	int nworkers;
+	parallel_worker_main_type entrypoint;
+	char *library_name;
+	char *function_name;
+	shm_toc_estimator estimator;
+	dsm_segment *seg;
+	shm_toc *toc;
+	ParallelWorkerInfo *worker;
+} ParallelContext;
+
+extern bool ParallelMessagePending;
+
+extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
+extern ParallelContext *CreateParallelContextForExtension(char *library_name,
+								  char *function_name, int nworkers);
+extern void InitializeParallelDSM(ParallelContext *);
+extern void LaunchParallelWorkers(ParallelContext *);
+extern void DestroyParallelContext(ParallelContext *);
+extern bool ParallelContextActive(void);
+
+extern void HandleParallelMessageInterrupt(bool signal_handler);
+extern void AtEOXact_Parallel(bool isCommit);
+extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
+
+#endif   /* PARALLEL_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b018aa4..6e8290f 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -260,4 +260,8 @@ extern void xact_redo(XLogReaderState *record);
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
 
+extern void EnterParallelMode(void);
+extern void ExitParallelMode(void);
+extern bool IsInParallelMode(void);
+
 #endif   /* XACT_H */
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index f8e68c9..78583b3 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
 
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1558a75..fb7754f 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -258,6 +258,7 @@ extern void check_stack_depth(void);
 
 /* in tcop/utility.c */
 extern void PreventCommandIfReadOnly(const char *cmdname);
+extern void PreventCommandIfParallelMode(const char *cmdname);
 extern void PreventCommandDuringRecovery(const char *cmdname);
 
 /* in utils/misc/guc.c */
@@ -290,7 +291,7 @@ extern bool InLocalUserIdChange(void);
 extern bool InSecurityRestrictedOperation(void);
 extern void GetUserIdAndContext(Oid *userid, bool *sec_def_context);
 extern void SetUserIdAndContext(Oid userid, bool sec_def_context);
-extern void InitializeSessionUserId(const char *rolename);
+extern void InitializeSessionUserId(const char *rolename, Oid useroid);
 extern void InitializeSessionUserIdStandalone(void);
 extern void SetSessionAuthorization(Oid userid, bool is_superuser);
 extern Oid	GetCurrentRoleId(void);
@@ -391,7 +392,7 @@ extern AuxProcType MyAuxProcType;
 extern void pg_split_opts(char **argv, int *argcp, char *optstr);
 extern void InitializeMaxBackends(void);
 extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
-			 char *out_dbname);
+			 Oid useroid, char *out_dbname);
 extern void BaseInit(void);
 
 /* in utils/init/miscinit.c */
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index a3b3d5f..273cecb 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -130,6 +130,9 @@ extern PGDLLIMPORT BackgroundWorker *MyBgworkerEntry;
  */
 extern void BackgroundWorkerInitializeConnection(char *dbname, char *username);
 
+/* Just like the above, but specifying database and user by OID. */
+extern void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid);
+
 /* Block/unblock signals in a background worker process */
 extern void BackgroundWorkerBlockSignals(void);
 extern void BackgroundWorkerUnblockSignals(void);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 0c4611b..9f4e4b9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot);
 
 extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
 							 TransactionId sourcexid);
+extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
 
 extern RunningTransactions GetRunningTransactionData(void);
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index c625562..3046e52 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -31,6 +31,7 @@ typedef enum
 {
 	PROCSIG_CATCHUP_INTERRUPT,	/* sinval catchup interrupt */
 	PROCSIG_NOTIFY_INTERRUPT,	/* listen/notify interrupt */
+	PROCSIG_PARALLEL_MESSAGE,	/* message from cooperating parallel backend */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_DATABASE,
diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h
index 9e482ff..232f729 100644
--- a/src/include/utils/combocid.h
+++ b/src/include/utils/combocid.h
@@ -21,5 +21,8 @@
  */
 
 extern void AtEOXact_ComboCid(void);
+extern void RestoreComboCIDState(char *comboCIDstate);
+extern void SerializeComboCIDState(Size maxsize, char *start_address);
+extern Size EstimateComboCIDStateSpace(void);
 
 #endif   /* COMBOCID_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index abe7016..7efd427 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -64,4 +64,10 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids)
 extern void TeardownHistoricSnapshot(bool is_error);
 extern bool HistoricSnapshotActive(void);
 
+extern Size EstimateSnapshotSpace(Snapshot snapshot);
+extern void SerializeSnapshot(Snapshot snapshot, Size maxsize,
+							  char *start_address);
+extern Snapshot RestoreSnapshot(char *start_address);
+extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc);
+
 #endif   /* SNAPMGR_H */
