[PATCH 13/16] Introduction of pair of logical walreceiver/sender

From: Andres Freund <andres(at)2ndquadrant(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [PATCH 13/16] Introduction of pair of logical walreceiver/sender
Date: 2012-06-13 11:28:44
Message-ID: 1339586927-13156-13-git-send-email-andres@2ndquadrant.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

From: Andres Freund <andres(at)anarazel(dot)de>

A logical WALReceiver is started directly by Postmaster when we enter PM_RUN
state and the new parameter multimaster_conninfo is set. For now only one of
those is started, but the code doesn't rely on that. In future multiple ones
should be allowed.

To transfer that data, a new command, START_LOGICAL_REPLICATION, is introduced in
the walsender, reusing most of the infrastructure for START_REPLICATION. The
former uses the same on-the-wire format as the latter.

To make initialization possible, IDENTIFY_SYSTEM returns two new columns: node_id,
returning the multimaster_node_id, and last_checkpoint, returning the RedoRecPtr.

The walreceiver writes that data into the previously introduced pg_lcr/$node_id
directory.

Future Directions/TODO:
- pass the node_ids we're interested in to START_LOGICAL_REPLICATION to allow
complex topologies
- allow to pass filters to reduce the transfer volume
- compress the transferred data by actually removing uninteresting records
instead of replacing them by NOOP records. This adds some complexities
because we still need to map the received lsn to the requested lsn so we know
where to restart transferring data and such.
- check that wal on the sending side was generated with WAL_LEVEL_LOGICAL
---
src/backend/postmaster/postmaster.c | 10 +-
.../libpqwalreceiver/libpqwalreceiver.c | 104 ++++-
src/backend/replication/repl_gram.y | 19 +-
src/backend/replication/repl_scanner.l | 1 +
src/backend/replication/walreceiver.c | 165 +++++++-
src/backend/replication/walreceiverfuncs.c | 1 +
src/backend/replication/walsender.c | 422 +++++++++++++++-----
src/backend/utils/misc/guc.c | 9 +
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/nodes/nodes.h | 1 +
src/include/nodes/replnodes.h | 10 +
src/include/replication/logical.h | 4 +
src/include/replication/walreceiver.h | 9 +-
13 files changed, 624 insertions(+), 132 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 71cfd6d..13e9592 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1449,6 +1449,11 @@ ServerLoop(void)
kill(AutoVacPID, SIGUSR2);
}

+ /* Restart walreceiver process in certain states only. */
+ if (WalReceiverPID == 0 && pmState == PM_RUN &&
+ LogicalWalReceiverActive())
+ WalReceiverPID = StartWalReceiver();
+
/* Check all the workers requested are running. */
if (pmState == PM_RUN)
StartBackgroundWorkers();
@@ -2169,7 +2174,8 @@ pmdie(SIGNAL_ARGS)
/* and the walwriter too */
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
-
+ if (WalReceiverPID != 0)
+ signal_child(WalReceiverPID, SIGTERM);
/*
* If we're in recovery, we can't kill the startup process
* right away, because at present doing so does not release
@@ -2421,6 +2427,8 @@ reaper(SIGNAL_ARGS)
PgArchPID = pgarch_start();
if (PgStatPID == 0)
PgStatPID = pgstat_start();
+ if (WalReceiverPID == 0 && LogicalWalReceiverActive())
+ WalReceiverPID = StartWalReceiver();
StartBackgroundWorkers();

/* at this point we are really open for business */
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 979b66b..0ea3fce 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -46,7 +46,8 @@ static PGconn *streamConn = NULL;
static char *recvBuf = NULL;

/* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery);
+static bool libpqrcv_start(char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
static void libpqrcv_send(const char *buffer, int nbytes);
@@ -63,10 +64,12 @@ void
_PG_init(void)
{
/* Tell walreceiver how to reach us */
- if (walrcv_connect != NULL || walrcv_receive != NULL ||
- walrcv_send != NULL || walrcv_disconnect != NULL)
+ if (walrcv_connect != NULL || walrcv_start != NULL ||
+ walrcv_receive != NULL || walrcv_send != NULL ||
+ walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
+ walrcv_start = libpqrcv_start;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
@@ -76,7 +79,7 @@ _PG_init(void)
* Establish the connection to the primary server for XLOG streaming
*/
static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+libpqrcv_connect(char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery)
{
char conninfo_repl[MAXCONNINFO + 75];
char *primary_sysid;
@@ -84,7 +87,8 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
TimeLineID primary_tli;
TimeLineID standby_tli;
PGresult *res;
- char cmd[64];
+
+ elog(LOG, "wal receiver connecting");

/*
* Connect using deliberately undocumented parameter: replication. The
@@ -96,10 +100,16 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
conninfo);

streamConn = PQconnectdb(conninfo_repl);
- if (PQstatus(streamConn) != CONNECTION_OK)
+ if (PQstatus(streamConn) != CONNECTION_OK){
+ /*
+ * FIXME: it's very annoying for development if the whole buffer is
+ * immediately filled. We need a better solution.
+ */
+ pg_usleep(1000000);
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn))));
+ }

/*
* Get the system identifier and timeline ID as a DataRow message from the
@@ -114,7 +124,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
"the primary server: %s",
PQerrorMessage(streamConn))));
}
- if (PQnfields(res) != 3 || PQntuples(res) != 1)
+ if (PQnfields(res) != 5 || PQntuples(res) != 1)
{
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
@@ -122,14 +132,40 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
PQclear(res);
ereport(ERROR,
(errmsg("invalid response from primary server"),
- errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+ errdetail("Expected 1 tuple with 5 fields, got %d tuples with %d fields.",
ntuples, nfields)));
}
primary_sysid = PQgetvalue(res, 0, 0);
+
primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);

+ /* FIXME: this should be already implemented nicely somewhere? */
+ if(sscanf(PQgetvalue(res, 0, 2),
+ "%X/%X", &where_at->xlogid, &where_at->xrecoff) != 2){
+ elog(FATAL, "couldn't parse the xlog address from the other side: %s",
+ PQgetvalue(res, 0, 2));
+ }
+
+ elog(LOG, "other end is currently at %X/%X",
+ where_at->xlogid, where_at->xrecoff);
+
+ receiving_from_node_id = pg_atoi(PQgetvalue(res, 0, 3), 4, 0);
+
+ /* FIXME: this should be already implemented nicely somewhere? */
+ if(sscanf(PQgetvalue(res, 0, 4),
+ "%X/%X", &redo->xlogid, &redo->xrecoff) != 2){
+ elog(FATAL, "couldn't parse the xlog address from the other side: %s",
+ PQgetvalue(res, 0, 4));
+ }
+
+ elog(LOG, "other end's redo is currently at %X/%X",
+ redo->xlogid, redo->xrecoff);
+
+
/*
* Confirm that the system identifier of the primary is the same as ours.
+ *
+ * FIXME: do we want that restriction for mm?
*/
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
@@ -142,21 +178,49 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
primary_sysid, standby_sysid)));
}

- /*
- * Confirm that the current timeline of the primary is the same as the
- * recovery target timeline.
- */
- standby_tli = GetRecoveryTargetTLI();
PQclear(res);
- if (primary_tli != standby_tli)
- ereport(ERROR,
- (errmsg("timeline %u of the primary does not match recovery target timeline %u",
- primary_tli, standby_tli)));
- ThisTimeLineID = primary_tli;

/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
- startpoint.xlogid, startpoint.xrecoff);
+ if (startedDuringRecovery)
+ {
+ /*
+ * Confirm that the current timeline of the primary is the same as the
+ * recovery target timeline.
+ */
+ standby_tli = GetRecoveryTargetTLI();
+ if (primary_tli != standby_tli)
+ ereport(ERROR,
+ (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+ primary_tli, standby_tli)));
+ ThisTimeLineID = primary_tli;
+ }
+
+ return true;
+}
+
+/*
+ * start streaming data
+ */
+static bool
+libpqrcv_start(char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery)
+{
+ PGresult *res;
+ char cmd[64];
+
+ if(startedDuringRecovery)
+ {
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+ startpoint->xlogid, startpoint->xrecoff);
+ }
+ else
+ {
+ /* ignore the timeline */
+ elog(LOG, "receiving_from_node_id: %u at %X/%X", receiving_from_node_id,
+ startpoint->xlogid, startpoint->xrecoff);
+ snprintf(cmd, sizeof(cmd), "START_LOGICAL_REPLICATION %X/%X",
+ startpoint->xlogid, startpoint->xrecoff);
+ }
+
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index b6cfdac..b49ae6f 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,9 +76,10 @@ Node *replication_parse_result;
%token K_NOWAIT
%token K_WAL
%token K_START_REPLICATION
+%token K_START_LOGICAL_REPLICATION

%type <node> command
-%type <node> base_backup start_replication identify_system
+%type <node> base_backup start_replication start_logical_replication identify_system
%type <list> base_backup_opt_list
%type <defelt> base_backup_opt
%%
@@ -97,6 +98,7 @@ command:
identify_system
| base_backup
| start_replication
+ | start_logical_replication
;

/*
@@ -166,6 +168,21 @@ start_replication:
$$ = (Node *) cmd;
}
;
+
+/*
+ * START_LOGICAL_REPLICATION %X/%X
+ */
+start_logical_replication:
+ K_START_LOGICAL_REPLICATION RECPTR
+ {
+ StartLogicalReplicationCmd *cmd;
+
+ cmd = makeNode(StartLogicalReplicationCmd);
+ cmd->startpoint = $2;
+
+ $$ = (Node *) cmd;
+ }
+ ;
%%

#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 9d4edcf..f8be982 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -64,6 +64,7 @@ NOWAIT { return K_NOWAIT; }
PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
START_REPLICATION { return K_START_REPLICATION; }
+START_LOGICAL_REPLICATION { return K_START_LOGICAL_REPLICATION; }
"," { return ','; }
";" { return ';'; }

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e97196b..73a3021 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -44,6 +44,7 @@
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
@@ -58,9 +59,12 @@ bool am_walreceiver;
/* GUC variable */
int wal_receiver_status_interval;
bool hot_standby_feedback;
+char *mm_conninfo = 0;
+RepNodeId receiving_from_node_id = InvalidMultimasterNodeId;

/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
+walrcv_start_type walrcv_start = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
@@ -93,9 +97,13 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;

+XLogRecPtr curRecv;
+
static StandbyReplyMessage reply_message;
static StandbyHSFeedbackMessage feedback_message;

+static bool startedDuringRecovery; /* are we going to receive WAL data */
+
/*
* About SIGTERM handling:
*
@@ -122,6 +130,9 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
+
+static void LogicalWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+
static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -170,13 +181,17 @@ void
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
- XLogRecPtr startpoint;
+ XLogRecPtr startpoint = {0, 0};
+ XLogRecPtr other_end_at;
+ XLogRecPtr other_end_redo;

/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;

am_walreceiver = true;

+ elog(LOG, "wal receiver starting");
+
/*
* WalRcv should be set up already (if we are a backend, we inherit this
* by fork() or EXEC_BACKEND mechanism from the postmaster).
@@ -200,8 +215,11 @@ WalReceiverMain(void)
/* fall through */

case WALRCV_STOPPED:
- SpinLockRelease(&walrcv->mutex);
- proc_exit(1);
+ if (startedDuringRecovery)
+ {
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ }
break;

case WALRCV_STARTING:
@@ -212,13 +230,35 @@ WalReceiverMain(void)
/* Shouldn't happen */
elog(PANIC, "walreceiver still running according to shared memory state");
}
- /* Advertise our PID so that the startup process can kill us */
+ /* Advertise our PID so that we can be killed */
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_RUNNING;

- /* Fetch information required to start streaming */
- strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
- startpoint = walrcv->receiveStart;
+ /*
+ * Fetch information required to start streaming.
+ *
+ * During recovery the WALReceiver is started from the Startup process,
+ * by sending a postmaster signal. In normal running the Postmaster
+ * starts the WALReceiver directly. In that case the walrcv shmem struct
+ * is simply zeroed, so walrcv->startedDuringRecovery will show as false.
+ *
+ * The connection info required to access the upstream master comes from
+ * the multimaster_conninfo parameter, stored in the mm_conninfo variable.
+ *
+ * XXX The starting point for logical replication is not yet determined.
+ */
+ startedDuringRecovery = walrcv->startedDuringRecovery;
+ if (startedDuringRecovery)
+ {
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receiveStart;
+ }
+ else
+ {
+ elog(LOG, "logical replication starting: %s", mm_conninfo);
+ strlcpy(conninfo, (char *) mm_conninfo, MAXCONNINFO);
+ /* The startpoint for logical replay can only be determined after connecting */
+ }

/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
@@ -262,8 +302,9 @@ WalReceiverMain(void)

/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_send == NULL || walrcv_disconnect == NULL)
+ if (walrcv_connect == NULL || walrcv_start == NULL ||
+ walrcv_receive == NULL || walrcv_send == NULL ||
+ walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");

/*
@@ -277,7 +318,58 @@ WalReceiverMain(void)

/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
- walrcv_connect(conninfo, startpoint);
+ walrcv_connect(conninfo, &other_end_redo, &other_end_at, startedDuringRecovery);
+
+ if(LogicalWalReceiverActive()){
+ char buf[MAXPGPATH];
+
+ if(RecoveryInProgress()){
+ elog(FATAL, "cannot have the logical receiver running while recovery is ongoing");
+ }
+
+ if(receiving_from_node_id == InvalidMultimasterNodeId)
+ elog(FATAL, "didn't setup/derive other node id");
+
+ Assert(WalRcv);
+
+ startpoint = WalRcv->mm_receiveState[receiving_from_node_id];
+
+ /*
+ * in this case we connect to some master we haven't received data
+ * from yet.
+ * FIXME: This means we would need to initialize the local cluster!
+ */
+ if(XLByteEQ(startpoint, zeroRecPtr)){
+ startpoint = other_end_redo;
+
+ /* we need to scroll back to the begin of the segment */
+ startpoint.xrecoff -= startpoint.xrecoff % XLogSegSize;
+
+ WalRcv->mm_receiveState[receiving_from_node_id] = startpoint;
+
+ WalRcv->mm_applyState[receiving_from_node_id] = other_end_redo;
+
+ /* FIXME: this should be an ereport */
+ elog(LOG, "initializing recovery from logical node %d to %X/%X, transfer from %X/%X",
+ receiving_from_node_id,
+ other_end_at.xlogid, other_end_at.xrecoff,
+ startpoint.xlogid, startpoint.xrecoff);
+ }
+ else if(XLByteLT(other_end_at, startpoint)){
+ elog(FATAL, "something went wrong, the other side has a too small xlogid/xlrecoff. Other: %X/%X, self: %X/%X",
+ other_end_at.xlogid, other_end_at.xrecoff,
+ startpoint.xlogid, startpoint.xrecoff);
+ }
+
+ /*
+ * the set of foreign nodes can increase all the time, so we just make
+ * sure the particular one we need exists.
+ */
+ snprintf(buf, MAXPGPATH-1, "%s/%u", LCRDIR, receiving_from_node_id);
+ pg_mkdir_p(buf, S_IRWXU);
+ }
+
+ walrcv_start(conninfo, &startpoint, startedDuringRecovery);
DisableWalRcvImmediateExit();

/* Loop until end-of-streaming or error */
@@ -298,7 +390,7 @@ WalReceiverMain(void)
* Exit walreceiver if we're not in recovery. This should not happen,
* but cross-check the status here.
*/
- if (!RecoveryInProgress())
+ if (!RecoveryInProgress() && !LogicalWalReceiverActive())
ereport(FATAL,
(errmsg("cannot continue WAL streaming, recovery has already ended")));

@@ -443,7 +535,17 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)

buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
- XLogWalRcvWrite(buf, len, msghdr.dataStart);
+
+ /*
+ * The WALReceiver connects either during recovery or during
+ * normal running. During recovery then pure WAL data is
+ * received, whereas during normal running we send Logical
+ * Change Records (LCRs) which are stored differently.
+ */
+ if (LogicalWalReceiverActive())
+ XLogWalRcvWrite(buf, len, msghdr.dataStart);
+ else
+ LogicalWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
case 'k': /* Keepalive */
@@ -477,6 +579,10 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
int startoff;
int byteswritten;

+#ifdef VERBOSE_DEBUG
+ elog(LOG, "received data len %lu, at %X/%X",
+ nbytes, recptr.xlogid, recptr.xrecoff);
+#endif
while (nbytes > 0)
{
int segbytes;
@@ -509,7 +615,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Create/use new log file */
XLByteToSeg(recptr, recvId, recvSeg);
use_existent = true;
- recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true);
+ recvFile = XLogFileInit(receiving_from_node_id, recvId, recvSeg, &use_existent, true);
recvOff = 0;
}

@@ -585,6 +691,27 @@ XLogWalRcvFlush(bool dying)
{
walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush;
+
+ /* FIXME */
+ if(LogicalWalReceiverActive()){
+ if(XLByteLE(curRecv, LogstreamResult.Write)){
+ WalRcv->mm_receiveState[receiving_from_node_id] = curRecv;
+
+ if(WalRcv->mm_receiveLatch[receiving_from_node_id])
+ SetLatch(WalRcv->mm_receiveLatch[receiving_from_node_id]);
+#if 0
+ elog(LOG, "confirming flush to %X/%X",
+ curRecv.xlogid, curRecv.xrecoff);
+#endif
+ }
+ else{
+#if 0
+ elog(LOG, "not conf flush to %X/%X, wrote to %X/%X",
+ curRecv.xlogid, curRecv.xrecoff,
+ LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+#endif
+ }
+ }
}
SpinLockRelease(&walrcv->mutex);

@@ -614,6 +741,15 @@ XLogWalRcvFlush(bool dying)
}

/*
+ * Handle LCR data.
+ */
+static void
+LogicalWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+{
+ elog(LOG, "received msg of length %u", (uint) nbytes);
+}
+
+/*
* Send reply message to primary, indicating our current XLOG positions and
* the current time.
*/
@@ -750,6 +886,9 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);

+ /* we need to store that in shmem */
+ curRecv = walEnd;
+
if (log_min_messages <= DEBUG2)
elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
timestamptz_to_str(sendTime),
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index cb49282..aa07746 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -207,6 +207,7 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
walrcv->conninfo[0] = '\0';
walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
+ walrcv->startedDuringRecovery = true;

/*
* If this is the first startup of walreceiver, we initialize receivedUpto
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8cd3a00..d2e1c76 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -37,9 +37,13 @@
#include <signal.h>
#include <unistd.h>

+#include "access/xlogreader.h"
#include "access/transam.h"
+#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -64,6 +68,7 @@
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
+#include "utils/syscache.h"


/* Array of WalSnds in shared memory */
@@ -74,9 +79,12 @@ WalSnd *MyWalSnd = NULL;

/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+
bool am_cascading_walsender = false; /* Am I cascading WAL to
* another standby ? */

+bool am_doing_logical = false; /* Am I sending logical changes instead of physical ones */
+
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one
@@ -112,6 +120,12 @@ static TimestampTz last_reply_timestamp;
*/
static bool wroteNewXlogData = false;

+/*
+ * state for continuous reading of the local servers wal for sending logical
+ * wal
+ */
+static XLogReaderState* xlogreader_state = 0;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -131,8 +145,19 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(char *msgbuf, bool *caughtup);
+static Size XLogSendPhysical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+ XLogRecPtr endptr);
+static Size XLogSendLogical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+ XLogRecPtr endptr);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
+static void StartLogicalReplication(StartLogicalReplicationCmd *cmd);
+
+static bool RecordRelevantForLogicalReplication(XLogReaderState* state, XLogRecord* r);
+static void ProcessRecord(XLogReaderState* state, XLogRecordBuffer* buf);
+static void WriteoutData(XLogReaderState* state, char* data, Size len);
+static void XLogReadPage(XLogReaderState* state, char *buf, XLogRecPtr startptr);
+
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
@@ -293,8 +318,10 @@ IdentifySystem(void)
char sysid[32];
char tli[11];
char xpos[MAXFNAMELEN];
+ char node_id[MAXFNAMELEN];//FIXME
+ char redoptr_s[MAXFNAMELEN];
XLogRecPtr logptr;
-
+ XLogRecPtr redoptr = GetRedoRecPtr();
/*
* Reply with a result set with one row, three columns. First col is
* system ID, second is timeline ID, and third is current xlog location.
@@ -309,9 +336,14 @@ IdentifySystem(void)
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);

+ snprintf(node_id, sizeof(node_id), "%i", guc_replication_origin_id);
+
+ snprintf(redoptr_s, sizeof(redoptr_s), "%X/%X",
+ redoptr.xlogid, redoptr.xrecoff);
+
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 3, 2); /* 3 fields */
+ pq_sendint(&buf, 5, 2); /* 5 fields */

/* first field */
pq_sendstring(&buf, "systemid"); /* col name */
@@ -332,24 +364,47 @@ IdentifySystem(void)
pq_sendint(&buf, 0, 2); /* format code */

/* third field */
- pq_sendstring(&buf, "xlogpos");
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
- pq_sendint(&buf, TEXTOID, 4);
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
+ pq_sendstring(&buf, "xlogpos"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fourth field */
+ pq_sendstring(&buf, "node_id"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, INT4OID, 4); /* type oid */
+ pq_sendint(&buf, 4, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fifth field */
+ pq_sendstring(&buf, "last_checkpoint"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
pq_endmessage(&buf);

/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 3, 2); /* # of columns */
+ pq_sendint(&buf, 5, 2); /* # of columns */
pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
pq_sendint(&buf, strlen(tli), 4); /* col2 len */
pq_sendbytes(&buf, (char *) tli, strlen(tli));
pq_sendint(&buf, strlen(xpos), 4); /* col3 len */
pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+ pq_sendint(&buf, strlen(node_id), 4); /* col4 len */
+ pq_sendbytes(&buf, (char *)node_id, strlen(node_id));
+ pq_sendint(&buf, strlen(redoptr_s), 4); /* col5 len */
+ pq_sendbytes(&buf, (char *)redoptr_s, strlen(redoptr_s));

pq_endmessage(&buf);

@@ -432,6 +487,8 @@ StartReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();

+ am_doing_logical = false;
+
/*
* Initialize position to the received one, then the xlog records begin to
* be shipped from that position
@@ -440,6 +497,56 @@ StartReplication(StartReplicationCmd *cmd)
}

/*
+ * START_LOGICAL_REPLICATION
+ */
+static void
+StartLogicalReplication(StartLogicalReplicationCmd *cmd)
+{
+ StringInfoData buf;
+
+ /* XXX: see above */
+ MarkPostmasterChildWalSender();
+ SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+ /* XXX: see above*/
+ if (am_cascading_walsender && !RecoveryInProgress())
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ }
+
+ /* XXX: see above*/
+ WalSndSetState(WALSNDSTATE_CATCHUP);
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+ pq_flush();
+
+ am_doing_logical = true;
+
+ sentPtr = cmd->startpoint;
+
+ if(!xlogreader_state){
+ xlogreader_state = XLogReaderAllocate();
+ xlogreader_state->is_record_interesting = RecordRelevantForLogicalReplication;
+ xlogreader_state->finished_record = ProcessRecord;
+ xlogreader_state->writeout_data = WriteoutData;
+ xlogreader_state->read_page = XLogReadPage;
+
+ /* FIXME: it would probably better to handle this */
+ XLogReaderReset(xlogreader_state);
+ }
+
+ xlogreader_state->startptr = cmd->startpoint;
+ xlogreader_state->curptr = cmd->startpoint;
+}
+
+/*
* Execute an incoming replication command.
*/
static bool
@@ -483,6 +590,13 @@ HandleReplicationCommand(const char *cmd_string)
replication_started = true;
break;

+ case T_StartLogicalReplicationCmd:
+ StartLogicalReplication((StartLogicalReplicationCmd *) cmd_node);
+
+ /* break out of the loop */
+ replication_started = true;
+ break;
+
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);

@@ -1071,54 +1185,142 @@ retry:
p += readbytes;
}

- /*
- * After reading into the buffer, check that what we read was valid. We do
- * this after reading, because even though the segment was present when we
- * opened it, it might get recycled or removed while we read it. The
- * read() succeeds in that case, but the data we tried to read might
- * already have been overwritten with new WAL records.
- */
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startptr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
+ if(node_id == InvalidMultimasterNodeId){
+ /*
+ * After reading into the buffer, check that what we read was valid. We
+ * do this after reading, because even though the segment was present
+ * when we opened it, it might get recycled or removed while we read
+ * it. The read() succeeds in that case, but the data we tried to read
+ * might already have been overwritten with new WAL records.
+ */
+ XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
+ XLByteToSeg(startptr, log, seg);
+ if (log < lastRemovedLog ||
+ (log == lastRemovedLog && seg <= lastRemovedSeg))
+ {
+ char filename[MAXFNAMELEN];

- XLogFileName(filename, ThisTimeLineID, log, seg);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- filename)));
+ XLogFileName(filename, ThisTimeLineID, log, seg);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ filename)));
+ }
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with
+ * the file of the same name retrieved from archive. So we always need
+ * to check what we read was valid after reading into the buffer. If
+ * it's invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
+ }
}
+ else{
+ /* FIXME: check shm? */
+ }
+}

+static bool
+RecordRelevantForLogicalReplication(XLogReaderState* state, XLogRecord* r){
/*
- * During recovery, the currently-open WAL file might be replaced with the
- * file of the same name retrieved from archive. So we always need to
- * check what we read was valid after reading into the buffer. If it's
- * invalid, we try to open and read the file again.
+ * For now we only send out data that are originating locally which implies
+ * a star topology between all nodes. Later we might support more
+ * complicated models. For that filtering positively by wanted IDs sounds
+ * like a better idea.
*/
- if (am_cascading_walsender)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
- bool reload;
+ if(r->xl_origin_id != current_replication_origin_id)
+ return false;
+
+ switch(r->xl_rmid){
+ case RM_HEAP_ID:
+ case RM_HEAP2_ID:
+ case RM_XACT_ID:
+ case RM_XLOG_ID:
+ /* FIXME: filter additionally */
+ return true;
+ default:
+ return false;
+ }
+}

- SpinLockAcquire(&walsnd->mutex);
- reload = walsnd->needreload;
- walsnd->needreload = false;
- SpinLockRelease(&walsnd->mutex);

- if (reload && sendFile >= 0)
- {
- close(sendFile);
- sendFile = -1;
+static void
+XLogReadPage(XLogReaderState* state, char *buf, XLogRecPtr startptr)
+{
+ XLogPageHeader page_header;

- goto retry;
- }
+ Assert((startptr.xrecoff % XLOG_BLCKSZ) == 0);
+
+ /* elog(LOG, "Reading from %X/%X", startptr.xlogid, startptr.xrecoff); */
+
+ /* FIXME: more sensible implementation */
+ XLogRead(buf, InvalidMultimasterNodeId, startptr, XLOG_BLCKSZ);
+
+ page_header = (XLogPageHeader)buf;
+
+ if(page_header->xlp_magic != XLOG_PAGE_MAGIC){
+ elog(FATAL, "page header magic %x, should be %x", page_header->xlp_magic,
+ XLOG_PAGE_MAGIC);
}
}

+static void
+ProcessRecord(XLogReaderState* state, XLogRecordBuffer* buf){
+ //FIXME: process table relfilenode reassignments here
+}
+
+static void WriteoutData(XLogReaderState* state, char* data, Size len){
+ //FIXME: state->nbytes shouldn't be used in here
+ /* we want to writeout zeros */
+ if(data == 0)
+ memset((char*)state->private_data + state->nbytes, 0, len);
+ else
+ memcpy((char*)state->private_data + state->nbytes, data, len);
+ state->nbytes += len;
+}
+
+static Size
+XLogSendLogical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+ XLogRecPtr endptr)
+{
+#ifdef BUGGY
+ if(!xlogreader_state->incomplete){
+ XLogReaderReset(xlogreader_state);
+ xlogreader_state->startptr = startptr;
+ xlogreader_state->curptr = startptr;
+ }
+#endif
+
+ xlogreader_state->endptr = endptr;
+ xlogreader_state->private_data = msgbuf;
+ xlogreader_state->nbytes = 0;//FIXME: this should go
+
+ XLogReaderRead(xlogreader_state);
+
+ //FIXME
+ sentPtr = xlogreader_state->curptr;
+
+ return xlogreader_state->nbytes;
+}
+
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and buffer it in the libpq output
@@ -1136,10 +1338,11 @@ static void
XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
- XLogRecPtr startptr;
- XLogRecPtr endptr;
- Size nbytes;
+ XLogRecPtr startptr = sentPtr;
+ XLogRecPtr endptr = sentPtr;
+
WalDataMessageHeader msghdr;
+ Size nbytes = 0;

/*
* Attempt to send all data that's already been written out and fsync'd to
@@ -1155,44 +1358,17 @@ XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
+#if 0
+ elog(LOG, "caughtup %X/%X", SendRqstPtr.xlogid, SendRqstPtr.xrecoff);
+#endif
return;
}

- /*
- * Figure out how much to send in one message. If there's no more than
- * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
- * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
- *
- * The rounding is not only for performance reasons. Walreceiver relies on
- * the fact that we never split a WAL record across two messages. Since a
- * long WAL record is split at page boundary into continuation records,
- * page boundary is always a safe cut-off point. We also assume that
- * SendRqstPtr never points to the middle of a WAL record.
- */
- startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
- endptr = startptr;
+ /* FIXME: this is duplicated in physical transport */
XLByteAdvance(endptr, MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- /* Don't cross a logfile boundary within one message */
- Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
- }

/* if we went beyond SendRqstPtr, back off */
- if (XLByteLE(SendRqstPtr, endptr))
- {
+ if (XLByteLE(SendRqstPtr, endptr)){
endptr = SendRqstPtr;
*caughtup = true;
}
@@ -1203,34 +1379,39 @@ XLogSend(char *msgbuf, bool *caughtup)
*caughtup = false;
}

- nbytes = endptr.xrecoff - startptr.xrecoff;
- Assert(nbytes <= MAX_SEND_SIZE);
-
/*
* OK to read and send the slice.
*/
msgbuf[0] = 'w';

- /*
- * Read the log directly into the output buffer to avoid extra memcpy
- * calls.
- */
- XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId,
- startptr, nbytes);
+ nbytes += 1 + sizeof(WalDataMessageHeader);
+
+ if(am_doing_logical)
+ nbytes += XLogSendLogical(msgbuf + nbytes, caughtup, sentPtr, endptr);
+ else
+ nbytes += XLogSendPhysical(msgbuf + nbytes, caughtup, sentPtr, endptr);
+
+#if 0
+ elog(LOG, "setting sentPtr to %X/%X, SendRqstPtr %X/%X, endptr %X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff,
+ SendRqstPtr.xlogid, SendRqstPtr.xrecoff,
+ endptr.xlogid, endptr.xrecoff);
+#endif

/*
* We fill the message header last so that the send timestamp is taken as
* late as possible.
*/
msghdr.dataStart = startptr;
- msghdr.walEnd = SendRqstPtr;
+ msghdr.walEnd = sentPtr;
msghdr.sendTime = GetCurrentTimestamp();

+
memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));

- pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+ pq_putmessage_noblock('d', msgbuf,
+ nbytes);

- sentPtr = endptr;

/* Update shared memory status */
{
@@ -1251,8 +1432,59 @@ XLogSend(char *msgbuf, bool *caughtup)
sentPtr.xlogid, sentPtr.xrecoff);
set_ps_display(activitymsg, false);
}
+}
+
+/*
+ * XLogSendPhysical -- copy raw WAL into 'msgbuf' for physical streaming.
+ *
+ * msgbuf:   destination for the WAL bytes.
+ * caughtup: in/out flag owned by XLogSend(); cleared here when we have to
+ *           stop before the caller's 'endptr'.
+ * startptr: ignored on input; sending always resumes at the global sentPtr.
+ * endptr:   upper bound computed by XLogSend().  It has already been limited
+ *           to the flushed position (or rounded down to a page boundary), so
+ *           we must never read past it.
+ *
+ * Returns the number of bytes placed in msgbuf and advances sentPtr past
+ * them.
+ */
+static Size
+XLogSendPhysical(char *msgbuf, bool *caughtup, XLogRecPtr startptr, XLogRecPtr endptr)
+{
+	XLogRecPtr	limit = endptr;		/* caller's bound, must be honored */
+	Size		nbytes;
+
+	/*
+	 * Figure out how much to send in one message. If there's no more than
+	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
+	 *
+	 * The rounding is not only for performance reasons. Walreceiver relies on
+	 * the fact that we never split a WAL record across two messages. Since a
+	 * long WAL record is split at page boundary into continuation records,
+	 * page boundary is always a safe cut-off point. We also assume that
+	 * endptr never points to the middle of a WAL record.
+	 */
+	startptr = sentPtr;
+	if (startptr.xrecoff >= XLogFileSize)
+	{
+		/*
+		 * crossing a logid boundary, skip the non-existent last log segment
+		 * in previous logical log file.
+		 *
+		 * FIXME: Isn't getting to that point a bug in the XLByte arithmetic?
+		 */
+		startptr.xlogid += 1;
+		startptr.xrecoff = 0;
+	}
+
+	endptr = startptr;
+	XLByteAdvance(endptr, MAX_SEND_SIZE);
+	if (endptr.xlogid != startptr.xlogid)
+	{
+		/* Don't cross a logfile boundary within one message */
+		Assert(endptr.xlogid == startptr.xlogid + 1);
+		endptr.xlogid = startptr.xlogid;
+		endptr.xrecoff = XLogFileSize;
+	}
+
+	/*
+	 * Never go beyond what the caller allowed: its bound carries the
+	 * "flushed so far" limit and the page-boundary rounding.  Conversely, if
+	 * the logfile-boundary cut above stops us before that bound, there is
+	 * still WAL left to send, so we cannot be caught up.
+	 */
+	if (XLByteLT(limit, endptr))
+		endptr = limit;
+	else if (XLByteLT(endptr, limit))
+		*caughtup = false;
+
+	/* the subtraction below is only meaningful within one logid */
+	Assert(endptr.xlogid == startptr.xlogid);
+	nbytes = endptr.xrecoff - startptr.xrecoff;
+	Assert(nbytes <= MAX_SEND_SIZE);
+
+	/*
+	 * Read the log directly into the output buffer to avoid extra memcpy
+	 * calls.
+	 */
+	XLogRead(msgbuf, InvalidMultimasterNodeId, startptr, nbytes);
+
+	sentPtr = endptr;

- return;
+	return nbytes;
}

/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 46b0657..6a58f96 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3058,6 +3058,15 @@ static struct config_string ConfigureNamesString[] =
},

{
+ {"multimaster_conninfo", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("Connection string to upstream master."),
+ NULL
+ },
+ &mm_conninfo,
+ 0, NULL, NULL, NULL
+ },
+
+ {
{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
gettext_noop("Sets default text search configuration."),
NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 12f8a3f..240c13d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -243,6 +243,7 @@

# - Multi Master Servers -

+#multimaster_conninfo = 'host=myupstreammaster'
#multimaster_node_id = 0 #invalid node id

#------------------------------------------------------------------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 1e16088..78b2f5f 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -403,6 +403,7 @@ typedef enum NodeTag
T_IdentifySystemCmd,
T_BaseBackupCmd,
T_StartReplicationCmd,
+ T_StartLogicalReplicationCmd,

/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 236a36d..fee111c 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -49,4 +49,14 @@ typedef struct StartReplicationCmd
XLogRecPtr startpoint;
} StartReplicationCmd;

+/* ----------------------
+ * START_LOGICAL_REPLICATION command
+ *
+ * Node carrying a START_LOGICAL_REPLICATION request.  It has the same two
+ * fields as the tail of StartReplicationCmd shown just above; the matching
+ * node tag T_StartLogicalReplicationCmd is added to nodes.h.
+ * ----------------------
+ */
+typedef struct StartLogicalReplicationCmd
+{
+ NodeTag type;
+ XLogRecPtr startpoint; /* NOTE(review): presumably the WAL position to start sending from -- confirm in repl_gram.y */
+} StartLogicalReplicationCmd;
+
#endif /* REPLNODES_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 8f44fad..fc9e120 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -13,6 +13,10 @@

#include "access/xlogdefs.h"

+/*
+ * user settable parameters for multi-master in postmaster
+ *
+ * mm_conninfo is the variable the "multimaster_conninfo" entry in guc.c
+ * stores into.  LogicalWalReceiverActive() is true exactly when that pointer
+ * is non-NULL.
+ *
+ * NOTE(review): an empty string is also non-NULL and would count as
+ * "active" -- confirm that is intended.
+ */
+extern char *mm_conninfo; /* copied in walreceiver.h also */
+#define LogicalWalReceiverActive() (mm_conninfo != NULL)
+
extern int guc_replication_origin_id;
extern RepNodeId current_replication_origin_id;
extern XLogRecPtr current_replication_origin_lsn;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c9ab1be..b565190 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -22,6 +22,7 @@
extern bool am_walreceiver;
extern int wal_receiver_status_interval;
extern bool hot_standby_feedback;
+extern RepNodeId receiving_from_node_id;

/*
* MAXCONNINFO: maximum size of a connection string.
@@ -38,9 +39,9 @@ extern bool hot_standby_feedback;
*/
typedef enum
{
- WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
+ WALRCV_STOPPED, /* stopped and mustn't start up again.
+ * NOTE(review): moving this enumerator changes
+ * the numeric values: WALRCV_STARTING is now 0
+ * and WALRCV_STOPPED is 1, so zero-filled state
+ * reads as STARTING -- confirm every user of a
+ * zero-initialized WalRcvState expects that. */
WALRCV_RUNNING, /* walreceiver is running */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
@@ -55,6 +56,7 @@ typedef struct
*/
pid_t pid;
WalRcvState walRcvState;
+ bool startedDuringRecovery;
pg_time_t startTime;

/*
@@ -108,9 +110,12 @@ typedef struct
extern WalRcvData *WalRcv;

/* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;

+typedef bool (*walrcv_start_type) (char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery);
+extern PGDLLIMPORT walrcv_start_type walrcv_start;
+
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
--
1.7.10.rc3.3.g19a6c.dirty

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-13 11:28:45 [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions
Previous Message Andres Freund 2012-06-13 11:28:43 [PATCH 12/16] Add state to keep track of logical replication