diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 2ea5c34..df8856e 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -177,13 +177,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
using a very large number of processes.
-
-
-
- The transaction isolation level is serializable. This is
- a limitation of the current implementation.
-
-
@@ -235,16 +228,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
making it ineligible for parallel query.
-
-
-
- The transaction isolation level is serializable. This situation
- does not normally arise, because parallel query plans are not
- generated when the transaction isolation level is serializable.
- However, it can happen if the transaction isolation level is changed to
- serializable after the plan is generated and before it is executed.
-
-
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 3e0ee87..ab2c3e6 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -26,6 +26,7 @@
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "storage/ipc.h"
+#include "storage/predicate_internals.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
@@ -76,6 +77,7 @@ typedef struct FixedParallelState
PGPROC *parallel_master_pgproc;
pid_t parallel_master_pid;
BackendId parallel_master_backend_id;
+ SERIALIZABLEXACT *parallel_master_serializablexact;
/* Entrypoint for parallel workers. */
parallel_worker_main_type entrypoint;
@@ -138,14 +140,6 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
if (dynamic_shared_memory_type == DSM_IMPL_NONE)
nworkers = 0;
- /*
- * If we are running under serializable isolation, we can't use parallel
- * workers, at least not until somebody enhances that mechanism to be
- * parallel-aware.
- */
- if (IsolationIsSerializable())
- nworkers = 0;
-
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
@@ -298,6 +292,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_pgproc = MyProc;
fps->parallel_master_pid = MyProcPid;
fps->parallel_master_backend_id = MyBackendId;
+ fps->parallel_master_serializablexact = GetSerializableXact();
fps->entrypoint = pcxt->entrypoint;
SpinLockInit(&fps->mutex);
fps->last_xlog_end = 0;
@@ -1093,6 +1088,9 @@ ParallelWorkerMain(Datum main_arg)
/* Set ParallelMasterBackendId so we know how to address temp relations. */
ParallelMasterBackendId = fps->parallel_master_backend_id;
+ /* Use the leader's SERIALIZABLEXACT. */
+ SetSerializableXact(fps->parallel_master_serializablexact);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 02286d9..993e318 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -232,14 +232,6 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* parallel worker. We might eventually be able to relax this
* restriction, but for now it seems best not to have parallel workers
* trying to create their own parallel workers.
- *
- * We can't use parallelism in serializable mode because the predicate
- * locking code is not parallel-aware. It's not catastrophic if someone
- * tries to run a parallel plan in serializable mode; it just won't get
- * any workers and will run serially. But it seems like a good heuristic
- * to assume that the same serialization level will be in effect at plan
- * time and execution time, so don't generate a parallel plan if we're in
- * serializable mode.
*/
if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster &&
@@ -247,8 +239,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
parse->commandType == CMD_SELECT &&
!parse->hasModifyingCTE &&
max_parallel_workers_per_gather > 0 &&
- !IsParallelWorker() &&
- !IsolationIsSerializable())
+ !IsParallelWorker())
{
/* all the cheap tests pass, so scan the query tree */
glob->maxParallelHazard = max_parallel_hazard(parse);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 3e13394..8ff9b83 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -511,6 +511,7 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+ LWLockRegisterTranche(LWTRANCHE_SXACT, "sxact");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 7aa719d..6d4180c 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -97,7 +97,9 @@
* - All transactions share this single lock (with no partitioning).
* - There is never a need for a process other than the one running
* an active transaction to walk the list of locks held by that
- * transaction.
+ * transaction, except parallel query workers sharing the leader's
+ * transaction. In the parallel case, an extra per-sxact lock is
+ * taken; see below.
* - It is relatively infrequent that another process needs to
* modify the list for a transaction, but it does happen for such
* things as index page splits for pages with predicate locks and
@@ -116,6 +118,12 @@
* than its own active transaction must acquire an exclusive
* lock.
*
+ * SERIALIZABLEXACT's member 'lock'
+ * - Protects the linked list of locks held by a transaction. Only
+ * needed for parallel mode, where multiple backends share the
+ * same SERIALIZABLEXACT object. Not needed if
+ * SerializablePredicateLockListLock is held exclusively.
+ *
* FirstPredicateLockMgrLock based partition locks
* - The same lock protects a target, all locks on that target, and
* the linked list of locks on the target..
@@ -184,6 +192,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/slru.h"
#include "access/subtrans.h"
#include "access/transam.h"
@@ -1749,6 +1758,7 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
SHMQueueInit(&(sxact->predicateLocks));
SHMQueueElemInit(&(sxact->finishedLink));
sxact->flags = 0;
+ LWLockInitialize(&sxact->lock, LWTRANCHE_SXACT);
if (XactReadOnly)
{
sxact->flags |= SXACT_FLAG_READ_ONLY;
@@ -2031,6 +2041,14 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ if (IsInParallelMode())
+ {
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE) ||
+ LWLockHeldByMeInMode(&MySerializableXact->lock,
+ LW_EXCLUSIVE));
+ }
+
/* Can't remove it until no locks at this target. */
if (!SHMQueueEmpty(&target->predicateLocks))
return;
@@ -2048,7 +2066,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
* This implementation is assuming that the usage of each target tag field
* is uniform. No need to make this hard if we don't have to.
*
- * We aren't acquiring lightweight locks for the predicate lock or lock
+ * We acquire an LWLock in the case of parallel mode, because worker
+ * backends have access to the leader's SERIALIZABLEXACT. Otherwise,
+ * we aren't acquiring lightweight locks for the predicate lock or lock
* target structures associated with this transaction unless we're going
* to modify them, because no other process is permitted to modify our
* locks.
@@ -2061,6 +2081,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
sxact = MySerializableXact;
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->lock, LW_EXCLUSIVE);
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
@@ -2114,6 +2136,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
predlock = nextpredlock;
}
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->lock);
LWLockRelease(SerializablePredicateLockListLock);
}
@@ -2305,6 +2329,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
partitionLock = PredicateLockHashPartitionLock(targettaghash);
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->lock, LW_EXCLUSIVE);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/* Make sure that the target is represented. */
@@ -2342,6 +2368,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
}
LWLockRelease(partitionLock);
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->lock);
LWLockRelease(SerializablePredicateLockListLock);
}
@@ -2529,7 +2557,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
PREDICATELOCK *nextpredlock;
bool found;
- Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE));
Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
predlock = (PREDICATELOCK *)
@@ -2589,7 +2618,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
* covers it, or if we are absolutely certain that no one will need to
* refer to that lock in the future.
*
- * Caller must hold SerializablePredicateLockListLock.
+ * Caller must hold SerializablePredicateLockListLock exclusively.
*/
static bool
TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
@@ -2604,7 +2633,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
bool found;
bool outOfShmem = false;
- Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE));
oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
@@ -3201,6 +3231,10 @@ ReleasePredicateLocks(bool isCommit)
*/
bool topLevelIsDeclaredReadOnly;
+ /* Only leader processes should release predicate locks. */
+ if (IsParallelWorker())
+ goto cleanup;
+
if (MySerializableXact == InvalidSerializableXact)
{
Assert(LocalPredicateLockHash == NULL);
@@ -3487,6 +3521,7 @@ ReleasePredicateLocks(bool isCommit)
MySerializableXact = InvalidSerializableXact;
MyXactDidWrite = false;
+cleanup:
/* Delete per-transaction lock table */
if (LocalPredicateLockHash != NULL)
{
@@ -4176,6 +4211,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
PREDICATELOCK *rmpredlock;
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&MySerializableXact->lock, LW_EXCLUSIVE);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
@@ -4210,6 +4247,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
LWLockRelease(SerializableXactHashLock);
LWLockRelease(partitionLock);
+ if (IsInParallelMode())
+ LWLockRelease(&MySerializableXact->lock);
LWLockRelease(SerializablePredicateLockListLock);
if (rmpredlock != NULL)
@@ -4758,6 +4797,11 @@ AtPrepare_PredicateLocks(void)
*/
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ /*
+ * No need to take sxact->lock in parallel mode because there cannot be
+ * any parallel workers running while we are preparing a transaction.
+ */
+
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
@@ -4966,3 +5010,22 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
}
}
+
+/*
+ * Return the value of MySerializableXact, so that a parallel leader can hand
+ * its current SERIALIZABLEXACT to parallel workers.
+ */
+SERIALIZABLEXACT *
+GetSerializableXact(void)
+{
+ return MySerializableXact;
+}
+
+/*
+ * In a parallel worker, set MySerializableXact to the leader's sxact.
+ */
+void
+SetSerializableXact(SERIALIZABLEXACT *sxact)
+{
+ MySerializableXact = sxact;
+}
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 0cd45bb..cd72014 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -213,6 +213,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_TBM,
+ LWTRANCHE_SXACT,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h
index 408d94c..35b63ab 100644
--- a/src/include/storage/predicate_internals.h
+++ b/src/include/storage/predicate_internals.h
@@ -15,6 +15,7 @@
#define PREDICATE_INTERNALS_H
#include "storage/lock.h"
+#include "storage/lwlock.h"
/*
* Commit number.
@@ -91,6 +92,9 @@ typedef struct SERIALIZABLEXACT
SHM_QUEUE finishedLink; /* list link in
* FinishedSerializableTransactions */
+ /* lock to protect predicateLocks list in parallel mode */
+ LWLock lock;
+
/*
* for r/o transactions: list of concurrent r/w transactions that we could
* potentially have conflicts with, and vice versa for r/w transactions
@@ -474,5 +478,7 @@ typedef struct TwoPhasePredicateRecord
* locking internals.
*/
extern PredicateLockData *GetPredicateLockStatusData(void);
+extern SERIALIZABLEXACT *GetSerializableXact(void);
+extern void SetSerializableXact(SERIALIZABLEXACT *sxact);
#endif /* PREDICATE_INTERNALS_H */
diff --git a/src/test/isolation/expected/serializable-parallel.out b/src/test/isolation/expected/serializable-parallel.out
new file mode 100644
index 0000000..f43aa6a
--- /dev/null
+++ b/src/test/isolation/expected/serializable-parallel.out
@@ -0,0 +1,44 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s2wx s2c s3c
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance
+
+0
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+step s2c: COMMIT;
+step s3c: COMMIT;
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s3r s3c s2wx
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance
+
+0
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance
+
+0
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s3r: SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id;
+id balance
+
+X 0
+Y 20
+step s3c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+ERROR: could not serialize access due to read/write dependencies among transactions
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 2606a27..1d69820 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -57,3 +57,4 @@ test: alter-table-3
test: create-trigger
test: async-notify
test: timeouts
+test: serializable-parallel
diff --git a/src/test/isolation/specs/serializable-parallel.spec b/src/test/isolation/specs/serializable-parallel.spec
new file mode 100644
index 0000000..0e7c2c7
--- /dev/null
+++ b/src/test/isolation/specs/serializable-parallel.spec
@@ -0,0 +1,48 @@
+# The example from the paper "A read-only transaction anomaly under snapshot
+# isolation"[1].
+#
+# Here we test that serializable snapshot isolation (SERIALIZABLE) doesn't
+# suffer from the anomaly, because s2 is aborted upon detection of a cycle.
+# In this case the read-only query s3 happens to be running in a parallel
+# worker.
+#
+# [1] http://www.cs.umb.edu/~poneil/ROAnom.pdf
+
+setup
+{
+ CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL);
+ INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0);
+}
+
+teardown
+{
+ DROP TABLE bank_account;
+}
+
+session "s1"
+setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1ry" { SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s1wy" { UPDATE bank_account SET balance = 20 WHERE id = 'Y'; }
+step "s1c" { COMMIT; }
+
+session "s2"
+setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s2rx" { SELECT balance FROM bank_account WHERE id = 'X'; }
+step "s2ry" { SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s2wx" { UPDATE bank_account SET balance = -11 WHERE id = 'X'; }
+step "s2c" { COMMIT; }
+
+session "s3"
+setup {
+ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+ SET max_parallel_workers_per_gather = 2;
+ SET force_parallel_mode = on;
+ }
+step "s3r" { SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; }
+step "s3c" { COMMIT; }
+
+# without s3, s1 and s2 commit
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s2wx" "s2c" "s3c"
+
+# once s3 observes the data committed by s1, a cycle is created and s2 aborts
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s3r" "s3c" "s2wx"