Index: DBMirror.pl =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/DBMirror.pl,v retrieving revision 1.7 diff -c -r1.7 DBMirror.pl *** DBMirror.pl 29 Nov 2003 22:39:19 -0000 1.7 --- DBMirror.pl 28 Jan 2004 00:50:32 -0000 *************** *** 79,95 **** sub mirrorInsert($$$$$); sub mirrorDelete($$$$$); sub mirrorUpdate($$$$$); - sub sendQueryToSlaves($$); sub logErrorMessage($); ! sub openSlaveConnection($); sub updateMirrorHostTable($$); ! sub extractData($$); local $::masterHost; local $::masterDb; local $::masterUser; local $::masterPassword; local $::errorThreshold=5; local $::errorEmailAddr=undef; my %slaveInfoHash; local $::slaveInfo = \%slaveInfoHash; --- 79,95 ---- sub mirrorInsert($$$$$); sub mirrorDelete($$$$$); sub mirrorUpdate($$$$$); sub logErrorMessage($); ! sub setupSlave($); sub updateMirrorHostTable($$); ! sub extractData($$); local $::masterHost; local $::masterDb; local $::masterUser; local $::masterPassword; local $::errorThreshold=5; local $::errorEmailAddr=undef; + local $::sleepInterval=60; my %slaveInfoHash; local $::slaveInfo = \%slaveInfoHash; *************** *** 115,122 **** die; } ! ! my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; $masterConn = Pg::connectdb($connectString); --- 115,139 ---- die; } ! if (defined($::syslog)) ! { ! # log with syslog ! require Sys::Syslog; ! import Sys::Syslog qw(openlog syslog); ! openlog($0, 'cons,pid', 'user'); ! syslog("info", '%s', "starting $0 script with $ARGV[0]"); ! } ! ! my $connectString; ! if(defined($::masterHost)) ! { ! $connectString .= "host=$::masterHost "; ! } ! if(defined($::masterPort)) ! { ! $connectString .= "port=$::masterPort "; ! } ! $connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword"; $masterConn = Pg::connectdb($connectString); *************** *** 138,170 **** my $firstTime = 1; while(1) { if($firstTime == 0) { ! sleep 60; } $firstTime = 0; - # Open up the connection to the slave. - if(! defined $::slaveInfo->{"status"} || - $::slaveInfo->{"status"} == -1) { - openSlaveConnection($::slaveInfo); - } - - sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); #Obtain a list of pending transactions using ordering by our approximation #to the commit time. The commit time approximation is taken to be the #SeqId of the last row edit in the transaction. ! my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; ! $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; ! $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; ! $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; ! $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; ! $pendingTransQuery .= " ON pd.\"XID\""; ! $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; ! $pendingTransQuery .= " GROUP BY pd.\"XID\" "; ! $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")"; my $pendingTransResults = $masterConn->exec($pendingTransQuery); --- 155,183 ---- my $firstTime = 1; while(1) { if($firstTime == 0) { ! sleep $::sleepInterval; } $firstTime = 0; + setupSlave($::slaveInfo); + #Obtain a list of pending transactions using ordering by our approximation #to the commit time. The commit time approximation is taken to be the #SeqId of the last row edit in the transaction. ! my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd"; ! $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN"; ! $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = "; ! $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName="; ! $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' "; ! $pendingTransQuery .= " ON pd.XID"; ! $pendingTransQuery .= " = mt.XID WHERE mt.XID is null "; ! ! ! $pendingTransQuery .= " GROUP BY pd.XID"; ! $pendingTransQuery .= " ORDER BY MAX(pd.SeqId)"; my $pendingTransResults = $masterConn->exec($pendingTransQuery); *************** *** 185,197 **** my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); my $seqId; ! ! my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\","; ! $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; ! $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; ! $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; ! ! $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC"; my $pendingResults = $masterConn->exec($pendingQuery); --- 198,218 ---- my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); my $seqId; ! ! ! if($::slaveInfo->{'status'} eq 'FileClosed') ! { ! openTransactionFile($::slaveInfo,$XID); ! } ! ! ! ! my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,"; ! $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data "; ! $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata "; ! $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId "; ! ! $pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC"; my $pendingResults = $masterConn->exec($pendingQuery); *************** *** 200,239 **** die; } ! my $numPending = $pendingResults->ntuples; my $curTuple = 0; - sendQueryToSlaves(undef,"BEGIN"); while ($curTuple < $numPending) { $seqId = $pendingResults->getvalue($curTuple,0); my $tableName = $pendingResults->getvalue($curTuple,1); my $op = $pendingResults->getvalue($curTuple,2); - $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $pendingResults,$curTuple) +1; - if($::slaveInfo->{"status"}==-1) { - last; - } } ! #Now commit the transaction. ! if($::slaveInfo->{"status"}==-1) { last; } sendQueryToSlaves(undef,"COMMIT"); updateMirrorHostTable($XID,$seqId); - if($commandCount > 5000) { - $commandCount = 0; - $::slaveInfo->{"status"} = -1; - $::slaveInfo->{"slaveConn"}->reset; - #Open the connection right away. - openSlaveConnection($::slaveInfo); - - } $pendingResults = undef; $curTransTuple = $curTransTuple +1; }#while transactions left. $pendingTransResults = undef; --- 221,267 ---- die; } ! sendQueryToSlaves($XID,"BEGIN"); my $numPending = $pendingResults->ntuples; my $curTuple = 0; while ($curTuple < $numPending) { $seqId = $pendingResults->getvalue($curTuple,0); my $tableName = $pendingResults->getvalue($curTuple,1); my $op = $pendingResults->getvalue($curTuple,2); $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $pendingResults,$curTuple) +1; } ! ! if($::slaveInfo->{'status'} ne 'DBOpen' && ! $::slaveInfo->{'status'} ne 'FileOpen') ! { last; } sendQueryToSlaves(undef,"COMMIT"); + #Now commit the transaction. updateMirrorHostTable($XID,$seqId); $pendingResults = undef; $curTransTuple = $curTransTuple +1; + + if($::slaveInfo->{'status'} eq 'FileOpen') + { + close ($::slaveInfo->{'TransactionFile'}); + } + elsif($::slaveInfo->{'status'} eq 'DBOpen') + { + if($commandCount > 5000) { + $commandCount = 0; + $::slaveInfo->{"status"} = 'DBClosed'; + $::slaveInfo->{"slaveConn"}->reset; + #Open the connection right away. + openSlaveConnection($::slaveInfo); + + } + } + }#while transactions left. $pendingTransResults = undef; *************** *** 303,308 **** --- 331,337 ---- my $pendingResults = $_[4]; my $currentTuple = $_[5]; + if($op eq 'i') { $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults ,$currentTuple); *************** *** 315,320 **** --- 344,353 ---- $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, $currentTuple); } + if($op eq 's') { + $currentTuple = mirrorSequence($seqId,$tableName,$transId,$pendingResults, + $currentTuple); + } $commandCount = $commandCount +1; if($commandCount % 100 == 0) { # print "Sent 100 commmands on SeqId $seqId \n"; *************** *** 411,417 **** $firstIteration=0; } $valuesQuery .= ")"; ! sendQueryToSlaves(undef,$insertQuery . $valuesQuery); return $currentTuple; } --- 444,450 ---- $firstIteration=0; } $valuesQuery .= ")"; ! sendQueryToSlaves($transId,$insertQuery . $valuesQuery); return $currentTuple; } *************** *** 491,497 **** $counter++; $firstField=0; } - sendQueryToSlaves($transId,$deleteQuery); return $currentTuple; } --- 524,529 ---- *************** *** 554,567 **** my $transId = $_[2]; my $pendingResult = $_[3]; my $currentTuple = $_[4]; ! my $counter; my $quotedValue; my $updateQuery = "UPDATE $tableName SET "; my $currentField; - - my %keyValueHash; my %dataValueHash; my $firstIteration=1; --- 586,597 ---- my $transId = $_[2]; my $pendingResult = $_[3]; my $currentTuple = $_[4]; ! my $counter; my $quotedValue; my $updateQuery = "UPDATE $tableName SET "; my $currentField; my %keyValueHash; my %dataValueHash; my $firstIteration=1; *************** *** 615,626 **** } $firstIteration=0; } - sendQueryToSlaves($transId,$updateQuery); return $currentTuple+1; } =item sendQueryToSlaves(seqId,sqlQuery) --- 645,671 ---- } $firstIteration=0; } sendQueryToSlaves($transId,$updateQuery); return $currentTuple+1; } + sub mirrorSequence($$$$$) { + my $seqId = $_[0]; + my $sequenceName = $_[1]; + my $transId = $_[2]; + my $pendingResult = $_[3]; + my $currentTuple = $_[4]; + + + my $query; + my $sequenceValue = $pendingResult->getvalue($currentTuple,4); + $query = sprintf("select setval(%s,%s)",$sequenceName,$sequenceValue); + + sendQueryToSlaves($transId,$query); + return $currentTuple; + + } =item sendQueryToSlaves(seqId,sqlQuery) *************** *** 647,653 **** my $seqId = $_[0]; my $sqlQuery = $_[1]; ! if($::slaveInfo->{"status"} == 0) { my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); unless($queryResult->resultStatus == PGRES_COMMAND_OK) { my $errorMessage; --- 692,698 ---- my $seqId = $_[0]; my $sqlQuery = $_[1]; ! if($::slaveInfo->{"status"} eq 'DBOpen') { my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); unless($queryResult->resultStatus == PGRES_COMMAND_OK) { my $errorMessage; *************** *** 660,669 **** --- 705,722 ---- $::slaveInfo->{"status"} = -1; } } + elsif($::slaveInfo->{"status"} eq 'FileOpen' ) { + my $xfile = $::slaveInfo->{'TransactionFile'}; + print $xfile $sqlQuery . ";\n"; + } + + } + + =item logErrorMessage(error) Mails an error message to the users specified $errorEmailAddr *************** *** 707,747 **** print mailPipe "\n\n\n=================================================\n"; close mailPipe; } warn($error); $lastErrorMsg = $error; } ! sub openSlaveConnection($) { my $slavePtr = $_[0]; - my $slaveConn; - my $slaveConnString = "host=" . $slavePtr->{"slaveHost"}; - $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; - $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; - $slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; - - $slaveConn = Pg::connectdb($slaveConnString); - - if($slaveConn->status != PGRES_CONNECTION_OK) { - my $errorMessage = "Can't connect to slave database " ; - $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; - $errorMessage .= $slaveConn->errorMessage; - logErrorMessage($errorMessage); - $slavePtr->{"status"} = -1; - } - else { - $slavePtr->{"slaveConn"} = $slaveConn; $slavePtr->{"status"} = 0; #Determine the MirrorHostId for the slave from the master's database ! my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' ! . ' "MirrorHost" WHERE "HostName"' ! . '=\'' . $slavePtr->{"slaveHost"} . '\''); if($resultSet->ntuples !=1) { ! my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; $errorMessage .= "Has no MirrorHost entry on master\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; --- 760,789 ---- print mailPipe "\n\n\n=================================================\n"; close mailPipe; } + + if (defined($::syslog)) + { + syslog('err', '%s (%m)', $error); + } + warn($error); $lastErrorMsg = $error; } ! sub setupSlave($) { my $slavePtr = $_[0]; $slavePtr->{"status"} = 0; #Determine the MirrorHostId for the slave from the master's database ! my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM ' ! . ' dbmirror_MirrorHost WHERE SlaveName' ! . '=\'' . $slavePtr->{"slaveName"} . '\''); if($resultSet->ntuples !=1) { ! my $errorMessage .= $slavePtr->{"slaveName"} ."\n"; $errorMessage .= "Has no MirrorHost entry on master\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; *************** *** 749,762 **** } $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); ! ! ! } } - =item updateMirrorHostTable(lastTransId,lastSeqId) Updates the MirroredTransaction table to reflect the fact that --- 791,814 ---- } $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); ! ! if(defined($::slaveInfo->{'slaveDb'})) { ! # We talk directly to a slave database. ! # ! if($::slaveInfo->{"status"} ne 'DBOpen') ! { ! openSlaveConnection($::slaveInfo); ! } ! sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); ! sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); ! } ! else { ! $::slaveInfo->{"status"} = 'FileClosed'; } + } =item updateMirrorHostTable(lastTransId,lastSeqId) Updates the MirroredTransaction table to reflect the fact that *************** *** 783,821 **** my $lastTransId = shift; my $lastSeqId = shift; ! if($::slaveInfo->{"status"}==0) { ! my $deleteTransactionQuery; ! my $deleteResult; ! my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; ! $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; ! $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; ! ! my $updateResult = $masterConn->exec($updateMasterQuery); ! unless($updateResult->resultStatus == PGRES_COMMAND_OK) { ! my $errorMessage = $masterConn->errorMessage . "\n"; ! $errorMessage .= $updateMasterQuery; ! logErrorMessage($errorMessage); ! die; ! } # print "Updated slaves to transaction $lastTransId\n" ; # flush STDOUT; ! #If this transaction has now been mirrored to all mirror hosts ! #then it can be deleted. ! $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' ! . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' ! . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' ! . ' "MirrorHost")'; ! ! $deleteResult = $masterConn->exec($deleteTransactionQuery); ! if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { ! logErrorMessage($masterConn->errorMessage . "\n" . ! $deleteTransactionQuery); ! die; ! } ! } } --- 835,874 ---- my $lastTransId = shift; my $lastSeqId = shift; ! ! ! my $deleteTransactionQuery; ! my $deleteResult; ! my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction "; ! $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)"; ! $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; ! ! my $updateResult = $masterConn->exec($updateMasterQuery); ! unless($updateResult->resultStatus == PGRES_COMMAND_OK) { ! my $errorMessage = $masterConn->errorMessage . "\n"; ! $errorMessage .= $updateMasterQuery; ! logErrorMessage($errorMessage); ! die; ! } # print "Updated slaves to transaction $lastTransId\n" ; # flush STDOUT; ! #If this transaction has now been mirrored to all mirror hosts ! #then it can be deleted. ! $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID=' ! . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction' ! . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM' ! . ' dbmirror_MirrorHost)'; ! ! $deleteResult = $masterConn->exec($deleteTransactionQuery); ! if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { ! logErrorMessage($masterConn->errorMessage . "\n" . ! $deleteTransactionQuery); ! die; } + + } *************** *** 888,891 **** --- 941,1010 ---- } #while return %valuesHash; + } + + + sub openTransactionFile($$) + { + my $slaveInfo = shift; + my $XID =shift; + # my $now_str = localtime; + my $nowsec; + my $nowmin; + my $nowhour; + my $nowmday; + my $nowmon; + my $nowyear; + my $nowwday; + my $nowyday; + my $nowisdst; + ($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) = + localtime; + my $fileName=sprintf(">%s/%s_%d-%d-%d_%d:%d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'}, + $::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin, + $nowsec,$XID); + + my $xfile; + open($xfile,$fileName) or die "Can't open $fileName : $!"; + + $slaveInfo->{'TransactionFile'} = $xfile; + $slaveInfo->{'status'} = 'FileOpen'; + } + + + + sub openSlaveConnection($) { + my $slavePtr = $_[0]; + my $slaveConn; + + + my $slaveConnString; + if(defined($slavePtr->{"slaveHost"})) + { + $slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " "; + } + if(defined($slavePtr->{"slavePort"})) + { + $slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " "; + } + + $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; + $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; + $slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; + + $slaveConn = Pg::connectdb($slaveConnString); + + if($slaveConn->status != PGRES_CONNECTION_OK) { + my $errorMessage = "Can't connect to slave database " ; + $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; + $errorMessage .= $slaveConn->errorMessage; + logErrorMessage($errorMessage); + $slavePtr->{"status"} = 'DBFailed'; + } + else { + $slavePtr->{"slaveConn"} = $slaveConn; + $slavePtr->{"status"} = 'DBOpen'; + } + + } Index: MirrorSetup.sql =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/MirrorSetup.sql,v retrieving revision 1.5 diff -c -r1.5 MirrorSetup.sql *** MirrorSetup.sql 14 May 2003 03:25:55 -0000 1.5 --- MirrorSetup.sql 28 Jan 2004 00:50:32 -0000 *************** *** 1,43 **** CREATE FUNCTION "recordchange" () RETURNS trigger AS ! '/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C'; - CREATE TABLE "MirrorHost" ( - "MirrorHostId" serial, - "HostName" varchar NOT NULL, - PRIMARY KEY("MirrorHostId") - ); - CREATE TABLE "Pending" ( - "SeqId" serial, - "TableName" varchar NOT NULL, - "Op" character, - "XID" int4 NOT NULL, - PRIMARY KEY ("SeqId") ); ! CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID"); ! CREATE TABLE "PendingData" ( ! "SeqId" int4 NOT NULL, ! "IsKey" bool NOT NULL, ! "Data" varchar, ! PRIMARY KEY ("SeqId", "IsKey") , ! FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE ); ! CREATE TABLE "MirroredTransaction" ( ! "XID" int4 NOT NULL, ! "LastSeqId" int4 NOT NULL, ! "MirrorHostId" int4 NOT NULL, ! PRIMARY KEY ("XID","MirrorHostId"), ! FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE, ! FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE ); --- 1,61 ---- + BEGIN; + + SET autocommit TO 'on'; CREATE FUNCTION "recordchange" () RETURNS trigger AS ! '$libdir/pending.so', 'recordchange' LANGUAGE 'C'; ! + CREATE TABLE dbmirror_MirrorHost ( + MirrorHostId serial not null, + SlaveName varchar NOT NULL, + PRIMARY KEY(MirrorHostId) + ); + CREATE TABLE dbmirror_Pending ( + SeqId serial, + TableName Name NOT NULL, + Op character, + XID int4 NOT NULL, + PRIMARY KEY (SeqId) ); ! CREATE INDEX "dbmirror_Pending_XID_Index" ON dbmirror_Pending (XID); ! CREATE TABLE dbmirror_PendingData ( ! SeqId int4 NOT NULL, ! IsKey bool NOT NULL, ! Data varchar, ! PRIMARY KEY (SeqId, IsKey) , ! FOREIGN KEY (SeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE ); ! CREATE TABLE dbmirror_MirroredTransaction ( ! XID int4 NOT NULL, ! LastSeqId int4 NOT NULL, ! MirrorHostId int4 NOT NULL, ! PRIMARY KEY (XID,MirrorHostId), ! FOREIGN KEY (MirrorHostId) REFERENCES dbmirror_MirrorHost (MirrorHostId) ON UPDATE CASCADE ON DELETE CASCADE, ! FOREIGN KEY (LastSeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE ); + + + UPDATE pg_proc SET proname='nextval_pg' WHERE proname='nextval'; + + CREATE FUNCTION pg_catalog.nextval(text) RETURNS int8 AS + '/usr/local/postgresql-7.4/lib/pending.so', 'nextval' LANGUAGE 'C' STRICT; + + + UPDATE pg_proc set proname='setval_pg' WHERE proname='setval'; + + CREATE FUNCTION pg_catalog.setval(text,int4) RETURNS int8 AS + '/usr/local/postgresql-7.4/lib/pending.so', 'setval' LANGUAGE 'C' STRICT; + + COMMIT; \ No newline at end of file Index: README.dbmirror =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/README.dbmirror,v retrieving revision 1.7 diff -c -r1.7 README.dbmirror *** README.dbmirror 27 Oct 2003 02:52:45 -0000 1.7 --- README.dbmirror 28 Jan 2004 00:50:33 -0000 *************** *** 6,12 **** database Written and maintained by Steven Singer(ssinger@navtechinc.com) ! (c) 2001-2002 Navtech Systems Support Inc. ALL RIGHTS RESERVED Permission to use, copy, modify, and distribute this software and its --- 6,12 ---- database Written and maintained by Steven Singer(ssinger@navtechinc.com) ! (c) 2001-2004 Navtech Systems Support Inc. ALL RIGHTS RESERVED Permission to use, copy, modify, and distribute this software and its *************** *** 57,63 **** Requirments: --------------------------------- -PostgreSQL-7.4 (Older versions are no longer supported) ! -Perl 5.6(Other versions might work) -PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php) --- 57,63 ---- Requirments: --------------------------------- -PostgreSQL-7.4 (Older versions are no longer supported) ! -Perl 5.6 or 5.8 (Other versions might work) -PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php) *************** *** 81,93 **** You should now have a file named pending.so that contains the trigger. ! Install this file in /usr/local/pgsql/lib (or another suitable location). - If you choose a different location the MirrorSetup.sql script will need - to be modified to reflect your new location. The CREATE FUNCTION command - in the MirrorSetup.sql script associates the trigger function with the - pending.so shared library. Modify the arguments to this command if you - choose to install the trigger elsewhere. 2) Run MirrorSetup.sql --- 81,88 ---- You should now have a file named pending.so that contains the trigger. ! Install this file in your Postgresql lib directory (/usr/local/pgsql/lib) 2) Run MirrorSetup.sql *************** *** 95,101 **** This includes -Telling PostgreSQL about the "recordchange" trigger function. ! -Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables To execute the script use psql as follows --- 90,97 ---- This includes -Telling PostgreSQL about the "recordchange" trigger function. ! -Creating the dbmirror_Pending,dbmirror_PendingData,dbmirror_MirrorHost, ! dbmirror_MirroredTransaction tables To execute the script use psql as follows *************** *** 114,130 **** The master settings refer to the master database(The one that is being mirrored). ! The slave settings refer to the database that the data is being mirrored to. ! The slaveHost parameter must refer to the machine name of the slave (Either ! a resolvable hostname or an IP address). The value for slave host ! must match the Hostname field in the MirrorHost table(See step 6). ! ! The master user must have sufficient permissions to modify the Pending ! tables and to read all of the tables being mirrored. ! ! The slave user must have enough permissions on the slave database to ! modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being ! mirrored. 4) Add the trigger to tables. --- 110,143 ---- The master settings refer to the master database(The one that is being mirrored). ! The slave settings refer to the database that the data is being ! mirrored to. ! ! The slaveName setting in the configuration file must match the slave ! name specified in the dbmirror_MirrorHost table. ! ! DBMirror.pl can be run in two modes of operation: ! ! A) It can connect directly to the slave database. To do this specify ! a slave database name and optional host and port along with a username ! and password. See slaveDatabase.conf for details. ! ! ! The master user must have sufficient permissions to modify the Pending ! tables and to read all of the tables being mirrored. ! ! The slave user must have enough permissions on the slave database to ! modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being ! mirrored. ! ! B) The SQL statements that should be executed on the slave can be ! written to files which can then be executed slave database through ! psql. This would be suitable for setups where their is no direct ! connection between the slave database and the master. A file is ! generated for each transaction in the directory specified by ! TransactionFileDirectory. The file name contains the date/time the ! file was created along with the transaction id. ! 4) Add the trigger to tables. *************** *** 153,159 **** slaveHost variable for that slave in the configuration file. For example ! INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com'); 6) Start DBMirror.pl --- 166,172 ---- slaveHost variable for that slave in the configuration file. For example ! INSERT INTO "MirrorHost" ("SlaveName") VALUES ('backup_system'); 6) Start DBMirror.pl *************** *** 171,177 **** the configuration file. DBMirror can be run from the master, the slave, or a third machine as long ! as it is able to access both the master and slave databases. 7) Periodically run clean_pending.pl clean_pending.pl cleans out any entries from the Pending tables that --- 184,191 ---- the configuration file. DBMirror can be run from the master, the slave, or a third machine as long ! as it is able to access both the master and slave databases(not ! required if SQL files are being generated) 7) Periodically run clean_pending.pl clean_pending.pl cleans out any entries from the Pending tables that *************** *** 194,202 **** ---------- -Support for selective mirroring based on the content of data. -Support for BLOB's. ! -Support for conflict resolution. ! -Batching SQL commands in DBMirror for better performance over WAN's. -Better support for dealing with Schema changes. Steven Singer --- 208,233 ---- ---------- -Support for selective mirroring based on the content of data. -Support for BLOB's. ! -Support for multi-master mirroring with conflict resolution. -Better support for dealing with Schema changes. + + + + Significant Changes Since 7.4 + ---------------- + -Support for mirroring SEQUENCE's + -Support for unix domain sockets + -Support for outputting slave SQL statements to a file + -Changed the names of replication tables are now named + dbmirror_pending etc.. + + + + Credits + ----------- + Achilleus Mantzios + + Steven Singer Index: pending.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/pending.c,v retrieving revision 1.15 diff -c -r1.15 pending.c *** pending.c 29 Nov 2003 22:39:19 -0000 1.15 --- pending.c 28 Jan 2004 00:50:33 -0000 *************** *** 1,5 **** --- 1,6 ---- /**************************************************************************** * pending.c + * $Id: pending.c,v 1.10 2004/01/19 00:09:11 ssinger Exp $ * $PostgreSQL: pgsql-server/contrib/dbmirror/pending.c,v 1.15 2003/11/29 22:39:19 pgsql Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables *************** *** 34,68 **** #include #include #include enum FieldUsage { PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE }; int storePending(char *cpTableName, HeapTuple tBeforeTuple, ! HeapTuple tAfterTuple, ! TupleDesc tTupdesc, ! TriggerData *tpTrigdata, char cOp); int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, ! TriggerData *tpTrigdata); ! int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ! TriggerData *tpTrigData, int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); ! char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, ! TriggerData *tTrigData, enum FieldUsage eKeyUsage); #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 ! /*#define DEBUG_OUTPUT 1 */ extern Datum recordchange(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(recordchange); /***************************************************************************** * The entry point for the trigger function. * The Trigger takes a single SQL 'text' argument indicating the name of the --- 35,94 ---- #include #include #include + #include + enum FieldUsage { PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE }; int storePending(char *cpTableName, HeapTuple tBeforeTuple, ! HeapTuple tAfterTuple, ! TupleDesc tTupdesc, ! Oid tableOid, ! char cOp); ! ! int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, ! Oid tableOid); ! int storeData(char *cpTableName, HeapTuple tTupleData, ! TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); ! char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid, enum FieldUsage eKeyUsage); + #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 ! #define DEBUG_OUTPUT 1 extern Datum recordchange(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(recordchange); + #if defined DEBUG_OUTPUT + #define debug_msg2(x,y) elog(NOTICE,x,y) + #define debug_msg(x) elog(NOTICE,x) + #define debug_msg3(x,y,z) elog(NOTICE,x,y,z) + #else + #define debug_msg2(x,y) + #define debug_msg(x) + #define debug_msg(x,y,z) + + #endif + + + + extern Datum nextval(PG_FUNCTION_ARGS); + extern Datum setval(PG_FUNCTION_ARGS); + + int saveSequenceUpdate(const text * sequenceName, + int nextSequenceValue); + + /***************************************************************************** * The entry point for the trigger function. * The Trigger takes a single SQL 'text' argument indicating the name of the *************** *** 81,93 **** char op = 0; char *schemaname; char *fullyqualtblname; if (fcinfo->context != NULL) { if (SPI_connect() < 0) { ! elog(NOTICE, "storePending could not connect to SPI"); return -1; } trigdata = (TriggerData *) fcinfo->context; --- 107,121 ---- char op = 0; char *schemaname; char *fullyqualtblname; + char *pkxpress=NULL; if (fcinfo->context != NULL) { if (SPI_connect() < 0) { ! ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE), ! errmsg("dbmirror:recordchange could not connect to SPI"))); return -1; } trigdata = (TriggerData *) fcinfo->context; *************** *** 124,131 **** beforeTuple = trigdata->tg_trigtuple; op = 'd'; } ! if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op)) { /* An error occoured. Skip the operation. */ ereport(ERROR, --- 152,166 ---- beforeTuple = trigdata->tg_trigtuple; op = 'd'; } + else + { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("dbmirror:recordchange Unknown operation"))); + + } ! if (storePending(fullyqualtblname, beforeTuple, afterTuple, ! tupdesc, retTuple->t_tableOid, op)) { /* An error occoured. Skip the operation. */ ereport(ERROR, *************** *** 135,144 **** return PointerGetDatum(NULL); } ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "returning on success"); ! #endif SPI_pfree(fullyqualtblname); SPI_finish(); return PointerGetDatum(retTuple); } --- 170,180 ---- return PointerGetDatum(NULL); } ! debug_msg("dbmirror:recordchange returning on success"); ! SPI_pfree(fullyqualtblname); + if(pkxpress != NULL) + SPI_pfree(pkxpress); SPI_finish(); return PointerGetDatum(retTuple); } *************** *** 160,200 **** storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, ! TriggerData *tpTrigData, char cOp) { ! char *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; int iResult = 0; HeapTuple tCurTuple; /* Points the current tuple(before or after) */ ! Datum saPlanData[4]; ! Oid taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; - vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); if (vpPlan == NULL) ! elog(NOTICE, "error creating plan"); ! /* SPI_saveplan(vpPlan); */ saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); ! ! iResult = SPI_execp(vpPlan, saPlanData, NULL, 1); if (iResult < 0) ! elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "row successfully stored in pending table"); ! #endif if (cOp == 'd') { --- 196,240 ---- storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, ! Oid tableOid, ! char cOp) { ! char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)"; int iResult = 0; HeapTuple tCurTuple; + char nulls[3]=" "; /* Points the current tuple(before or after) */ ! Datum saPlanData[3]; ! Oid taPlanArgTypes[4] = {NAMEOID, ! CHAROID, ! INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); if (vpPlan == NULL) ! ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), ! errmsg("dbmirror:storePending error creating plan"))); ! saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); ! iResult = SPI_execp(vpPlan, saPlanData, nulls, 1); if (iResult < 0) ! elog(NOTICE, "storedPending fired (%s) returned %d", ! cpQueryBase, iResult); ! ! debug_msg("dbmirror:storePending row successfully stored in pending table"); ! if (cOp == 'd') { *************** *** 202,208 **** * This is a record of a delete operation. * Just store the key data. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); } else if (cOp == 'i') { --- 242,249 ---- * This is a record of a delete operation. * Just store the key data. */ ! iResult = storeKeyInfo(cpTableName, ! tBeforeTuple, tTupDesc, tableOid); } else if (cOp == 'i') { *************** *** 210,229 **** * An Insert operation. * Store all data */ ! iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE); } else { /* op must be an update. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); ! iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc, ! tpTrigData, TRUE); } ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "done storing keyinfo"); ! #endif return iResult; --- 251,272 ---- * An Insert operation. * Store all data */ ! iResult = storeData(cpTableName, tAfterTuple, ! tTupDesc, tableOid,TRUE); } else { /* op must be an update. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, ! tTupDesc, tableOid); ! iResult = iResult ? iResult : ! storeData(cpTableName, tAfterTuple, tTupDesc, ! tableOid,TRUE); } ! ! debug_msg("dbmirror:storePending done storing keyinfo"); return iResult; *************** *** 231,242 **** int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ! TupleDesc tTupleDesc, ! TriggerData *tpTrigData) { Oid saPlanArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; --- 274,284 ---- int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ! TupleDesc tTupleDesc, Oid tableOid) { Oid saPlanArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; *************** *** 250,256 **** } /* pplan = SPI_saveplan(pplan); */ ! cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), --- 292,298 ---- } /* pplan = SPI_saveplan(pplan); */ ! cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), *************** *** 258,266 **** errmsg("there is no PRIMARY KEY for table %s", cpTableName))); ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "key data: %s", cpKeyData); ! #endif saPlanData[0] = PointerGetDatum(cpKeyData); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); --- 300,308 ---- errmsg("there is no PRIMARY KEY for table %s", cpTableName))); ! ! debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData); ! saPlanData[0] = PointerGetDatum(cpKeyData); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); *************** *** 270,281 **** if (iRetCode != SPI_OK_INSERT) { ! elog(NOTICE, "error inserting row in pendingDelete"); return -1; } ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "insert successful"); ! #endif return 0; --- 312,323 ---- if (iRetCode != SPI_OK_INSERT) { ! ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION) ! ,errmsg("error inserting row in pendingDelete"))); return -1; } ! ! debug_msg("insert successful"); return 0; *************** *** 318,329 **** * Stores a copy of the non-key data for the row. *****************************************************************************/ int ! storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ! TriggerData *tpTrigData, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; --- 360,371 ---- * Stores a copy of the non-key data for the row. *****************************************************************************/ int ! storeData(char *cpTableName, HeapTuple tTupleData, ! TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; *************** *** 338,346 **** /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) ! cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY); else ! cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); --- 380,389 ---- /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) ! cpKeyData = packageData(tTupleData, tTupleDesc, ! tableOid, NONPRIMARY); else ! cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); *************** *** 353,361 **** elog(NOTICE, "error inserting row in pendingDelete"); return -1; } ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "insert successful"); ! #endif return 0; --- 396,404 ---- elog(NOTICE, "error inserting row in pendingDelete"); return -1; } ! ! debug_msg("dbmirror:storeKeyData insert successful"); ! return 0; *************** *** 376,383 **** * ALL implies include all fields. */ char * ! packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, ! TriggerData *tpTrigData, enum FieldUsage eKeyUsage) { int iNumCols; --- 419,425 ---- * ALL implies include all fields. */ char * ! packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, enum FieldUsage eKeyUsage) { int iNumCols; *************** *** 391,404 **** if (eKeyUsage != ALL) { ! tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); if (tpPKeys == NULL) return NULL; } ! #if defined DEBUG_OUTPUT if (tpPKeys != NULL) ! elog(NOTICE, "have primary keys"); ! #endif cpDataBlock = SPI_palloc(BUFFER_SIZE); iDataBlockSize = BUFFER_SIZE; iUsedDataBlock = 0; /* To account for the null */ --- 433,449 ---- if (eKeyUsage != ALL) { ! tpPKeys = getPrimaryKey(tableOid); if (tpPKeys == NULL) return NULL; } ! if (tpPKeys != NULL) ! { ! debug_msg("dbmirror:packageData have primary keys"); ! ! } ! cpDataBlock = SPI_palloc(BUFFER_SIZE); iDataBlockSize = BUFFER_SIZE; iUsedDataBlock = 0; /* To account for the null */ *************** *** 417,465 **** { /* Determine if this is a primary key or not. */ iIsPrimaryKey = 0; ! for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0; iPrimaryKeyIndex++) { ! if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter) { iIsPrimaryKey = 1; break; } } ! if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY)) { /** * Don't use. */ ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "skipping column"); ! #endif continue; } } /* KeyUsage!=ALL */ ! #ifndef NODROPCOLUMN ! if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) ! { ! /** ! * This column has been dropped. ! * Do not mirror it. ! */ ! continue; ! } ! #endif ! cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs ! [iColumnCounter - 1]->attname)); ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "field name: %s", cpFieldName); ! #endif ! while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6) ! { ! cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; } sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName); iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3; ! cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter); cpUnFormatedPtr = cpFieldData; cpFormatedPtr = cpDataBlock + iUsedDataBlock; --- 462,519 ---- { /* Determine if this is a primary key or not. */ iIsPrimaryKey = 0; ! for (iPrimaryKeyIndex = 0; ! (*tpPKeys)[iPrimaryKeyIndex] != 0; iPrimaryKeyIndex++) { ! if ((*tpPKeys)[iPrimaryKeyIndex] ! == iColumnCounter) { iIsPrimaryKey = 1; break; } } ! if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : ! (eKeyUsage != NONPRIMARY)) { /** * Don't use. */ ! ! debug_msg("dbmirror:packageData skipping column"); ! continue; } } /* KeyUsage!=ALL */ ! ! if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) ! { ! /** ! * This column has been dropped. ! * Do not mirror it. ! */ ! continue; ! } ! ! cpFieldName = DatumGetPointer(NameGetDatum ! ! (&tTupleDesc->attrs ! [iColumnCounter - 1]->attname)); ! ! debug_msg2("dbmirror:packageData field name: %s", cpFieldName); ! ! while (iDataBlockSize - iUsedDataBlock < ! strlen(cpFieldName) + 6) ! { ! cpDataBlock = SPI_repalloc(cpDataBlock, ! iDataBlockSize + ! BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; } sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName); iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3; ! cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, ! iColumnCounter); cpUnFormatedPtr = cpFieldData; cpFormatedPtr = cpDataBlock + iUsedDataBlock; *************** *** 477,491 **** continue; } ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "field data: \"%s\"", cpFieldData); ! elog(NOTICE, "starting format loop"); ! #endif while (*cpUnFormatedPtr != 0) { while (iDataBlockSize - iUsedDataBlock < 2) { ! cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } --- 531,547 ---- continue; } ! debug_msg2("dbmirror:packageData field data: \"%s\"", ! cpFieldData); ! debug_msg("dbmirror:packageData starting format loop"); ! while (*cpUnFormatedPtr != 0) { while (iDataBlockSize - iUsedDataBlock < 2) { ! cpDataBlock = SPI_repalloc(cpDataBlock, ! iDataBlockSize ! + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } *************** *** 505,529 **** while (iDataBlockSize - iUsedDataBlock < 3) { ! cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } sprintf(cpFormatedPtr, "' "); iUsedDataBlock = iUsedDataBlock + 2; ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "data block: \"%s\"", cpDataBlock); ! #endif } /* for iColumnCounter */ if (tpPKeys != NULL) SPI_pfree(tpPKeys); ! #if defined DEBUG_OUTPUT ! elog(NOTICE, "returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize, ! iUsedDataBlock); ! #endif memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock); return cpDataBlock; } --- 561,778 ---- while (iDataBlockSize - iUsedDataBlock < 3) { ! cpDataBlock = SPI_repalloc(cpDataBlock, ! iDataBlockSize + ! BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } sprintf(cpFormatedPtr, "' "); iUsedDataBlock = iUsedDataBlock + 2; ! ! debug_msg2("dbmirror:packageData data block: \"%s\"", ! cpDataBlock); } /* for iColumnCounter */ if (tpPKeys != NULL) SPI_pfree(tpPKeys); ! ! debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d", ! iDataBlockSize, ! iUsedDataBlock); ! memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock); return cpDataBlock; } + + + PG_FUNCTION_INFO_V1(setval); + + Datum setval(PG_FUNCTION_ARGS) + { + + + text * sequenceName; + + Oid setvalArgTypes[2] = {TEXTOID,INT4OID}; + int nextValue; + void * setvalPlan=NULL; + Datum setvalData[2]; + const char * setvalQuery = "SELECT setval_pg($1,$2)"; + int ret; + + sequenceName = PG_GETARG_TEXT_P(0); + nextValue = PG_GETARG_INT32(1); + + setvalData[0] = PointerGetDatum(sequenceName); + setvalData[1] = Int32GetDatum(nextValue); + + if (SPI_connect() < 0) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:setval could not connect to SPI"))); + return -1; + } + + setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes); + if(setvalPlan == NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:setval could not prepare plan"))); + return -1; + } + + ret = SPI_execp(setvalPlan,setvalData,NULL,1); + + if(ret != SPI_OK_SELECT || SPI_processed != 1) + return -1; + + debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue); + + ret = saveSequenceUpdate(sequenceName,nextValue); + + SPI_pfree(setvalPlan); + + SPI_finish(); + debug_msg("dbmirror:setval about to return"); + return Int64GetDatum(nextValue); + + } + + + + PG_FUNCTION_INFO_V1(nextval); + + Datum + nextval(PG_FUNCTION_ARGS) + { + text * sequenceName; + + const char * nextvalQuery = "SELECT nextval_pg($1)"; + Oid nextvalArgTypes[1] = {TEXTOID}; + void * nextvalPlan=NULL; + Datum nextvalData[1]; + + + int ret; + HeapTuple resTuple; + char isNull; + int nextSequenceValue; + + + + debug_msg("dbmirror:nextval Starting pending.so:nextval"); + + + sequenceName = PG_GETARG_TEXT_P(0); + + if (SPI_connect() < 0) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:nextval could not connect to SPI"))); + return -1; + } + + nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes); + + + debug_msg("prepared plan to call nextval_pg"); + + + if(nextvalPlan==NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:nextval error creating plan"))); + return -1; + } + nextvalData[0] = PointerGetDatum(sequenceName); + + ret = SPI_execp(nextvalPlan,nextvalData,NULL,1); + + debug_msg("dbmirror:Executed call to nextval_pg"); + + + if(ret != SPI_OK_SELECT || SPI_processed != 1) + return -1; + + resTuple = SPI_tuptable->vals[0]; + + debug_msg("dbmirror:nextval Set resTuple"); + + nextSequenceValue =*(DatumGetPointer(SPI_getbinval(resTuple, + SPI_tuptable->tupdesc, + 1,&isNull))); + + + + debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue); + + + saveSequenceUpdate(sequenceName,nextSequenceValue); + SPI_pfree(resTuple); + SPI_pfree(nextvalPlan); + + SPI_finish(); + + return Int64GetDatum(nextSequenceValue); + } + + + int + saveSequenceUpdate(const text * sequenceName, + int nextSequenceVal) + { + + Oid insertArgTypes[2] = {TEXTOID,INT4OID}; + Oid insertDataArgTypes[1] = {NAMEOID}; + void * insertPlan=NULL; + void * insertDataPlan=NULL; + Datum insertDatum[2]; + Datum insertDataDatum[1]; + char nextSequenceText[32]; + + const char * insertQuery = + "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \ + "($1,'s',$2)"; + const char * insertDataQuery = + "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \ + "(currval('dbmirror_pending_seqid_seq'),'t',$1)"; + + int ret; + + + insertPlan = SPI_prepare(insertQuery,2,insertArgTypes); + insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes); + + debug_msg("Prepared insert query"); + + + if(insertPlan == NULL || insertDataPlan == NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan"))); + } + + insertDatum[1] = Int32GetDatum(GetCurrentTransactionId()); + insertDatum[0] = PointerGetDatum(sequenceName); + + sprintf(nextSequenceText,"%d",nextSequenceVal); + insertDataDatum[0] = PointerGetDatum(nextSequenceText); + debug_msg2("dbmirror:savesequenceupdate: Setting value %s", + nextSequenceText); + + debug_msg("dbmirror:About to execute insert query"); + + ret = SPI_execp(insertPlan,insertDatum,NULL,1); + + ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1); + + debug_msg("dbmirror:Insert query finished"); + SPI_pfree(insertPlan); + SPI_pfree(insertDataPlan); + + return ret; + + } +