Index: pgsql/doc/src/sgml/func.sgml =================================================================== *** pgsql.orig/doc/src/sgml/func.sgml --- pgsql/doc/src/sgml/func.sgml *************** SELECT pg_type_is_visible('myschema.widg *** 9699,9704 **** --- 9699,9805 ---- databases within each cluster and their descriptions are stored globally as well. + + + shows functions + querying transaction IDs and snapshots. + + + + Transaction state functions + + + Name Return Type Description + + + + + current_txid() + int8 + transaction ID of current transaction + + + + current_snapshot() + snapshot + snapshot for current transaction (or statement) + + + + snapshot_xmin(snapshot) + int8 + minimal txid in snapshot - all transactions below are visible in snapshot + + + + snapshot_xmax(snapshot) + int8 + maximal txid in snapshot - all transactions above are not visible in snapshot + + + + snapshot_active_list(snapshot) + setof int8 + list of active txid-s in snapshot + + + + snapshot_contains(snapshot, int8) + bool + checks if txid is visible in snapshot + + + + pg_sync_txid(int8) + int8 + makes sure the following txids will be greater than given argument + + + + +
+ + + txid + current + + + + snapshot + current + + + + current_txid + + + current_snapshot + + + snapshot_xmin + + + snapshot_xmax + + + snapshot_contains + + + snapshot_uncommitted + + + pg_sync_txid + + + + A snapshot is basically a list of running transactions. + + + + TODO: describe usage. + + + Index: pgsql/src/include/access/xlogutils.h =================================================================== *** pgsql.orig/src/include/access/xlogutils.h --- pgsql/src/include/access/xlogutils.h *************** *** 13,18 **** --- 13,19 ---- #include "storage/buf.h" #include "utils/rel.h" + #include "catalog/pg_control.h" extern void XLogInitRelationCache(void); *************** extern void XLogTruncateRelation(RelFile *** 26,29 **** --- 27,33 ---- extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init); + extern void GetTxidEpoch(TxidEpoch *dst); + extern void SyncTxidEpoch(uint64 sync_txid); + #endif Index: pgsql/src/include/catalog/pg_control.h =================================================================== *** pgsql.orig/src/include/catalog/pg_control.h --- pgsql/src/include/catalog/pg_control.h *************** typedef enum DBState *** 62,67 **** --- 62,75 ---- #define LOCALE_NAME_BUFLEN 128 /* + * Long txid in expanded form. + */ + typedef struct { + uint32 epoch; /* epoch value */ + TransactionId epoch_xid; /* corresponding xid */ + } TxidEpoch; + + /* * Contents of pg_control. * * NOTE: try to keep this under 512 bytes so that it will fit on one physical *************** typedef struct ControlFileData *** 143,148 **** --- 151,159 ---- char lc_collate[LOCALE_NAME_BUFLEN]; char lc_ctype[LOCALE_NAME_BUFLEN]; + /* external txid tracking */ + TxidEpoch txid_epoch; + /* CRC of all above ... MUST BE LAST! */ pg_crc32 crc; } ControlFileData; Index: pgsql/src/include/utils/builtins.h =================================================================== *** pgsql.orig/src/include/utils/builtins.h --- pgsql/src/include/utils/builtins.h *************** extern Datum pg_prepared_statement(PG_FU *** 886,889 **** --- 886,904 ---- /* utils/mmgr/portalmem.c */ extern Datum pg_cursor(PG_FUNCTION_ARGS); + /* utils/adt/txid.c */ + extern Datum txid_current_txid(PG_FUNCTION_ARGS); + extern Datum txid_current_snapshot(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_in(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_out(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_recv(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_send(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_contains(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_xmin(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_xmax(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_active_list(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_from_text(PG_FUNCTION_ARGS); + extern Datum txid_snapshot_to_text(PG_FUNCTION_ARGS); + extern Datum pg_sync_txid(PG_FUNCTION_ARGS); + #endif /* BUILTINS_H */ Index: pgsql/src/backend/access/transam/xlog.c =================================================================== *** pgsql.orig/src/backend/access/transam/xlog.c --- pgsql/src/backend/access/transam/xlog.c *************** static void xlog_outrec(StringInfo buf, *** 501,506 **** --- 501,507 ---- static bool read_backup_label(XLogRecPtr *checkPointLoc); static void remove_backup_label(void); static void rm_redo_error_callback(void *arg); + static void update_txid_epoch(TxidEpoch *state, TransactionId cur_xid); /* *************** CreateCheckPoint(bool shutdown, bool for *** 5378,5383 **** --- 5379,5387 ---- ControlFile->checkPoint = ProcLastRecPtr; ControlFile->checkPointCopy = checkPoint; ControlFile->time = time(NULL); + + update_txid_epoch(&ControlFile->txid_epoch, checkPoint.nextXid); + UpdateControlFile(); LWLockRelease(ControlFileLock); *************** rm_redo_error_callback(void *arg) *** 6143,6145 **** --- 6147,6230 ---- pfree(buf.data); } + + /* + * As MAX_INT64 is used for InvalidTransactionId, + * dont allow 0x7FFFFFFF as epoch value, so that no valid + * transaction can get its value. + */ + #define MAX_EPOCH 0x7FFFFFFE + + /* + * Per-checkpoint refresh of pg_control values. + * + * Increases epoch if wraparound happened. + */ + static void update_txid_epoch(TxidEpoch *state, TransactionId cur_xid) + { + if (cur_xid < state->epoch_xid) + { + state->epoch++; + if (state->epoch > MAX_EPOCH) + { + elog(WARNING, "txid epoch wraparound"); + state->epoch = 0; + } + } + state->epoch_xid = cur_xid; + } + + /* + * Return current txid_epoch. + */ + void GetTxidEpoch(TxidEpoch *dst) + { + LWLockAcquire(ControlFileLock, LW_SHARED); + *dst = ControlFile->txid_epoch; + LWLockRelease(ControlFileLock); + } + + /* + * Checks if new issued txid's are greater then txid_sync. + * Inreases epoch if needed. + */ + void SyncTxidEpoch(uint64 txid_sync) + { + TransactionId cur_xid = GetCurrentTransactionId(); + TransactionId sync_xid = (TransactionId)txid_sync; + uint32 sync_epoch = txid_sync >> 32; + int change = 0; + TxidEpoch *state; + + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + + state = &ControlFile->txid_epoch; + + if (state->epoch < sync_epoch) + { + state->epoch = sync_epoch; + change = 1; + } + if (state->epoch == sync_epoch) + { + if (cur_xid <= sync_xid) + { + state->epoch++; + change = 1; + } + } + + if (change) + { + if (state->epoch > MAX_EPOCH) + { + elog(WARNING, "txid epoch wraparound"); + state->epoch = 0; + } + state->epoch_xid = cur_xid; + UpdateControlFile(); + } + + LWLockRelease(ControlFileLock); + } + Index: pgsql/src/backend/utils/adt/txid.c =================================================================== *** /dev/null --- pgsql/src/backend/utils/adt/txid.c *************** *** 0 **** --- 1,507 ---- + /*------------------------------------------------------------------------- + * txid.c + * + * Safe handling of transaction ID's. + * + * Copyright (c) 2003-2004, PostgreSQL Global Development Group + * Author: Jan Wieck, Afilias USA INC. + * + * Extend to 8-byte: Marko Kreen, Skype Technologies + *------------------------------------------------------------------------- + */ + + #include "postgres.h" + + #include + + #include "access/xact.h" + #include "access/transam.h" + #include "executor/spi.h" + #include "libpq/pqformat.h" + #include "miscadmin.h" + #include "utils/array.h" + #include "utils/lsyscache.h" + #include "catalog/catversion.h" + #include "catalog/pg_control.h" + #include "funcapi.h" + + #ifdef INT64_IS_BUSTED + #error txid needs working int64 + #endif + + #define MAX_INT64 0x7FFFFFFFFFFFFFFFLL + + /* Use unsigned variant internally */ + typedef uint64 txid; + + /* + * In-memory representation of snapshot. + */ + typedef struct + { + int32 varsz; + uint32 nxip; + txid xmin; + txid xmax; + txid xip[1]; + } txid_snapshot; + + /* + * txid_snapshot_active_list needs to remember + * state between function calls + */ + struct snap_state { + int pos; + txid_snapshot *snap; + }; + + /* + * do a TransactionId -> txid conversion + */ + static txid convert_xid(TransactionId xid, const TxidEpoch *state) + { + uint64 epoch; + + /* avoid issues with the the special meaning of 0 */ + if (xid == InvalidTransactionId) + return MAX_INT64; + + /* return special xid's as-is */ + if (xid < FirstNormalTransactionId) + return xid; + + /* xid can on both sides on wrap-around */ + epoch = state->epoch; + if (TransactionIdPrecedes(xid, state->epoch_xid)) { + if (xid > state->epoch_xid) + epoch--; + } else if (TransactionIdFollows(xid, state->epoch_xid)) { + if (xid < state->epoch_xid) + epoch++; + } + return (epoch << 32) | xid; + } + + static int _cmp_txid(const void *aa, const void *bb) + { + const uint64 *a = aa; + const uint64 *b = bb; + if (*a < *b) + return -1; + if (*a > *b) + return 1; + return 0; + } + + static void sort_snapshot(txid_snapshot *snap) + { + qsort(snap->xip, snap->nxip, sizeof(txid), _cmp_txid); + } + + /* + * Convert a cstring to txid_snapshot + */ + static txid_snapshot * + parse_snapshot(const char *str) + { + int a_size; + txid *xip; + + int a_used = 0; + txid xmin; + txid xmax; + txid last_val = 0, val; + txid_snapshot *snap; + int size; + + char *endp; + + a_size = 1024; + xip = (txid *) palloc(sizeof(txid) * a_size); + + xmin = (txid) strtoull(str, &endp, 0); + if (*endp != ':') + elog(ERROR, "illegal txid_snapshot input format"); + str = endp + 1; + + xmax = (txid) strtoull(str, &endp, 0); + if (*endp != ':') + elog(ERROR, "illegal txid_snapshot input format"); + str = endp + 1; + + /* it should look sane */ + if (xmin >= xmax || xmin > MAX_INT64 || xmax > MAX_INT64 + || xmin == 0 || xmax == 0) + elog(ERROR, "illegal txid_snapshot input format"); + + while (*str != '\0') + { + if (a_used >= a_size) + { + a_size *= 2; + xip = (txid *) repalloc(xip, sizeof(txid) * a_size); + } + + /* read next value */ + if (*str == '\'') + { + str++; + val = (txid) strtoull(str, &endp, 0); + if (*endp != '\'') + elog(ERROR, "illegal txid_snapshot input format"); + str = endp + 1; + } + else + { + val = (txid) strtoull(str, &endp, 0); + str = endp; + } + + /* require the input to be in order */ + if (val < xmin || val <= last_val || val >= xmax) + elog(ERROR, "illegal txid_snapshot input format"); + + xip[a_used++] = val; + last_val = val; + + if (*str == ',') + str++; + else + { + if (*str != '\0') + elog(ERROR, "illegal txid_snapshot input format"); + } + } + + size = offsetof(txid_snapshot, xip) + sizeof(txid) * a_used; + snap = (txid_snapshot *) palloc(size); + snap->varsz = size; + snap->xmin = xmin; + snap->xmax = xmax; + snap->nxip = a_used; + if (a_used > 0) + memcpy(&(snap->xip[0]), xip, sizeof(txid) * a_used); + pfree(xip); + + return snap; + } + + static txid + get_current_txid() + { + TxidEpoch state; + GetTxidEpoch(&state); + return convert_xid(GetTopTransactionId(), &state); + } + + /* + * Return the current transaction ID as txid + */ + Datum + txid_current_txid(PG_FUNCTION_ARGS) + { + PG_RETURN_INT64(get_current_txid()); + } + + /* + * Return current snapshot + */ + Datum + txid_current_snapshot(PG_FUNCTION_ARGS) + { + txid_snapshot *snap; + unsigned num, i, size; + TxidEpoch state; + + if (SerializableSnapshot == NULL) + elog(ERROR, "current_snapshot: SerializableSnapshot == NULL"); + + GetTxidEpoch(&state); + + num = SerializableSnapshot->xcnt; + size = offsetof(txid_snapshot, xip) + sizeof(txid) * num; + snap = palloc(size); + snap->varsz = size; + snap->xmin = convert_xid(SerializableSnapshot->xmin, &state); + snap->xmax = convert_xid(SerializableSnapshot->xmax, &state); + snap->nxip = num; + for (i = 0; i < num; i++) + snap->xip[i] = convert_xid(SerializableSnapshot->xip[i], &state); + + /* we want guaranteed ascending order */ + sort_snapshot(snap); + + PG_RETURN_POINTER(snap); + } + + /* + * Return snapshot's xmin + */ + Datum + txid_snapshot_xmin(PG_FUNCTION_ARGS) + { + txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + int64 res = snap->xmin; + + PG_FREE_IF_COPY(snap, 0); + PG_RETURN_INT64(res); + } + + /* + * Return snapshot's xmax + */ + Datum + txid_snapshot_xmax(PG_FUNCTION_ARGS) + { + txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + int64 res = snap->xmax; + + PG_FREE_IF_COPY(snap, 0); + PG_RETURN_INT64(res); + } + + /* + * returns uncommitted TXID's in snapshot. + */ + Datum + txid_snapshot_active_list(PG_FUNCTION_ARGS) + { + FuncCallContext *fctx; + struct snap_state *state; + + if (SRF_IS_FIRSTCALL()) { + txid_snapshot *snap; + int statelen; + + snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + + fctx = SRF_FIRSTCALL_INIT(); + statelen = sizeof(*state) + snap->varsz; + state = MemoryContextAlloc(fctx->multi_call_memory_ctx, statelen); + state->pos = 0; + state->snap = (txid_snapshot *)((char *)state + sizeof(*state)); + memcpy(state->snap, snap, snap->varsz); + fctx->user_fctx = state; + PG_FREE_IF_COPY(snap, 0); + } + fctx = SRF_PERCALL_SETUP(); + state = fctx->user_fctx; + if (state->pos < state->snap->nxip) { + Datum res = Int64GetDatum(state->snap->xip[state->pos]); + state->pos++; + SRF_RETURN_NEXT(fctx, res); + } else { + SRF_RETURN_DONE(fctx); + } + } + + /* + * input function for type txid_snapshot + */ + Datum + txid_snapshot_in(PG_FUNCTION_ARGS) + { + txid_snapshot *snap; + char *str = PG_GETARG_CSTRING(0); + + snap = parse_snapshot(str); + PG_RETURN_POINTER(snap); + } + + /* + * output function for type txid_snapshot + */ + Datum + txid_snapshot_out(PG_FUNCTION_ARGS) + { + txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + + char *str = palloc(60 + snap->nxip * 30); + char *cp = str; + int i; + + snprintf(str, 60, "%llu:%llu:", + (unsigned long long)snap->xmin, + (unsigned long long)snap->xmax); + cp = str + strlen(str); + + for (i = 0; i < snap->nxip; i++) + { + snprintf(cp, 30, "%llu%s", + (unsigned long long)snap->xip[i], + (i < snap->nxip - 1) ? "," : ""); + cp += strlen(cp); + } + + PG_FREE_IF_COPY(snap, 0); + + PG_RETURN_CSTRING(str); + } + + /* + * convert text to txid_snapshot + */ + Datum + txid_snapshot_from_text(PG_FUNCTION_ARGS) + { + text *txt = PG_GETARG_TEXT_P(0); + txid_snapshot *snap; + char *str; + int len; + + len = VARSIZE(txt) - VARHDRSZ; + str = palloc(len + 1); + memcpy(str, VARDATA(txt), len); + str[len] = 0; + + snap = parse_snapshot(str); + + pfree(str); + PG_FREE_IF_COPY(txt, 0); + + PG_RETURN_POINTER(snap); + } + + /* + * convert txid_snapshot to text + */ + Datum + txid_snapshot_to_text(PG_FUNCTION_ARGS) + { + txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + + text *res = palloc(VARHDRSZ + 60 + snap->nxip * 30); + char *str = VARDATA(res); + char *cp = str; + int i; + + snprintf(str, 60, "%llu:%llu:", + (unsigned long long)snap->xmin, + (unsigned long long)snap->xmax); + cp = str + strlen(str); + + for (i = 0; i < snap->nxip; i++) + { + snprintf(cp, 30, "%llu%s", + (unsigned long long)snap->xip[i], + (i < snap->nxip - 1) ? "," : ""); + cp += strlen(cp); + } + + VARATT_SIZEP(res) = VARHDRSZ + cp - str; + + PG_FREE_IF_COPY(snap, 0); + + PG_RETURN_TEXT_P(res); + } + + /* + * read binary representation + */ + Datum + txid_snapshot_recv(PG_FUNCTION_ARGS) + { + StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); + txid_snapshot *snap; + unsigned i, count, size; + txid val; + + count = pq_getmsgint(buf, 4); + size = offsetof(txid_snapshot, xip) + sizeof(txid) * count; + snap = palloc(size); + + snap->varsz = size; + snap->nxip = count; + snap->xmin = pq_getmsgint64(buf); + snap->xmax = pq_getmsgint64(buf); + val = snap->xmin; + for (i = 0; i < count; i++) { + unsigned delta; + delta = pq_getmsgint(buf, 2); + if (delta & 0x8000) + val += ((delta & 0x7FFF) << 16) + pq_getmsgint(buf, 2); + else + val += delta; + if (val < snap->xmin || val > snap->xmax) + elog(ERROR, "corrupt snapshot data"); + snap->xip[i] = val; + } + + PG_RETURN_POINTER(snap); + } + + /* + * binary storage + */ + Datum + txid_snapshot_send(PG_FUNCTION_ARGS) + { + int i; + txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + StringInfoData buf; + txid val; + + pq_begintypsend(&buf); + pq_sendint(&buf, snap->nxip, 4); + pq_sendint64(&buf, snap->xmin); + pq_sendint64(&buf, snap->xmax); + val = snap->xmin; + for (i = 0; i < snap->nxip; i++) { + unsigned delta = (unsigned)(snap->xip[i] - val); + val = snap->xip[i]; + if (delta > 0x7FFF) { + pq_sendint(&buf, (delta >> 16) | 0x8000, 2); + pq_sendint(&buf, delta & 0xFFFF, 2); + } else { + pq_sendint(&buf, delta, 2); + } + } + + PG_FREE_IF_COPY(snap, 0); + + PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); + } + + /* + * checks if txid is visible in snapshot? + */ + Datum + txid_snapshot_contains(PG_FUNCTION_ARGS) + { + txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0); + txid value = PG_GETARG_INT64(1); + int i; + + if (value < snap->xmin) + PG_RETURN_BOOL(true); + + if (value >= snap->xmax) + PG_RETURN_BOOL(false); + + for (i = 0; i < snap->nxip; i++) + { + if (value == snap->xip[i]) + PG_RETURN_BOOL(false); + } + + PG_FREE_IF_COPY(snap, 0); + + PG_RETURN_BOOL(true); + } + + /* + * Bump epoch to make following txids greater than sync_txid. + */ + Datum + pg_sync_txid(PG_FUNCTION_ARGS) + { + int64 sync_txid = PG_GETARG_INT64(0); + + if (!superuser()) + elog(ERROR, "must be superuser to use pg_sync_txid"); + + SyncTxidEpoch(sync_txid); + + PG_RETURN_INT64(get_current_txid()); + } + Index: pgsql/src/backend/utils/adt/Makefile =================================================================== *** pgsql.orig/src/backend/utils/adt/Makefile --- pgsql/src/backend/utils/adt/Makefile *************** OBJS = acl.o arrayfuncs.o array_userfunc *** 25,31 **** tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \ network.o mac.o inet_net_ntop.o inet_net_pton.o \ ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \ ! ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o like.o: like.c like_match.c --- 25,31 ---- tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \ network.o mac.o inet_net_ntop.o inet_net_pton.o \ ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \ ! ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o txid.o like.o: like.c like_match.c Index: pgsql/src/include/catalog/pg_proc.h =================================================================== *** pgsql.orig/src/include/catalog/pg_proc.h --- pgsql/src/include/catalog/pg_proc.h *************** DESCR("List all files in a directory"); *** 3086,3091 **** --- 3086,3118 ---- DATA(insert OID = 2626 ( pg_sleep PGNSP PGUID 12 f f t f v 1 2278 "701" _null_ _null_ _null_ pg_sleep - _null_ )); DESCR("Sleep for the specified time in seconds"); + DATA(insert OID = 2789 ( current_txid PGNSP PGUID 12 f f t f v 0 20 "" _null_ _null_ _null_ txid_current_txid - _null_ )); + DESCR("Get current top transaction id"); + DATA(insert OID = 2790 ( current_snapshot PGNSP PGUID 12 f f t f v 0 2800 "" _null_ _null_ _null_ txid_current_snapshot - _null_ )); + DESCR("Returns current snapshot."); + + DATA(insert OID = 2791 ( snapshot_in PGNSP PGUID 12 f f t f i 1 2800 "2275" _null_ _null_ _null_ txid_snapshot_in - _null_ )); + DESCR("I/O"); + DATA(insert OID = 2792 ( snapshot_out PGNSP PGUID 12 f f t f i 1 2275 "2800" _null_ _null_ _null_ txid_snapshot_out - _null_ )); + DESCR("I/O"); + DATA(insert OID = 2793 ( snapshot_recv PGNSP PGUID 12 f f t f i 1 2800 "2281" _null_ _null_ _null_ txid_snapshot_recv - _null_ )); + DESCR("I/O"); + DATA(insert OID = 2794 ( snapshot_send PGNSP PGUID 12 f f t f i 1 17 "2800" _null_ _null_ _null_ txid_snapshot_send - _null_ )); + DESCR("I/O"); + DATA(insert OID = 2795 ( snapshot_xmin PGNSP PGUID 12 f f t f i 1 20 "2800" _null_ _null_ _null_ txid_snapshot_xmin - _null_ )); + DESCR("Returns snapshot's xmin."); + DATA(insert OID = 2796 ( snapshot_xmax PGNSP PGUID 12 f f t f i 1 20 "2800" _null_ _null_ _null_ txid_snapshot_xmax - _null_ )); + DESCR("Returns snapshot's xmax."); + DATA(insert OID = 2797 ( snapshot_contains PGNSP PGUID 12 f f t f i 2 16 "2800 20" _null_ _null_ _null_ txid_snapshot_contains - _null_ )); + DESCR("Checks whether txid is in snapshot."); + DATA(insert OID = 2798 ( snapshot_active_list PGNSP PGUID 12 f f t t i 1 20 "2800" _null_ _null_ _null_ txid_snapshot_active_list - _null_ )); + DESCR("Active transactions between snapshot xmin and xmax."); + DATA(insert OID = 2799 ( pg_sync_txid PGNSP PGUID 12 f f t f v 1 20 "20" _null_ _null_ _null_ pg_sync_txid - _null_ )); + DESCR("Synchronizes epoch so that new txid's will be larger that given txid"); + DATA(insert OID = 2803 ( snapshot PGNSP PGUID 12 f f t f i 1 2800 "25" _null_ _null_ _null_ txid_snapshot_from_text - _null_ )); + DESCR("Converts text to snapshot"); + DATA(insert OID = 2804 ( text PGNSP PGUID 12 f f t f i 1 25 "2800" _null_ _null_ _null_ txid_snapshot_to_text - _null_ )); + DESCR("Converts snapshot to text"); /* Aggregates (moved here from pg_aggregate for 7.3) */ Index: pgsql/src/include/catalog/pg_type.h =================================================================== *** pgsql.orig/src/include/catalog/pg_type.h --- pgsql/src/include/catalog/pg_type.h *************** DATA(insert OID = 2210 ( _regclass PG *** 518,523 **** --- 518,528 ---- DATA(insert OID = 2211 ( _regtype PGNSP PGUID -1 f b t \054 0 2206 array_in array_out array_recv array_send - i x f 0 -1 0 _null_ _null_ )); #define REGTYPEARRAYOID 2211 + DATA(insert OID = 2800 ( snapshot PGNSP PGUID -1 f b t \054 0 0 snapshot_in snapshot_out snapshot_recv snapshot_send - d p f 0 -1 0 _null_ _null_ )); + #define SNAPSHOTOID 2800 + DATA(insert OID = 2801 ( _snapshot PGNSP PGUID -1 f b t \054 0 2800 array_in array_out array_recv array_send - i p f 0 -1 0 _null_ _null_ )); + #define SNAPSHOTARRAYOID 2801 + /* * pseudo-types * Index: pgsql/src/include/catalog/pg_cast.h =================================================================== *** pgsql.orig/src/include/catalog/pg_cast.h --- pgsql/src/include/catalog/pg_cast.h *************** DATA(insert ( 1560 1560 1685 i )); *** 392,395 **** --- 392,399 ---- DATA(insert ( 1562 1562 1687 i )); DATA(insert ( 1700 1700 1703 i )); + /* snapshot to/from text */ + DATA(insert ( 25 2800 2803 e )); + DATA(insert ( 2800 25 2804 e )); + #endif /* PG_CAST_H */ Index: pgsql/src/bin/pg_dump/pg_dumpall.c =================================================================== *** pgsql.orig/src/bin/pg_dump/pg_dumpall.c --- pgsql/src/bin/pg_dump/pg_dumpall.c *************** static PGconn *connectDatabase(const cha *** 59,64 **** --- 59,66 ---- static PGresult *executeQuery(PGconn *conn, const char *query); static void executeCommand(PGconn *conn, const char *query); + static void dumpTxidEpoch(PGconn *conn); + static char pg_dump_bin[MAXPGPATH]; static PQExpBuffer pgdumpopts; static bool output_clean = false; *************** main(int argc, char *argv[]) *** 349,354 **** --- 351,360 ---- if (server_version >= 80000) dumpTablespaces(conn); + /* Dump txid state */ + if (server_version >= 80200) + dumpTxidEpoch(conn); + /* Dump CREATE DATABASE commands */ if (!globals_only) dumpCreateDB(conn); *************** help(void) *** 410,415 **** --- 416,442 ---- } + static void + dumpTxidEpoch(PGconn *conn) + { + long long txid; + PGresult *res; + int ntups; + + res = executeQuery(conn, "SELECT get_current_txid()"); + ntups = PQntuples(res); + if (ntups != 1) + { + fprintf(stderr, "ERROR: get_current_txid() failed\n"); + exit(1); + } + txid = atoll(PQgetvalue(res, 0, 0)); + PQclear(res); + + printf("\n--\n-- synchronize txid epoch\n--\n"); + printf("SELECT pg_sync_txid(%lld);\n\n", txid); + } + /* * Dump roles Index: pgsql/src/test/regress/parallel_schedule =================================================================== *** pgsql.orig/src/test/regress/parallel_schedule --- pgsql/src/test/regress/parallel_schedule *************** test: numerology *** 12,18 **** # ---------- # The second group of parallel test # ---------- ! test: point lseg box path polygon circle date time timetz timestamp timestamptz interval abstime reltime tinterval inet comments oidjoins type_sanity opr_sanity # Depends on point, lseg, box, path, polygon and circle test: geometry --- 12,18 ---- # ---------- # The second group of parallel test # ---------- ! test: point lseg box path polygon circle date time timetz timestamp timestamptz interval abstime reltime tinterval inet comments oidjoins type_sanity opr_sanity txid # Depends on point, lseg, box, path, polygon and circle test: geometry Index: pgsql/src/test/regress/serial_schedule =================================================================== *** pgsql.orig/src/test/regress/serial_schedule --- pgsql/src/test/regress/serial_schedule *************** test: comments *** 35,40 **** --- 35,41 ---- test: oidjoins test: type_sanity test: opr_sanity + test: txid test: geometry test: horology test: insert Index: pgsql/src/test/regress/expected/txid.out =================================================================== *** /dev/null --- pgsql/src/test/regress/expected/txid.out *************** *** 0 **** --- 1,159 ---- + -- i/o + select '12:20:'::snapshot; + snapshot + ---------- + 12:20: + (1 row) + + select '12:15:'::snapshot; + snapshot + ---------- + 12:15: + (1 row) + + -- text conversion + select '12:13:'::text::snapshot; + snapshot + ---------- + 12:13: + (1 row) + + select '12:13:'::snapshot::text; + text + -------- + 12:13: + (1 row) + + -- errors + select '31:12:'::snapshot; + ERROR: illegal txid_snapshot input format + select '0:1:'::snapshot; + ERROR: illegal txid_snapshot input format + select '12:13:0'::snapshot; + ERROR: illegal txid_snapshot input format + select '12:20:14,13'::snapshot; + ERROR: illegal txid_snapshot input format + -- info + select snapshot_xmin('1:2:'::snapshot); + snapshot_xmin + --------------- + 1 + (1 row) + + select snapshot_xmax('1:2:'::snapshot); + snapshot_xmax + --------------- + 2 + (1 row) + + select * from snapshot_active_list('1:20:3,4,5,6,7'::snapshot); + snapshot_active_list + ---------------------- + 3 + 4 + 5 + 6 + 7 + (5 rows) + + -- storage + create table snapshot_test ( + nr integer, + snap snapshot + ); + insert into snapshot_test values (1, '12:13:'); + -- small delta + insert into snapshot_test values (2, '12:20:13,15,18'); + -- large delta + insert into snapshot_test values (3, '2000000:7000000:3000001,4000002,5000003,6000004'); + select snap, + snapshot_xmin(snap), + snapshot_xmax(snap) + from snapshot_test order by nr; + snap | snapshot_xmin | snapshot_xmax + -------------------------------------------------+---------------+--------------- + 12:13: | 12 | 13 + 12:20:13,15,18 | 12 | 20 + 2000000:7000000:3000001,4000002,5000003,6000004 | 2000000 | 7000000 + (3 rows) + + select id, snapshot_contains(snap, id) + from snapshot_test, generate_series(11, 21) id + where nr = 2; + id | snapshot_contains + ----+------------------- + 11 | t + 12 | t + 13 | f + 14 | t + 15 | f + 16 | t + 17 | t + 18 | f + 19 | t + 20 | f + 21 | f + (11 rows) + + -- test current values also + select current_txid() >= snapshot_xmin(current_snapshot()); + ?column? + ---------- + t + (1 row) + + select current_txid() < snapshot_xmax(current_snapshot()); + ?column? + ---------- + t + (1 row) + + select snapshot_contains(current_snapshot(), current_txid()); + snapshot_contains + ------------------- + t + (1 row) + + -- pg_resync + select current_txid() >> 32; + ?column? + ---------- + 0 + (1 row) + + select pg_sync_txid(400); + pg_sync_txid + -------------- + + (1 row) + + select current_txid() >> 32; + ?column? + ---------- + 0 + (1 row) + + select pg_sync_txid(8589934592); + pg_sync_txid + -------------- + + (1 row) + + select current_txid() >> 32; + ?column? + ---------- + 2 + (1 row) + + select pg_sync_txid(400); + pg_sync_txid + -------------- + + (1 row) + + select current_txid() >> 32; + ?column? + ---------- + 2 + (1 row) + Index: pgsql/src/test/regress/sql/txid.sql =================================================================== *** /dev/null --- pgsql/src/test/regress/sql/txid.sql *************** *** 0 **** --- 1,55 ---- + + -- i/o + select '12:20:'::snapshot; + select '12:15:'::snapshot; + + -- text conversion + select '12:13:'::text::snapshot; + select '12:13:'::snapshot::text; + + -- errors + select '31:12:'::snapshot; + select '0:1:'::snapshot; + select '12:13:0'::snapshot; + select '12:20:14,13'::snapshot; + + -- info + select snapshot_xmin('1:2:'::snapshot); + select snapshot_xmax('1:2:'::snapshot); + select * from snapshot_active_list('1:20:3,4,5,6,7'::snapshot); + + -- storage + create table snapshot_test ( + nr integer, + snap snapshot + ); + + insert into snapshot_test values (1, '12:13:'); + -- small delta + insert into snapshot_test values (2, '12:20:13,15,18'); + -- large delta + insert into snapshot_test values (3, '2000000:7000000:3000001,4000002,5000003,6000004'); + + select snap, + snapshot_xmin(snap), + snapshot_xmax(snap) + from snapshot_test order by nr; + + select id, snapshot_contains(snap, id) + from snapshot_test, generate_series(11, 21) id + where nr = 2; + + -- test current values also + select current_txid() >= snapshot_xmin(current_snapshot()); + select current_txid() < snapshot_xmax(current_snapshot()); + select snapshot_contains(current_snapshot(), current_txid()); + + -- pg_resync + select current_txid() >> 32; + select pg_sync_txid(400); + select current_txid() >> 32; + select pg_sync_txid(8589934592); + select current_txid() >> 32; + select pg_sync_txid(400); + select current_txid() >> 32; + Index: pgsql/src/test/regress/output/misc.source =================================================================== *** pgsql.orig/src/test/regress/output/misc.source --- pgsql/src/test/regress/output/misc.source *************** SELECT user_relns() AS user_relns *** 650,655 **** --- 650,656 ---- road shighway slow_emp4000 + snapshot_test street stud_emp student *************** SELECT user_relns() AS user_relns *** 665,671 **** toyemp varchar_tbl xacttest ! (99 rows) SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer'))); name --- 666,672 ---- toyemp varchar_tbl xacttest ! (100 rows) SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer'))); name