diff --git a/GNUmakefile.in b/GNUmakefile.in
index f4e31a7..284fba1 100644
--- a/GNUmakefile.in
+++ b/GNUmakefile.in
@@ -67,6 +67,20 @@ check check-tests installcheck installcheck-parallel installcheck-tests: CHECKPR
 check check-tests installcheck installcheck-parallel installcheck-tests: submake-generated-headers
 	$(MAKE) -C src/test/regress $@
 
+TEST_PGDATA_DATA=/var/tmp/test-pg
+trunc_check:
+	make -j20 install PROFILE=-O0
+	$(CC) -g -Wall -I`pg_config --includedir` -L`pg_config --libdir` -Wl,-R`pg_config --libdir` trunc-clog-concurrency.c -lpq -o trunc-clog-concurrency-tester
+	pg_ctl -m fast -w stop ||:
+	rm -rf $(TEST_PGDATA_DATA)
+	initdb -N $(TEST_PGDATA_DATA)
+	cp -p ~/conf-test-pg $(TEST_PGDATA_DATA)/postgresql.conf
+	echo "default_transaction_isolation = 'read committed'" >>$(TEST_PGDATA_DATA)/postgresql.conf
+	pg_ctl -w start
+	createdb
+	psql -Xc 'alter database template0 allow_connections on'
+	env time ./trunc-clog-concurrency-tester
+
 $(call recurse,check-world,src/test src/pl src/interfaces/ecpg contrib src/bin,check)
 
 $(call recurse,checkprep, src/test src/pl src/interfaces/ecpg contrib src/bin)
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index aa089d8..7202f34 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -42,6 +42,7 @@
 #include "pgstat.h"
 #include "pg_trace.h"
 #include "storage/proc.h"
+#include "utils/fmgrprotos.h"
 
 /*
  * Defines for CLOG page sizes.  A page is the same BLCKSZ as is used
@@ -917,6 +918,22 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
 	if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
 		return;					/* nothing to remove */
 
+#if 0
+	/* FIXME Move sleep duration into a GUC? */
+	if (LWLockConditionalAcquire(TruncSleepLock, LW_EXCLUSIVE))
+	{
+		elog(LOG, "TruncSleepLock taken: sleeping (%d for %u)",
+			 cutoffPage, oldestXact);
+		DirectFunctionCall1(pg_sleep, Float8GetDatum(10.0));
+		/* TODO increase time, attach debugger and check caller vars */
+		LWLockRelease(TruncSleepLock);
+		elog(LOG, "TruncSleepLock done: proceeding");
+	}
+	else
+		elog(LOG, "TruncSleepLock unavailable: proceeding (%d for %u)",
+			 cutoffPage, oldestXact);
+#endif
+
 	/*
 	 * Advance oldestClogXid before truncating clog, so concurrent xact status
 	 * lookups can ensure they don't attempt to access truncated-away clog.
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 3623352..ab28eda 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -57,6 +57,7 @@
 #include "pgstat.h"
 #include "storage/fd.h"
 #include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
 
 #include "miscadmin.h"
@@ -1171,11 +1172,6 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
 	int			slotno;
 
 	/*
-	 * The cutoff point is the start of the segment containing cutoffPage.
-	 */
-	cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
-	/*
 	 * Scan shared memory and remove any pages preceding the cutoff page, to
 	 * ensure we won't rewrite them later.  (Since this is normally called in
 	 * or just after a checkpoint, any dirty pages should have been flushed
@@ -1191,6 +1187,21 @@ restart:;
	 * have already wrapped around, and proceeding with the truncation would
	 * risk removing the current segment.
	 */
+	if (shared->ControlLock == CLogControlLock)
+	{
+		int		test = shared->latest_page_number;
+		elog(WARNING, "important safety check: %d latest < %d cutoff?",
+			 shared->latest_page_number, cutoffPage);
+		while (test < 130000)
+		{
+			if (ctl->PagePrecedes(test, cutoffPage))
+			{
+				elog(WARNING, "safety check would trip at %d", test);
+				break;
+			}
+			test++;
+		}
+	}
 	if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
 	{
 		LWLockRelease(shared->ControlLock);
@@ -1233,6 +1244,29 @@ restart:;
 
 	LWLockRelease(shared->ControlLock);
 
+#if 1
+	if (shared->ControlLock == CLogControlLock)
+	{
+		/* FIXME Move sleep duration into a GUC? */
+		if (LWLockConditionalAcquire(TruncSleepLock, LW_EXCLUSIVE))
+		{
+			elog(LOG, "TruncSleepLock taken: sleeping (%d)",
+				 cutoffPage);
+			DirectFunctionCall1(pg_sleep, Float8GetDatum(10.0));
+			/* TODO increase time, attach debugger and check caller vars */
+			LWLockRelease(TruncSleepLock);
+			elog(LOG, "TruncSleepLock done: proceeding");
+		}
+		else
+			elog(LOG, "TruncSleepLock unavailable: proceeding (%d)",
+				 cutoffPage);
+	}
+#endif
+
+	if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
+		ereport(LOG,
+				(errmsg("too late, but apparent wraparound")));
+
 	/* Now we can remove the old segment(s) */
 	(void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
 }
@@ -1320,11 +1354,10 @@ restart:
 bool
 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
 {
+	int			seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
 	int			cutoffPage = *(int *) data;
 
-	cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
-	if (ctl->PagePrecedes(segpage, cutoffPage))
+	if (ctl->PagePrecedes(seg_last_page, cutoffPage))
 		return true;			/* found one; don't iterate any more */
 
 	return false;				/* keep going */
@@ -1337,9 +1370,10 @@ SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data
 static bool
 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
 {
+	int			seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
 	int			cutoffPage = *(int *) data;
 
-	if (ctl->PagePrecedes(segpage, cutoffPage))
+	if (ctl->PagePrecedes(seg_last_page, cutoffPage))
 		SlruInternalDeleteSegment(ctl, filename);
 
 	return false;				/* keep going */
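The SlruScanDirCb* hunks above replace "round the cutoff down to a segment boundary, then delete segments whose first page precedes it" with "delete only segments whose last page precedes the unrounded cutoff", making the invariant explicit: a segment may be unlinked only when every page in it is truncatable. Below is a minimal standalone sketch of the difference, assuming 32 pages per segment and a modular comparison in the style of clog's PagePrecedes; page_precedes here is a stand-in, not the server's function.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define PAGES_PER_SEGMENT 32	/* as SLRU_PAGES_PER_SEGMENT */

/* Stand-in for ctl->PagePrecedes: wraparound-aware modular comparison. */
static bool
page_precedes(uint32_t a, uint32_t b)
{
	return (int32_t) (a - b) < 0;
}

int
main(void)
{
	uint32_t	cutoff = 35;	/* mid-segment cutoff, no longer rounded down */
	uint32_t	seg_first = 32; /* segment holding pages 32..63, incl. cutoff */
	uint32_t	seg_last = seg_first + PAGES_PER_SEGMENT - 1;

	/* First-page test: would delete although pages 35..63 are still live. */
	printf("first-page test deletes: %d\n", page_precedes(seg_first, cutoff));
	/* Last-page test (the patch): keeps any segment with a live page. */
	printf("last-page test deletes:  %d\n", page_precedes(seg_last, cutoff));
	return 0;
}

With the cutoff no longer rounded down, the first-page test would discard the segment that still contains the cutoff page itself; the last-page test keeps it.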
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index fe94fda..37db959 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -33,6 +33,8 @@
 /* pointer to "variable cache" in shared memory (set up by shmem.c) */
 VariableCache ShmemVariableCache = NULL;
 
+int		JJ_xid = 0;
+
 
 /*
  * Allocate the next XID for a new transaction or subtransaction.
@@ -168,6 +170,11 @@ GetNewTransactionId(bool isSubXact)
 	 *
 	 * Extend pg_subtrans and pg_commit_ts too.
 	 */
+	{
+		int		incr;
+
+		for (incr = 0; incr <= JJ_xid; incr++)
+		{
 	xid = ShmemVariableCache->nextXid;
 	ExtendCLOG(xid);
 	ExtendCommitTs(xid);
 	ExtendSUBTRANS(xid);
@@ -180,6 +187,13 @@
 	 */
 	TransactionIdAdvance(ShmemVariableCache->nextXid);
 
+	/* If JJ_xid opposes xidStopLimit, the latter wins */
+	if (TransactionIdFollowsOrEquals(ShmemVariableCache->nextXid,
+									 ShmemVariableCache->xidStopLimit))
+		break;
+	}
+	}
+
 	/*
 	 * We must store the new XID into the shared ProcArray before releasing
 	 * XidGenLock.  This ensures that every active XID older than
@@ -302,9 +316,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	 * We'll refuse to continue assigning XIDs in interactive mode once we get
 	 * within 1M transactions of data loss.  This leaves lots of room for the
 	 * DBA to fool around fixing things in a standalone backend, while not
-	 * being significant compared to total XID space.  (Note that since
-	 * vacuuming requires one transaction per table cleaned, we had better be
-	 * sure there's lots of XIDs left...)
+	 * being significant compared to total XID space.
 	 */
 	xidStopLimit = xidWrapLimit - 1000000;
 	if (xidStopLimit < FirstNormalTransactionId)
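Taken together, the two GetNewTransactionId hunks wrap the existing extend-and-advance sequence in a loop that discards up to JJ_xid extra XIDs per allocation, bailing out early rather than overrunning xidStopLimit. Assembled from the hunks above (the surrounding error handling is elided), the patched section reads roughly:

	{
		int		incr;

		for (incr = 0; incr <= JJ_xid; incr++)
		{
			xid = ShmemVariableCache->nextXid;
			ExtendCLOG(xid);
			ExtendCommitTs(xid);
			ExtendSUBTRANS(xid);

			/* ... existing code between the hunks ... */

			TransactionIdAdvance(ShmemVariableCache->nextXid);

			/* If JJ_xid opposes xidStopLimit, the latter wins */
			if (TransactionIdFollowsOrEquals(ShmemVariableCache->nextXid,
											 ShmemVariableCache->xidStopLimit))
				break;
		}
	}

With the tester's burn connection setting options=--JJ_xid=1000000, each SELECT txid_current() therefore consumes about a million XIDs, which is how the harness crosses most of the XID space in a few thousand queries.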
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a707d4d..fa7ab6a 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5143,7 +5143,7 @@ sigusr1_handler(SIGNAL_ARGS)
 		 * that by launching another iteration as soon as the current one
 		 * completes.
 		 */
-		start_autovac_launcher = true;
+		/* start_autovac_launcher = true; */
 	}
 
 	if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER) &&
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index db47843..335521c 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -49,3 +49,4 @@ MultiXactTruncationLock 41
 OldSnapshotTimeMapLock 42
 LogicalRepWorkerLock 43
 CLogTruncationLock 44
+TruncSleepLock 45
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f81e042..4e59a68 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -116,6 +116,7 @@
 /* XXX these should appear in other modules' header files */
 extern bool Log_disconnections;
 extern int	CommitDelay;
+extern int	JJ_xid;
 extern int	CommitSiblings;
 extern char *default_tablespace;
 extern char *temp_tablespaces;
@@ -2617,6 +2618,15 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"JJ_xid", PGC_USERSET, WAL_SETTINGS,
+			gettext_noop("Skip this many XIDs each time one is acquired."),
+			NULL
+		},
+		&JJ_xid,
+		0, 0, 1000000, NULL, NULL
+	},
+
+	{
 		{"commit_siblings", PGC_USERSET, WAL_SETTINGS,
 			gettext_noop("Sets the minimum concurrent open transactions before performing "
 						 "commit_delay."),
diff --git a/trunc-clog-concurrency.c b/trunc-clog-concurrency.c
new file mode 100644
index 0000000..2c35dd9
--- /dev/null
+++ b/trunc-clog-concurrency.c
@@ -0,0 +1,178 @@
+#include <libpq-fe.h>
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+static void
+report_query_failure(const char *query, PGresult *res)
+{
+	fprintf(stderr, "query \"%s\" failed unexpectedly: %s",
+			query, PQresultErrorMessage(res));
+}
+
+static void
+safe_query(PGconn *conn, const char *query)
+{
+	PGresult   *res;
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK &&
+		PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		report_query_failure(query, res);
+		exit(1);
+	}
+	PQclear(res);
+}
+
+static int
+is_stop_limit(PGresult *res)
+{
+	const char *sqlstate;
+
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		return 0;
+	/* PQresultErrorField() can return NULL, e.g. after a lost connection */
+	sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+	return sqlstate != NULL && strcmp(sqlstate, "54000") == 0;
+}
+
+int
+main(int argc, char **argv)
+{
+	bool		reached_stop_limit = false;
+	PGconn	   *mutate_conn, *hold1_conn, *hold2_conn, *burn_conn;
+	PGresult   *res;
+	int			n_burns = 0, n_inserts = 0;
+
+	if (argc != 1)
+	{
+		fputs("Usage: trunc-clog-concurrency\n", stderr);
+		return 1;
+	}
+
+	mutate_conn = PQconnectdb("");
+	if (PQstatus(mutate_conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "PQconnectdb failed: %s", PQerrorMessage(mutate_conn));
+		return 1;
+	}
+
+	hold1_conn = PQconnectdb("");
+	if (PQstatus(hold1_conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "PQconnectdb failed: %s", PQerrorMessage(hold1_conn));
+		return 1;
+	}
+
+	hold2_conn = PQconnectdb("");
+	if (PQstatus(hold2_conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "PQconnectdb failed: %s", PQerrorMessage(hold2_conn));
+		return 1;
+	}
+
+	burn_conn = PQconnectdb("options=--JJ_xid=1000000");
+	if (PQstatus(burn_conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "PQconnectdb failed: %s", PQerrorMessage(burn_conn));
+		return 1;
+	}
+
+	/* Start a transaction having an xid. */
+	safe_query(mutate_conn, "BEGIN;");
+	safe_query(mutate_conn, "DROP TABLE IF EXISTS trunc_clog_concurrency;");
+	safe_query(mutate_conn, "CREATE TABLE trunc_clog_concurrency ();");
+
+	/* Burn the entire XID space. */
+	while (!reached_stop_limit)
+	{
+		const char	query[] = "SELECT txid_current();";
+
+		res = PQexec(burn_conn, query);
+		if (PQresultStatus(res) == PGRES_TUPLES_OK)
+		{
+			++n_burns;
+			if (n_burns == 2)
+			{
+				safe_query(hold1_conn, "BEGIN ISOLATION LEVEL READ COMMITTED;");
+				safe_query(hold1_conn, "CREATE TABLE trunc_clog_concurrency_hold1 ();");
+			}
+			if (n_burns == 10)
+			{
+				safe_query(hold2_conn, "BEGIN ISOLATION LEVEL READ COMMITTED;");
+				safe_query(hold2_conn, "CREATE TABLE trunc_clog_concurrency_hold2 ();");
+				system("psql -Xc 'select state, backend_xid, backend_xmin, query from pg_stat_activity'");
+				system("psql -Xc 'select datname,datallowconn,datfrozenxid from pg_database'");
+			}
+			/* keep burning */;
+		}
+		else if (is_stop_limit(res))
+		{
+			reached_stop_limit = true;
+			fprintf(stderr, "reached stop limit: %s",
+					PQresultErrorMessage(res));
+		}
+		else
+		{
+			reached_stop_limit = true;	/* FIXME not really */
+			report_query_failure(query, res);
+		}
+		PQclear(res);
+	}
+
+	/* Finish the first transaction.  xmin rises from start to start+2M. */
+	safe_query(mutate_conn, "COMMIT;");
+
+	/* Raise datfrozenxid of all but template1 to start+2M.  No truncation. */
+	system("for d in postgres template0 test; do vacuumdb -F $d; done; "
+		   "echo -n 'DONE1 '; date");
+	/* Raise xmin to start+10M. */
+	safe_query(hold1_conn, "COMMIT;");
+	/* Sleep on lock before truncating to start+2M. */
+	system("(vacuumdb -F template1; echo -n 'DONE2 '; date) &");
+	usleep(4000 * 1000);		/* 4s */
+
+	/* Truncate to start+10M. */
+	system("(vacuumdb -aF; echo -n 'DONE3 '; date)");
+	system("psql -Xc 'select state, backend_xid, backend_xmin, query from pg_stat_activity'");
+	system("psql -Xc 'select datname,datallowconn,datfrozenxid from pg_database'");
+
+	/*
+	 * We want to burn at least 1M xids (the amount protected by xidStopLimit)
+	 * but not more than 200M (autovacuum_freeze_max_age default) to avoid a
+	 * second set of VACUUMs.
+	 */
+	while (n_inserts < 150)
+	{
+		const char	query[] =
+			"INSERT INTO trunc_clog_concurrency DEFAULT VALUES";
+
+		res = PQexec(burn_conn, query);
+		if (PQresultStatus(res) == PGRES_COMMAND_OK)
+		{
+			n_inserts++;
+			fprintf(stderr, "insert %d ", n_inserts);
+			system("date >&2");
+		}
+		else if (is_stop_limit(res))
+		{
+			fprintf(stderr, "reached stop limit: %s",
+					PQresultErrorMessage(res));
+			PQclear(res);
+			break;
+		}
+		else
+		{
+			report_query_failure(query, res);
+			return 1;
+		}
+		PQclear(res);
+	}
+
+	system("psql -Xc 'select state, backend_xid, backend_xmin, query from pg_stat_activity'");
+	system("psql -Xc 'select datname,datallowconn,datfrozenxid from pg_database'");
+
+	PQfinish(mutate_conn);
+	PQfinish(hold1_conn);
+	PQfinish(hold2_conn);
+	PQfinish(burn_conn);
+
+	return 0;
+}
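One possible tidy-up for the harness, sketched here as a hypothetical helper rather than part of the patch: the two pg_stat_activity/pg_database snapshots are issued verbatim at three separate points, so folding them into one function would keep future column changes consistent across all snapshots.

/* Hypothetical refactoring (not in the patch): one place for the
 * state snapshots the harness takes before, during, and after truncation. */
static void
show_state(void)
{
	system("psql -Xc 'select state, backend_xid, backend_xmin, query "
		   "from pg_stat_activity'");
	system("psql -Xc 'select datname,datallowconn,datfrozenxid "
		   "from pg_database'");
}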