diff -rcN base/doc/src/sgml/libpq.sgml new/doc/src/sgml/libpq.sgml
*** base/doc/src/sgml/libpq.sgml 2009-09-21 16:48:36.000000000 +0900
--- new/doc/src/sgml/libpq.sgml 2009-09-29 21:37:10.000000000 +0900
***************
*** 4752,4767 ****
PQgetXLogData> obtains a set of WAL records from WAL stream.
If successful, returns the length of WAL records (always > 0) as result,
! sets buffer> to a pointer to them, and sets xlogid
! > and xrecoff> to the ending WAL location.
! finishing_seg> is set to true (1) if the WAL records are at the end of WAL
! segment, false (0) otherwise. fsync_requested> is set to true
! if the WAL records need to be fsynced, false otherwise.
!
!
! Since the buffer> points to the internal buffer which is
! reused repeatedly, the caller doesn't need to free it, instead must call
! PQmarkConsumed> when it's no longer necessary.
When the async> is true (1), PQgetXLogData> will
--- 4752,4762 ----
PQgetXLogData> obtains a set of WAL records from WAL stream.
If successful, returns the length of WAL records (always > 0) as result,
! sets buffer> to a pointer to a malloc'd records, and sets
! xlogid> and xrecoff> to the starting WAL location.
! finishing_seg> is set to true (1) if the WAL records are
! at the end of WAL segment, false (0) otherwise. fsync_requested>
! is set to true if the WAL records need to be fsynced, false otherwise.
When the async> is true (1), PQgetXLogData> will
***************
*** 5121,5152 ****
!
!
! PQmarkConsumed
!
! PQmarkConsumed
!
!
!
!
!
! Mark the previously read message as consumed.
!
! void PQmarkConsumed(PGconn *conn);
!
!
!
!
! Some libpq functions don't mark a message as consumed
! because the same message may be read again. But unless the message is consumed,
! the next message can not be read. This function is used when the caller wants
! to read the next message and is sure that the current message need not be read again.
!
!
!
!
!
--- 5116,5122 ----
!
diff -rcN base/src/backend/access/nbtree/nbtsort.c new/src/backend/access/nbtree/nbtsort.c
*** base/src/backend/access/nbtree/nbtsort.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/access/nbtree/nbtsort.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 214,221 ****
* We need to log index creation in WAL iff either WAL archiving is enabled
* or XLOG streaming is allowed AND it's not a temp index.
*/
! wstate.btws_use_wal = (XLogArchivingActive() || XLogStreamingAllowed()) &&
! !wstate.index->rd_istemp;
/* reserve the metapage */
wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
--- 214,220 ----
* We need to log index creation in WAL iff either WAL archiving is enabled
* or XLOG streaming is allowed AND it's not a temp index.
*/
! wstate.btws_use_wal = XLogIsNeeded() && !wstate.index->rd_istemp;
/* reserve the metapage */
wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
diff -rcN base/src/backend/access/transam/xlog.c new/src/backend/access/transam/xlog.c
*** base/src/backend/access/transam/xlog.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/access/transam/xlog.c 2009-09-29 22:03:25.000000000 +0900
***************
*** 185,190 ****
--- 185,191 ----
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyMode = false;
+ char *PrimaryConnInfo = NULL;
char *TriggerFile = NULL;
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
***************
*** 4688,4693 ****
--- 4689,4701 ----
errmsg("could not close control file: %m")));
}
+ uint64
+ GetSystemIdentifier(void)
+ {
+ Assert(ControlFile != NULL);
+ return ControlFile->system_identifier;
+ }
+
/*
* Initialization of shared memory for XLOG
*/
***************
*** 4934,4940 ****
{
FILE *fd;
char cmdline[MAXPGPATH];
- char *conninfo = NULL;
TimeLineID rtli = 0;
bool rtliGiven = false;
bool syntaxError = false;
--- 4942,4947 ----
***************
*** 5087,5096 ****
}
else if (strcmp(tok1, "primary_conninfo") == 0)
{
! conninfo = pstrdup(tok2);
ereport(LOG,
(errmsg("primary_conninfo = '%s'",
! conninfo)));
}
else if (strcmp(tok1, "trigger_file") == 0)
{
--- 5094,5103 ----
}
else if (strcmp(tok1, "primary_conninfo") == 0)
{
! PrimaryConnInfo = pstrdup(tok2);
ereport(LOG,
(errmsg("primary_conninfo = '%s'",
! PrimaryConnInfo)));
}
else if (strcmp(tok1, "trigger_file") == 0)
{
***************
*** 5113,5126 ****
cmdline),
errhint("Lines should have the format parameter = 'value'.")));
- /* Inform walreceiver of the connection information via file */
- if (StandbyMode)
- {
- write_conninfo_file(conninfo);
- if (conninfo)
- pfree(conninfo);
- }
-
/* If not in standby mode, restore_command must be supplied */
if (!StandbyMode && recoveryRestoreCommand == NULL)
ereport(FATAL,
--- 5120,5125 ----
diff -rcN base/src/backend/commands/cluster.c new/src/backend/commands/cluster.c
*** base/src/backend/commands/cluster.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/commands/cluster.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 798,804 ****
* We need to log the copied data in WAL iff either WAL archiving is enabled
* or XLOG streaming is allowed AND it's not a temp rel.
*/
! use_wal = (XLogArchivingActive() || XLogStreamingAllowed()) && !NewHeap->rd_istemp;
/* use_wal off requires rd_targblock be initially invalid */
Assert(NewHeap->rd_targblock == InvalidBlockNumber);
--- 798,804 ----
* We need to log the copied data in WAL iff either WAL archiving is enabled
* or XLOG streaming is allowed AND it's not a temp rel.
*/
! use_wal = XLogIsNeeded() && !NewHeap->rd_istemp;
/* use_wal off requires rd_targblock be initially invalid */
Assert(NewHeap->rd_targblock == InvalidBlockNumber);
diff -rcN base/src/backend/commands/copy.c new/src/backend/commands/copy.c
*** base/src/backend/commands/copy.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/commands/copy.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 1738,1744 ****
cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
{
hi_options |= HEAP_INSERT_SKIP_FSM;
! if (!XLogArchivingActive() && !XLogStreamingAllowed())
hi_options |= HEAP_INSERT_SKIP_WAL;
}
--- 1738,1744 ----
cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
{
hi_options |= HEAP_INSERT_SKIP_FSM;
! if (!XLogIsNeeded())
hi_options |= HEAP_INSERT_SKIP_WAL;
}
diff -rcN base/src/backend/commands/tablecmds.c new/src/backend/commands/tablecmds.c
*** base/src/backend/commands/tablecmds.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/commands/tablecmds.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 7011,7017 ****
* We need to log the copied data in WAL iff either WAL archiving is enabled
* or XLOG streaming is allowed AND it's not a temp rel.
*/
! use_wal = (XLogArchivingActive() || XLogStreamingAllowed()) && !istemp;
nblocks = smgrnblocks(src, forkNum);
--- 7011,7017 ----
* We need to log the copied data in WAL iff either WAL archiving is enabled
* or XLOG streaming is allowed AND it's not a temp rel.
*/
! use_wal = XLogIsNeeded() && !istemp;
nblocks = smgrnblocks(src, forkNum);
diff -rcN base/src/backend/executor/execMain.c new/src/backend/executor/execMain.c
*** base/src/backend/executor/execMain.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/executor/execMain.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 2980,2987 ****
* XLOG streaming is allowed. We can skip the FSM in any case.
*/
myState->hi_options = HEAP_INSERT_SKIP_FSM |
! (XLogArchivingActive() || XLogStreamingAllowed() ?
! 0 : HEAP_INSERT_SKIP_WAL);
myState->bistate = GetBulkInsertState();
/* Not using WAL requires rd_targblock be initially invalid */
--- 2980,2986 ----
* XLOG streaming is allowed. We can skip the FSM in any case.
*/
myState->hi_options = HEAP_INSERT_SKIP_FSM |
! (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
myState->bistate = GetBulkInsertState();
/* Not using WAL requires rd_targblock be initially invalid */
diff -rcN base/src/backend/libpq/hba.c new/src/backend/libpq/hba.c
*** base/src/backend/libpq/hba.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/libpq/hba.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 25,32 ****
--- 25,34 ----
#include
#include
+ #include "access/xlog.h"
#include "libpq/ip.h"
#include "libpq/libpq.h"
+ #include "postmaster/walsender.h"
#include "regex/regex.h"
#include "storage/fd.h"
#include "utils/acl.h"
***************
*** 182,188 ****
(strcmp(start_buf, "all") == 0 ||
strcmp(start_buf, "sameuser") == 0 ||
strcmp(start_buf, "samegroup") == 0 ||
! strcmp(start_buf, "samerole") == 0))
{
/* append newline to a magical keyword */
*buf++ = '\n';
--- 184,191 ----
(strcmp(start_buf, "all") == 0 ||
strcmp(start_buf, "sameuser") == 0 ||
strcmp(start_buf, "samegroup") == 0 ||
! strcmp(start_buf, "samerole") == 0 ||
! strcmp(start_buf, "replication") == 0))
{
/* append newline to a magical keyword */
*buf++ = '\n';
***************
*** 506,511 ****
--- 509,517 ----
if (is_member(roleid, dbname))
return true;
}
+ else if (strcmp(tok, "replication\n") == 0 &&
+ am_walsender)
+ return true;
else if (strcmp(tok, dbname) == 0)
return true;
}
diff -rcN base/src/backend/libpq/pg_hba.conf.sample new/src/backend/libpq/pg_hba.conf.sample
*** base/src/backend/libpq/pg_hba.conf.sample 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/libpq/pg_hba.conf.sample 2009-09-29 21:37:10.000000000 +0900
***************
*** 20,27 ****
# "host" is either a plain or SSL-encrypted TCP/IP socket, "hostssl" is an
# SSL-encrypted TCP/IP socket, and "hostnossl" is a plain TCP/IP socket.
#
! # DATABASE can be "all", "sameuser", "samerole", a database name, or
! # a comma-separated list thereof.
#
# USER can be "all", a user name, a group name prefixed with "+", or
# a comma-separated list thereof. In both the DATABASE and USER fields
--- 20,27 ----
# "host" is either a plain or SSL-encrypted TCP/IP socket, "hostssl" is an
# SSL-encrypted TCP/IP socket, and "hostnossl" is a plain TCP/IP socket.
#
! # DATABASE can be "all", "sameuser", "samerole", "replication",
! # a database name, or a comma-separated list thereof.
#
# USER can be "all", a user name, a group name prefixed with "+", or
# a comma-separated list thereof. In both the DATABASE and USER fields
***************
*** 44,52 ****
# for a list of which options are available for which authentication methods.
#
# Database and user names containing spaces, commas, quotes and other special
! # characters must be quoted. Quoting one of the keywords "all", "sameuser" or
! # "samerole" makes the name lose its special character, and just match a
! # database or username with that name.
#
# This file is read on server startup and when the postmaster receives
# a SIGHUP signal. If you edit the file on a running system, you have
--- 44,52 ----
# for a list of which options are available for which authentication methods.
#
# Database and user names containing spaces, commas, quotes and other special
! # characters must be quoted. Quoting one of the keywords "all", "sameuser",
! # "samerole" or "replication" makes the name lose its special character,
! # and just match a database or username with that name.
#
# This file is read on server startup and when the postmaster receives
# a SIGHUP signal. If you edit the file on a running system, you have
diff -rcN base/src/backend/postmaster/postmaster.c new/src/backend/postmaster/postmaster.c
*** base/src/backend/postmaster/postmaster.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/postmaster/postmaster.c 2009-09-30 00:10:32.000000000 +0900
***************
*** 1789,1794 ****
--- 1789,1798 ----
if (strlen(port->user_name) >= NAMEDATALEN)
port->user_name[NAMEDATALEN - 1] = '\0';
+ /* Walsender is not related to a particular database */
+ if (am_walsender)
+ port->database_name[0] = '\0';
+
/*
* Done putting stuff in TopMemoryContext.
*/
***************
*** 4368,4373 ****
--- 4372,4379 ----
case WalReceiverProcess:
ereport(LOG,
(errmsg("could not fork WAL receiver process: %m")));
+ /* Report the failure to the startup process */
+ WalRcvFailed();
break;
default:
ereport(LOG,
diff -rcN base/src/backend/postmaster/walreceiver.c new/src/backend/postmaster/walreceiver.c
*** base/src/backend/postmaster/walreceiver.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/postmaster/walreceiver.c 2009-09-30 00:07:59.000000000 +0900
***************
*** 48,54 ****
static WalRcvData *WalRcv = NULL;
/* streamConn is a PGconn object of a connection to walsender from walreceiver */
! static PGconn *streamConn;
/* Path for the connection information file (relative to $PGDATA) */
#define CONNINFO_FILENAME "global/conninfo"
--- 48,54 ----
static WalRcvData *WalRcv = NULL;
/* streamConn is a PGconn object of a connection to walsender from walreceiver */
! static PGconn *streamConn = NULL;
/* Path for the connection information file (relative to $PGDATA) */
#define CONNINFO_FILENAME "global/conninfo"
***************
*** 62,67 ****
--- 62,70 ----
static uint32 recvSeg = 0;
static uint32 recvOff = 0;
+ /* Buffer for currently read records */
+ static char *recvBuf = NULL;
+
/*
* ZeroedRecPtr indicates the byte position that we have already zeroed. It is
* updated when walreceiver writes a half-filled page that needs to be zeroed.
***************
*** 102,115 ****
static void WalRcvKill(int code, Datum arg);
static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
- static char *read_conninfo_file(void);
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
MemoryContext walrcv_context;
char *conninfo;
/* Mark walreceiver in progress */
InitWalRcv();
--- 105,119 ----
static void WalRcvKill(int code, Datum arg);
static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
+ sigjmp_buf local_sigjmp_buf;
MemoryContext walrcv_context;
char *conninfo;
+ uint64 *system_identifier;
/* Mark walreceiver in progress */
InitWalRcv();
***************
*** 168,173 ****
--- 172,224 ----
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(walrcv_context);
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * This code is heavily based on bgwriter.c, q.v.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /* Free the data structure related to a connection */
+ PQfinish(streamConn);
+ streamConn = NULL;
+ if (recvBuf != NULL)
+ PQfreemem(recvBuf);
+ recvBuf = NULL;
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(walrcv_context);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextResetAndDeleteChildren(walrcv_context);
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ /*
+ * Sleep at least 1 second after any error. A write error is likely
+ * to be repeated, and we don't want to be filling the error logs as
+ * fast as we can.
+ */
+ pg_usleep(1000000L);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
***************
*** 178,201 ****
SpinLockAcquire(&walrcv->mutex);
LogstreamResult = walrcv->LogstreamResult;
SpinLockRelease(&walrcv->mutex);
/* Report XLOG streaming progress in PS display */
ReportLogstreamResult();
}
- /* Read the connection information used to connect with the primary */
- conninfo = read_conninfo_file();
-
/* Set up a connection for XLOG streaming */
streamConn = PQstartXLogStreaming(conninfo,
LogstreamResult.Write.xlogid,
LogstreamResult.Write.xrecoff);
if (PQstatus(streamConn) != CONNECTION_OK)
! ereport(FATAL,
(errmsg("could not connect to the primary server : %s",
PQerrorMessage(streamConn))));
! pfree(conninfo);
/*
* Confirm that the current timeline of the primary is the same
--- 229,273 ----
SpinLockAcquire(&walrcv->mutex);
LogstreamResult = walrcv->LogstreamResult;
+ conninfo = pstrdup((char *) walrcv->conninfo);
SpinLockRelease(&walrcv->mutex);
/* Report XLOG streaming progress in PS display */
ReportLogstreamResult();
}
/* Set up a connection for XLOG streaming */
streamConn = PQstartXLogStreaming(conninfo,
LogstreamResult.Write.xlogid,
LogstreamResult.Write.xrecoff);
if (PQstatus(streamConn) != CONNECTION_OK)
! ereport(ERROR,
(errmsg("could not connect to the primary server : %s",
PQerrorMessage(streamConn))));
!
! /*
! * Confirm that the system identifier of the primary is the same
! * as ours.
! */
! system_identifier = (uint64 *) PQsystemIdentifier(streamConn);
! if (*system_identifier != WalRcv->SystemIdentifier)
! {
! char primary_sysid[32];
! char standby_sysid[32];
!
! /*
! * Format sysids separately to keep platform-dependent format code
! * out of the translatable message string.
! */
! snprintf(primary_sysid, sizeof(primary_sysid), UINT64_FORMAT,
! *system_identifier);
! snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
! WalRcv->SystemIdentifier);
! ereport(FATAL,
! (errmsg("system differs between the primary and standby"),
! errdetail("the primary SYSID is %s, standby SYSID is %s",
! primary_sysid, standby_sysid)));
! }
/*
* Confirm that the current timeline of the primary is the same
***************
*** 218,224 ****
static void
WalRcvLoop(void)
{
- char *buf;
bool finishing_seg;
bool fsync_requested;
int len;
--- 290,295 ----
***************
*** 254,267 ****
proc_exit(0);
/* Receive XLogData message (wait for new message to arrive) */
! len = PQgetXLogData(streamConn, &buf,
(int *) &recptr.xlogid, (int *) &recptr.xrecoff,
(char *) &finishing_seg, (char *) &fsync_requested, 0);
if (len < 0) /* end-of-streaming or error */
break;
! if (buf == NULL) /* should not happen */
continue;
#ifdef REPLICATION_DEBUG
--- 325,338 ----
proc_exit(0);
/* Receive XLogData message (wait for new message to arrive) */
! len = PQgetXLogData(streamConn, &recvBuf,
(int *) &recptr.xlogid, (int *) &recptr.xrecoff,
(char *) &finishing_seg, (char *) &fsync_requested, 0);
if (len < 0) /* end-of-streaming or error */
break;
! if (recvBuf == NULL) /* should not happen */
continue;
#ifdef REPLICATION_DEBUG
***************
*** 295,307 ****
* can recover all transactions from the primary).
*/
! XLogWalRcvWrite(buf, len, recptr);
!
! /*
! * The logs in the XLogData message were written successfully,
! * so we mark the message already consumed.
! */
! PQmarkConsumed(streamConn);
/*
* If the primary requested us to fsync, do so now and send
--- 366,373 ----
* can recover all transactions from the primary).
*/
! XLogWalRcvWrite(recvBuf, len, recptr);
! PQfreemem(recvBuf);
/*
* If the primary requested us to fsync, do so now and send
***************
*** 325,331 ****
}
/* error */
! ereport(FATAL,
(errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
}
--- 391,397 ----
}
/* error */
! ereport(ERROR,
(errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
}
***************
*** 462,467 ****
--- 528,543 ----
return walrcv->in_progress;
}
+ /* Called from postmaster to report a failure to fork walreceiver */
+ void
+ WalRcvFailed(void)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ walrcv->in_progress = false;
+ }
+
/*
* Write the log to disk.
*
***************
*** 665,671 ****
/* Let the primary know */
if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid,
LogstreamResult.Flush.xrecoff, 1) == -1)
! ereport(FATAL,
(errmsg("could not send a message to the primary: %s",
PQerrorMessage(streamConn))));
}
--- 741,747 ----
/* Let the primary know */
if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid,
LogstreamResult.Flush.xrecoff, 1) == -1)
! ereport(ERROR,
(errmsg("could not send a message to the primary: %s",
PQerrorMessage(streamConn))));
}
***************
*** 877,884 ****
--- 953,963 ----
SpinLockAcquire(&walrcv->mutex);
walrcv->LogstreamResult = LogstreamResult;
+ walrcv->SystemIdentifier = GetSystemIdentifier();
if (tli != 0)
walrcv->RecoveryTargetTLI = tli;
+ if (PrimaryConnInfo != NULL)
+ StrNCpy((char *) walrcv->conninfo, PrimaryConnInfo, MAXCONNINFO);
walrcv->in_progress = true; /* Mark that walreceiver is in progress */
SpinLockRelease(&walrcv->mutex);
***************
*** 905,988 ****
return recptr;
}
-
- /* Write the connection information to the file */
- void
- write_conninfo_file(char *conninfo)
- {
- FILE *fp;
-
- fp = AllocateFile(CONNINFO_FILENAME, "w");
- if (!fp)
- {
- ereport(FATAL,
- (errcode_for_file_access(),
- errmsg("could not write to file \"%s\": %m",
- CONNINFO_FILENAME)));
- }
-
- /*
- * The format is:
- *
- * conninfo string, null terminated
- *
- * If a connection information was not supplied (e.g., recovery.conf did not
- * specify primary_conninfo parameter), an empty string is written, which
- * means that the default values that are available from the environment etc
- * are used for connection of XLOG streaming.
- *
- * Add 'replication' as the database name to connect to, into the tail of
- * conninfo. Since libpq prefers a posteriorly-located setting, the database
- * name specified by an user is always ignored.
- */
- if (conninfo != NULL)
- fprintf(fp, "%s", conninfo);
- fputs(" dbname=replication", fp);
- fputc(0, fp);
-
- if (FreeFile(fp))
- {
- ereport(FATAL,
- (errcode_for_file_access(),
- errmsg("could not write to file \"%s\": %m",
- CONNINFO_FILENAME)));
- }
- }
-
- /* Return a malloc'd connection information read from the file */
- static char *
- read_conninfo_file(void)
- {
- FILE *fp;
- StringInfoData buf;
- int ch;
- char *conninfo;
-
- initStringInfo(&buf);
-
- fp = AllocateFile(CONNINFO_FILENAME, "r");
- if (!fp)
- {
- ereport(FATAL,
- (errcode_for_file_access(),
- errmsg("could not read from file \"%s\": %m",
- CONNINFO_FILENAME)));
- }
-
- /* Read a string to a null-termination or the end of the file */
- for (;;)
- {
- ch = fgetc(fp);
- if (ch == 0 || ch == EOF)
- break;
-
- appendStringInfoChar(&buf, (char) ch);
- }
-
- FreeFile(fp);
-
- conninfo = pstrdup(buf.data);
- pfree(buf.data);
-
- return conninfo;
- }
--- 984,986 ----
diff -rcN base/src/backend/postmaster/walsender.c new/src/backend/postmaster/walsender.c
*** base/src/backend/postmaster/walsender.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/postmaster/walsender.c 2009-09-29 22:09:42.000000000 +0900
***************
*** 133,145 ****
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to start walsender")));
- /* Check whether current database is for replication */
- if (strcmp(MyProcPort->database_name, "replication") != 0)
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("current database \"%s\" is not for replication",
- MyProcPort->database_name)));
-
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
--- 133,138 ----
***************
*** 166,187 ****
PG_SETMASK(&UnBlockSig);
/*
! * Inform the standby of the current timeline ID only once. That timeline
! * ID will not change while this walsender process is in progress.
*/
{
StringInfoData buf;
pq_beginmessage(&buf, 'l');
pq_sendint(&buf, ThisTimeLineID, 4);
pq_endmessage(&buf);
/* Need not flush since ReadyForQuery will do it. */
}
ReadyForQuery(DestRemote);
- /* Tell the collector that we are streaming xlog */
- pgstat_report_activity("XLOG streaming");
-
MaxPagesPerXLogData = XLOGbuffers;
/* Main loop of walsender */
--- 159,182 ----
PG_SETMASK(&UnBlockSig);
/*
! * Inform the standby of the system identifier and current timeline ID only
! * once, since they will not change while this walsender is in progress.
! *
! * The byte order of the system identifier does not need to be considered,
! * since xlog streaming between the different architecture is not supported.
*/
{
StringInfoData buf;
+ uint64 sysid = GetSystemIdentifier();
pq_beginmessage(&buf, 'l');
pq_sendint(&buf, ThisTimeLineID, 4);
+ pq_sendbytes(&buf, (char *) &sysid, sizeof(sysid));
pq_endmessage(&buf);
/* Need not flush since ReadyForQuery will do it. */
}
ReadyForQuery(DestRemote);
MaxPagesPerXLogData = XLOGbuffers;
/* Main loop of walsender */
diff -rcN base/src/backend/utils/init/postinit.c new/src/backend/utils/init/postinit.c
*** base/src/backend/utils/init/postinit.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/backend/utils/init/postinit.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 35,40 ****
--- 35,41 ----
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+ #include "postmaster/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
***************
*** 467,472 ****
--- 468,474 ----
* In bootstrap mode no parameters are used. The autovacuum launcher process
* doesn't use any parameters either, because it only goes far enough to be
* able to read pg_database; it doesn't connect to any particular database.
+ * In walsender mode only username is used.
*
* As of PostgreSQL 8.2, we expect InitProcess() was already called, so we
* already have a PGPROC struct ... but it's not completely filled in yet.
***************
*** 578,587 ****
* Set up the global variables holding database id and default tablespace.
* But note we won't actually try to touch the database just yet.
*
! * We take a shortcut in the bootstrap case, otherwise we have to look up
! * the db's entry in pg_database.
*/
! if (bootstrap)
{
MyDatabaseId = TemplateDbOid;
MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
--- 580,589 ----
* Set up the global variables holding database id and default tablespace.
* But note we won't actually try to touch the database just yet.
*
! * We take a shortcut in the bootstrap and walsender case, otherwise we
! * have to look up the db's entry in pg_database.
*/
! if (bootstrap || am_walsender)
{
MyDatabaseId = TemplateDbOid;
MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
***************
*** 644,650 ****
* AccessShareLock for such sessions and thereby not conflict against
* CREATE DATABASE.
*/
! if (!bootstrap)
LockSharedObject(DatabaseRelationId, MyDatabaseId, 0,
RowExclusiveLock);
--- 646,652 ----
* AccessShareLock for such sessions and thereby not conflict against
* CREATE DATABASE.
*/
! if (!bootstrap && !am_walsender)
LockSharedObject(DatabaseRelationId, MyDatabaseId, 0,
RowExclusiveLock);
***************
*** 653,659 ****
* If there was a concurrent DROP DATABASE, this ensures we will die
* cleanly without creating a mess.
*/
! if (!bootstrap)
{
HeapTuple tuple;
--- 655,661 ----
* If there was a concurrent DROP DATABASE, this ensures we will die
* cleanly without creating a mess.
*/
! if (!bootstrap && !am_walsender)
{
HeapTuple tuple;
***************
*** 673,679 ****
*/
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
! if (!bootstrap)
{
if (access(fullpath, F_OK) == -1)
{
--- 675,681 ----
*/
fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
! if (!bootstrap && !am_walsender)
{
if (access(fullpath, F_OK) == -1)
{
***************
*** 745,751 ****
* database-access infrastructure is up. (Also, it wants to know if the
* user is a superuser, so the above stuff has to happen first.)
*/
! if (!bootstrap)
CheckMyDatabase(dbname, am_superuser);
/*
--- 747,753 ----
* database-access infrastructure is up. (Also, it wants to know if the
* user is a superuser, so the above stuff has to happen first.)
*/
! if (!bootstrap && !am_walsender)
CheckMyDatabase(dbname, am_superuser);
/*
***************
*** 842,847 ****
--- 844,853 ----
/* initialize client encoding */
InitializeClientEncoding();
+ /* reset the database for walsender */
+ if (am_walsender)
+ MyProc->databaseId = MyDatabaseId = InvalidOid;
+
/* report this backend in the PgBackendStatus array */
if (!bootstrap)
pgstat_bestart();
diff -rcN base/src/bin/initdb/initdb.c new/src/bin/initdb/initdb.c
*** base/src/bin/initdb/initdb.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/bin/initdb/initdb.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 19,25 ****
*
* For largely-historical reasons, the template1 database is the one built
* by the basic bootstrap process. After it is complete, template0 and
! * the default database, postgres, replication, are made just by copying template1.
*
* To create template1, we run the postgres (backend) program in bootstrap
* mode and feed it data from the postgres.bki library file. After this
--- 19,25 ----
*
* For largely-historical reasons, the template1 database is the one built
* by the basic bootstrap process. After it is complete, template0 and
! * the default database, postgres, are made just by copying template1.
*
* To create template1, we run the postgres (backend) program in bootstrap
* mode and feed it data from the postgres.bki library file. After this
***************
*** 2003,2039 ****
check_ok();
}
- /*
- * copy template1 to replication
- */
- static void
- make_replication(void)
- {
- PG_CMD_DECL;
- const char **line;
- static const char *postgres_setup[] = {
- "CREATE DATABASE replication;\n",
- NULL
- };
-
- fputs(_("copying template1 to replication ... "), stdout);
- fflush(stdout);
-
- snprintf(cmd, sizeof(cmd),
- "\"%s\" %s template1 >%s",
- backend_exec, backend_options,
- DEVNULL);
-
- PG_CMD_OPEN;
-
- for (line = postgres_setup; *line; line++)
- PG_CMD_PUTS(*line);
-
- PG_CMD_CLOSE;
-
- check_ok();
- }
-
/*
* signal handler in case we are interrupted.
--- 2003,2008 ----
***************
*** 3171,3178 ****
make_postgres();
- make_replication();
-
if (authwarning != NULL)
fprintf(stderr, "%s", authwarning);
--- 3140,3145 ----
diff -rcN base/src/include/access/xlog.h new/src/include/access/xlog.h
*** base/src/include/access/xlog.h 2009-09-21 16:48:36.000000000 +0900
--- new/src/include/access/xlog.h 2009-09-29 21:37:10.000000000 +0900
***************
*** 204,209 ****
--- 204,210 ----
extern XLogstreamResult LogstreamResult;
+ extern char *PrimaryConnInfo;
extern char *TriggerFile;
***************
*** 234,239 ****
--- 235,241 ----
extern bool XLogInsertAllowed(void);
extern void UpdateControlFile(void);
+ extern uint64 GetSystemIdentifier(void);
extern Size XLOGShmemSize(void);
extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
diff -rcN base/src/include/postmaster/walreceiver.h new/src/include/postmaster/walreceiver.h
*** base/src/include/postmaster/walreceiver.h 2009-09-21 16:48:36.000000000 +0900
--- new/src/include/postmaster/walreceiver.h 2009-09-30 00:08:26.000000000 +0900
***************
*** 14,25 ****
--- 14,38 ----
#include "storage/spin.h"
+ /*
+ * MAXCONNINFO: maximum size of a connection string.
+ *
+ * XXX: Should this move to pg_config_manual.h?
+ */
+ #define MAXCONNINFO 1024
+
/* Shared memory area for management of walreceiver process */
typedef struct
{
pid_t pid; /* walreceiver's process id, or 0 */
/*
+ * connection string; is used for walreceiver to connect with
+ * the primary.
+ */
+ char conninfo[MAXCONNINFO];
+
+ /*
* in_progress indicates whether walreceiver is in progress
* (or just starting up). This flag is set to TRUE when
* startup process requests walreceiver to start XLOG streaming,
***************
*** 36,41 ****
--- 49,60 ----
XLogstreamResult LogstreamResult;
/*
+ * unique system identifier; must be the same as the system
+ * identifier of the primary.
+ */
+ uint64 SystemIdentifier;
+
+ /*
* recovery target timeline; must be the same as the current
* timeline of the primary.
*/
***************
*** 48,58 ****
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern bool WalRcvInProgress(void);
extern void WaitNextXLogAvailable(XLogRecPtr recptr);
extern void ShutdownWalRcv(void);
extern void WaitForTrigger(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr);
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
- extern void write_conninfo_file(char *conninfo);
#endif /* _WALRECEIVER_H */
--- 67,77 ----
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern bool WalRcvInProgress(void);
+ extern void WalRcvFailed(void);
extern void WaitNextXLogAvailable(XLogRecPtr recptr);
extern void ShutdownWalRcv(void);
extern void WaitForTrigger(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr);
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
#endif /* _WALRECEIVER_H */
diff -rcN base/src/include/postmaster/walsender.h new/src/include/postmaster/walsender.h
*** base/src/include/postmaster/walsender.h 2009-09-21 16:48:36.000000000 +0900
--- new/src/include/postmaster/walsender.h 2009-09-29 21:37:10.000000000 +0900
***************
*** 53,61 ****
extern int MaxWalSnds;
extern int WalSndDelay;
! /* Is XLOG streaming allowed? This is used to decide if WAL-logging is necessary */
#define XLogStreamingAllowed() (MaxWalSnds > 0)
extern int WalSenderMain(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
--- 53,67 ----
extern int MaxWalSnds;
extern int WalSndDelay;
! /* Is XLOG streaming allowed? */
#define XLogStreamingAllowed() (MaxWalSnds > 0)
+ /*
+ * Is WAL-logging necessary? We need to log an XLOG record iff either
+ * WAL archiving is enabled or XLOG streaming is allowed.
+ */
+ #define XLogIsNeeded() (XLogArchivingActive() || XLogStreamingAllowed())
+
extern int WalSenderMain(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
diff -rcN base/src/interfaces/libpq/exports.txt new/src/interfaces/libpq/exports.txt
*** base/src/interfaces/libpq/exports.txt 2009-09-21 16:48:36.000000000 +0900
--- new/src/interfaces/libpq/exports.txt 2009-09-29 21:37:10.000000000 +0900
***************
*** 155,160 ****
PQinitOpenSSL 153
PQstartXLogStreaming 154
PQgetXLogData 155
! PQmarkConsumed 156
! PQputXLogRecPtr 157
! PQtimeline 158
--- 155,160 ----
PQinitOpenSSL 153
PQstartXLogStreaming 154
PQgetXLogData 155
! PQputXLogRecPtr 156
! PQtimeline 157
! PQsystemIdentifier 158
diff -rcN base/src/interfaces/libpq/fe-connect.c new/src/interfaces/libpq/fe-connect.c
*** base/src/interfaces/libpq/fe-connect.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/interfaces/libpq/fe-connect.c 2009-09-29 21:48:40.000000000 +0900
***************
*** 2123,2128 ****
--- 2123,2130 ----
free(conn->inBuffer);
if (conn->outBuffer)
free(conn->outBuffer);
+ if (conn->sysid)
+ free(conn->sysid);
termPQExpBuffer(&conn->errorMessage);
termPQExpBuffer(&conn->workBuffer);
***************
*** 3719,3724 ****
--- 3721,3734 ----
return conn->be_pid;
}
+ char *
+ PQsystemIdentifier(const PGconn *conn)
+ {
+ if (!conn)
+ return NULL;
+ return conn->sysid;
+ }
+
int
PQtimeline(const PGconn *conn)
{
diff -rcN base/src/interfaces/libpq/fe-exec.c new/src/interfaces/libpq/fe-exec.c
*** base/src/interfaces/libpq/fe-exec.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/interfaces/libpq/fe-exec.c 2009-09-29 21:37:10.000000000 +0900
***************
*** 2316,2329 ****
* PQgetXLogData - obtain a set of XLOG records from XLOG stream
*
* If successful, returns the length of XLOG records (always > 0) as result,
! * sets *buffer to a pointer to them, and sets xlogid/xrecoff to the ending
! * XLOG location. finishing_seg is set to true (= 1) if XLOG records are
! * at the end of segment, false (= 0) otherwise. fsync_requested is set to
! * true if XLOG records need to be fsynced, false otherwise.
! *
! * Since *buffer points to the internal buffer which is reused repeatedly,
! * the caller doesn't need to free it, instead must mark it as consumed
! * by calling PQmarkConsumed, when it's no longer necessary.
*
* Returns 0 if no records available yet (only possible if async is true),
* -1 if end of streaming (consult PQgetResult), or -2 if error (consult
--- 2316,2325 ----
* PQgetXLogData - obtain a set of XLOG records from XLOG stream
*
* If successful, returns the length of XLOG records (always > 0) as result,
! * sets *buffer to a pointer to a malloc'd records, and sets xlogid/xrecoff
! * to the starting XLOG location. finishing_seg is set to true (= 1) if XLOG
! * records are at the end of segment, false (= 0) otherwise. fsync_requested
! * is set to true if XLOG records need to be fsynced, false otherwise.
*
* Returns 0 if no records available yet (only possible if async is true),
* -1 if end of streaming (consult PQgetResult), or -2 if error (consult
***************
*** 3424,3442 ****
*retbuflen = buflen;
return tmpbuf;
}
-
-
- /*
- * PQmarkConsumed - mark the read message consumed
- *
- * Some libpq functions don't immediately consume a message because
- * it might need to be read again. On the other hand, until it has
- * been consumed, we cannot read a next message. So, when a message
- * becomes unnecessary any longer, we call this function explicitly
- * and mark it consumed.
- */
- void
- PQmarkConsumed(PGconn *conn)
- {
- conn->inStart = conn->inCursor;
- }
--- 3420,3422 ----
diff -rcN base/src/interfaces/libpq/fe-protocol3.c new/src/interfaces/libpq/fe-protocol3.c
*** base/src/interfaces/libpq/fe-protocol3.c 2009-09-21 16:48:36.000000000 +0900
--- new/src/interfaces/libpq/fe-protocol3.c 2009-09-29 22:52:50.000000000 +0900
***************
*** 50,55 ****
--- 50,56 ----
static int getParameterStatus(PGconn *conn);
static int getNotify(PGconn *conn);
static int getCopyStart(PGconn *conn, ExecStatusType copytype);
+ static int getXLogStart(PGconn *conn, int msgLength);
static int getReadyForQuery(PGconn *conn);
static void reportErrorPosition(PQExpBuffer msg, const char *query,
int loc, int encoding);
***************
*** 380,386 ****
*/
break;
case 'l': /* XLog Streaming Start */
! if (pqGetInt((int *)&conn->timeline, 4, conn))
return;
break;
case 'w': /* XLog Data */
--- 381,387 ----
*/
break;
case 'l': /* XLog Streaming Start */
! if (getXLogStart(conn, msgLength))
return;
break;
case 'w': /* XLog Data */
***************
*** 1662,1667 ****
--- 1663,1702 ----
/*
+ * getXLogStart - process ReplicationStart message
+ *
+ * parseInput already read the message type and length.
+ */
+ static int
+ getXLogStart(PGconn *conn, int msgLength)
+ {
+ msgLength -= 4;
+ if (msgLength <= 0 || msgLength > 8)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("invalid message length\n"));
+ return EOF;
+ }
+
+ if (pqGetInt((int *)&conn->timeline, 4, conn))
+ return EOF;
+
+ conn->sysid = (char *) malloc(msgLength + 1);
+ if (conn->sysid == NULL)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ return EOF;
+ }
+
+ if (pqGetnchar(conn->sysid, msgLength, conn))
+ return EOF;
+ conn->sysid[msgLength] = '\0'; /* Add terminating null */
+
+ return 0;
+ }
+
+ /*
* getXLogMessage - fetch next XLogData message, process async messages
*
* Returns length word of XLogData message (> 0), or 0 if no complete
***************
*** 1748,1761 ****
* pqGetXLogData3 - obtain a set of XLOG records from XLOG stream
*
* If successful, returns the length of XLOG records (always > 0) as result,
! * sets *buffer to a pointer to them, and sets xlogid/xrecoff to the ending
! * XLOG location. finishing_seg is set to true (= 1) if XLOG records are
! * at the end of segment, false (= 0) otherwise. fsync_requested is set to
! * true if XLOG records need to be fsynced, false otherwise.
! *
! * Since *buffer points to the internal buffer which is reused repeatedly,
! * the caller doesn't need to free it, instead must mark it as consumed
! * by calling PQmarkConsumed, when it's no longer necessary.
*
* Returns 0 if no records available yet (only possible if async is true),
* -1 if end of streaming (consult PQgetResult), or -2 if error (consult
--- 1783,1792 ----
* pqGetXLogData3 - obtain a set of XLOG records from XLOG stream
*
* If successful, returns the length of XLOG records (always > 0) as result,
! * sets *buffer to a pointer to a malloc'd records, and sets xlogid/xrecoff
! * to the starting XLOG location. finishing_seg is set to true (= 1) if XLOG
! * records are at the end of segment, false (= 0) otherwise. fsync_requested
! * is set to true if XLOG records need to be fsynced, false otherwise.
*
* Returns 0 if no records available yet (only possible if async is true),
* -1 if end of streaming (consult PQgetResult), or -2 if error (consult
***************
*** 1816,1823 ****
*finishing_seg = (flags & XLOGSTREAM_END_SEG) != 0;
*fsync_requested = (flags & XLOGSTREAM_FLUSH) != 0;
! *buffer = &conn->inBuffer[conn->inCursor];
! conn->inCursor += msgLength; /* cannot mark message consumed yet here */
return msgLength;
}
--- 1847,1865 ----
*finishing_seg = (flags & XLOGSTREAM_END_SEG) != 0;
*fsync_requested = (flags & XLOGSTREAM_FLUSH) != 0;
! *buffer = (char *) malloc(msgLength + 1);
! if (*buffer == NULL)
! {
! printfPQExpBuffer(&conn->errorMessage,
! libpq_gettext("out of memory\n"));
! return -2;
! }
! memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
! (*buffer)[msgLength] = '\0'; /* Add terminating null */
!
! /* Mark message consumed */
! conn->inStart = conn->inCursor + msgLength;
!
return msgLength;
}
diff -rcN base/src/interfaces/libpq/libpq-fe.h new/src/interfaces/libpq/libpq-fe.h
*** base/src/interfaces/libpq/libpq-fe.h 2009-09-21 16:48:36.000000000 +0900
--- new/src/interfaces/libpq/libpq-fe.h 2009-09-29 21:37:10.000000000 +0900
***************
*** 290,295 ****
--- 290,296 ----
extern char *PQerrorMessage(const PGconn *conn);
extern int PQsocket(const PGconn *conn);
extern int PQbackendPID(const PGconn *conn);
+ extern char *PQsystemIdentifier(const PGconn *conn);
extern int PQtimeline(const PGconn *conn);
extern int PQconnectionNeedsPassword(const PGconn *conn);
extern int PQconnectionUsedPassword(const PGconn *conn);
***************
*** 492,500 ****
extern unsigned char *PQescapeBytea(const unsigned char *from, size_t from_length,
size_t *to_length);
- /* Mark message consumed */
- extern void PQmarkConsumed(PGconn *conn);
-
/* === in fe-print.c === */
--- 493,498 ----
diff -rcN base/src/interfaces/libpq/libpq-int.h new/src/interfaces/libpq/libpq-int.h
*** base/src/interfaces/libpq/libpq-int.h 2009-09-21 16:48:36.000000000 +0900
--- new/src/interfaces/libpq/libpq-int.h 2009-09-29 21:37:10.000000000 +0900
***************
*** 353,358 ****
--- 353,359 ----
/* XLog streaming stuff */
bool xlog_streaming; /* XLOG streaming in progress? */
+ char *sysid; /* system identifier of backend */
int timeline; /* timeline ID of backend */
/* Miscellaneous stuff */