Index: DBMirror.pl =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/DBMirror.pl,v retrieving revision 1.6 diff -c -w -r1.6 DBMirror.pl *** DBMirror.pl 14 May 2003 03:25:55 -0000 1.6 --- DBMirror.pl 16 Nov 2003 00:05:06 -0000 *************** *** 79,88 **** sub mirrorInsert($$$$$); sub mirrorDelete($$$$$); sub mirrorUpdate($$$$$); - sub sendQueryToSlaves($$); sub logErrorMessage($); ! sub openSlaveConnection($); ! sub updateMirrorHostTable($$); sub extractData($$); local $::masterHost; local $::masterDb; --- 79,87 ---- sub mirrorInsert($$$$$); sub mirrorDelete($$$$$); sub mirrorUpdate($$$$$); sub logErrorMessage($); ! sub setupSlave($); ! sub updateMirrorHostTable($$$); sub extractData($$); local $::masterHost; local $::masterDb; *************** *** 115,122 **** die; } ! ! my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; $masterConn = Pg::connectdb($connectString); --- 114,129 ---- die; } ! my $connectString; ! if(defined($::masterHost)) ! { ! $connectString .= "host=$::masterHost "; ! } ! if(defined($::masterPort)) ! { ! $connectString .= "port=$::masterPort "; ! } ! $connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword"; $masterConn = Pg::connectdb($connectString); *************** *** 141,170 **** sleep 60; } $firstTime = 0; - # Open up the connection to the slave. - if(! defined $::slaveInfo->{"status"} || - $::slaveInfo->{"status"} == -1) { - openSlaveConnection($::slaveInfo); - } - sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); #Obtain a list of pending transactions using ordering by our approximation #to the commit time. The commit time approximation is taken to be the #SeqId of the last row edit in the transaction. ! my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; ! 
$pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; ! $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; ! $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; ! $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; ! $pendingTransQuery .= " ON pd.\"XID\""; ! $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; ! $pendingTransQuery .= " GROUP BY pd.\"XID\" "; ! $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")"; my $pendingTransResults = $masterConn->exec($pendingTransQuery); --- 148,176 ---- sleep 60; } $firstTime = 0; + setupSlave($::slaveInfo); #Obtain a list of pending transactions using ordering by our approximation #to the commit time. The commit time approximation is taken to be the #SeqId of the last row edit in the transaction. ! my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId),(pd.slaveid is null) as isexpuncond FROM dbmirror_Pending pd"; ! $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN"; ! $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = "; ! $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName="; ! $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' "; ! $pendingTransQuery .= " ON pd.XID"; ! $pendingTransQuery .= " = mt.XID WHERE (mt.XID is null and pd.slaveid is null) "; ! if(defined($::slaveInfo->{'MirrorHostId'})) { ! ! $pendingTransQuery .= "or pd.slaveid = '$::slaveInfo->{\"MirrorHostId\"}' "; ! } ! ! $pendingTransQuery .= " GROUP BY pd.XID,pd.slaveid "; ! 
$pendingTransQuery .= " ORDER BY MAX(pd.SeqId)"; my $pendingTransResults = $masterConn->exec($pendingTransQuery); *************** *** 184,197 **** while($curTransTuple < $numPendingTrans) { my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); my $seqId; - my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\","; - $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; - $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; - $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; ! $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC"; my $pendingResults = $masterConn->exec($pendingQuery); --- 190,215 ---- while($curTransTuple < $numPendingTrans) { my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); + my $isexpuncond = $pendingTransResults->getvalue($curTransTuple,2); my $seqId; ! if($::slaveInfo->{'status'} eq 'FileClosed') ! { ! openTransactionFile($::slaveInfo,$XID); ! } ! ! ! ! my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,"; ! $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data "; ! $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata "; ! $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId "; ! if(defined($::slaveInfo->{'MirrorHostId'})) { ! $pendingQuery .= " AND (pnd.slaveid is null or pnd.slaveid= '$::slaveInfo->{\"MirrorHostId\"}' ) "; ! } ! ! $pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC"; my $pendingResults = $masterConn->exec($pendingQuery); *************** *** 200,210 **** die; } ! my $numPending = $pendingResults->ntuples; my $curTuple = 0; - sendQueryToSlaves(undef,"BEGIN"); while ($curTuple < $numPending) { $seqId = $pendingResults->getvalue($curTuple,0); my $tableName = $pendingResults->getvalue($curTuple,1); --- 218,227 ---- die; } ! 
sendQueryToSlaves($XID,"BEGIN"); my $numPending = $pendingResults->ntuples; my $curTuple = 0; while ($curTuple < $numPending) { $seqId = $pendingResults->getvalue($curTuple,0); my $tableName = $pendingResults->getvalue($curTuple,1); *************** *** 212,239 **** $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $pendingResults,$curTuple) +1; - if($::slaveInfo->{"status"}==-1) { - last; - } } ! #Now commit the transaction. ! if($::slaveInfo->{"status"}==-1) { last; } sendQueryToSlaves(undef,"COMMIT"); ! updateMirrorHostTable($XID,$seqId); if($commandCount > 5000) { $commandCount = 0; ! $::slaveInfo->{"status"} = -1; $::slaveInfo->{"slaveConn"}->reset; #Open the connection right away. openSlaveConnection($::slaveInfo); } - $pendingResults = undef; - $curTransTuple = $curTransTuple +1; }#while transactions left. $pendingTransResults = undef; --- 229,265 ---- $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $pendingResults,$curTuple) +1; } ! ! if($::slaveInfo->{'status'} ne 'DBOpen' && ! $::slaveInfo->{'status'} ne 'FileOpen') ! { last; } sendQueryToSlaves(undef,"COMMIT"); ! #Now commit the transaction. ! updateMirrorHostTable($XID,$seqId,$isexpuncond); ! ! $pendingResults = undef; ! $curTransTuple = $curTransTuple +1; ! ! if($::slaveInfo->{'status'} eq 'FileOpen') ! { ! close ($::slaveInfo->{'TransactionFile'}); ! } ! elsif($::slaveInfo->{'status'} eq 'DBOpen') ! { if($commandCount > 5000) { $commandCount = 0; ! $::slaveInfo->{"status"} = 'DBClosed'; $::slaveInfo->{"slaveConn"}->reset; #Open the connection right away. openSlaveConnection($::slaveInfo); } + } }#while transactions left. $pendingTransResults = undef; *************** *** 303,308 **** --- 329,335 ---- my $pendingResults = $_[4]; my $currentTuple = $_[5]; + if($op eq 'i') { $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults ,$currentTuple); *************** *** 411,417 **** $firstIteration=0; } $valuesQuery .= ")"; ! sendQueryToSlaves(undef,$insertQuery . 
$valuesQuery); return $currentTuple; } --- 438,444 ---- $firstIteration=0; } $valuesQuery .= ")"; ! sendQueryToSlaves($transId,$insertQuery . $valuesQuery); return $currentTuple; } *************** *** 491,497 **** $counter++; $firstField=0; } - sendQueryToSlaves($transId,$deleteQuery); return $currentTuple; } --- 518,523 ---- *************** *** 560,567 **** my $updateQuery = "UPDATE $tableName SET "; my $currentField; - - my %keyValueHash; my %dataValueHash; my $firstIteration=1; --- 586,591 ---- *************** *** 615,627 **** } $firstIteration=0; } - sendQueryToSlaves($transId,$updateQuery); return $currentTuple+1; } - =item sendQueryToSlaves(seqId,sqlQuery) Sends an SQL query to the slave. --- 639,649 ---- *************** *** 647,653 **** my $seqId = $_[0]; my $sqlQuery = $_[1]; ! if($::slaveInfo->{"status"} == 0) { my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); unless($queryResult->resultStatus == PGRES_COMMAND_OK) { my $errorMessage; --- 669,675 ---- my $seqId = $_[0]; my $sqlQuery = $_[1]; ! if($::slaveInfo->{"status"} eq 'DBOpen') { my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); unless($queryResult->resultStatus == PGRES_COMMAND_OK) { my $errorMessage; *************** *** 660,669 **** --- 682,699 ---- $::slaveInfo->{"status"} = -1; } } + elsif($::slaveInfo->{"status"} eq 'FileOpen' ) { + my $xfile = $::slaveInfo->{'TransactionFile'}; + print $xfile $sqlQuery . ";\n"; + } + + } + + =item logErrorMessage(error) Mails an error message to the users specified $errorEmailAddr *************** *** 713,747 **** } ! sub openSlaveConnection($) { my $slavePtr = $_[0]; - my $slaveConn; - my $slaveConnString = "host=" . $slavePtr->{"slaveHost"}; - $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; - $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; - $slaveConnString .= " password=" . 
$slavePtr->{"slavePassword"}; - - $slaveConn = Pg::connectdb($slaveConnString); - - if($slaveConn->status != PGRES_CONNECTION_OK) { - my $errorMessage = "Can't connect to slave database " ; - $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; - $errorMessage .= $slaveConn->errorMessage; - logErrorMessage($errorMessage); - $slavePtr->{"status"} = -1; - } - else { - $slavePtr->{"slaveConn"} = $slaveConn; $slavePtr->{"status"} = 0; #Determine the MirrorHostId for the slave from the master's database ! my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' ! . ' "MirrorHost" WHERE "HostName"' ! . '=\'' . $slavePtr->{"slaveHost"} . '\''); if($resultSet->ntuples !=1) { ! my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; $errorMessage .= "Has no MirrorHost entry on master\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; --- 743,760 ---- } ! sub setupSlave($) { my $slavePtr = $_[0]; $slavePtr->{"status"} = 0; #Determine the MirrorHostId for the slave from the master's database ! my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM ' ! . ' dbmirror_MirrorHost WHERE SlaveName' ! . '=\'' . $slavePtr->{"slaveName"} . '\''); if($resultSet->ntuples !=1) { ! my $errorMessage .= $slavePtr->{"slaveName"} ."\n"; $errorMessage .= "Has no MirrorHost entry on master\n"; logErrorMessage($errorMessage); $slavePtr->{"status"}=-1; *************** *** 750,762 **** } $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); ! ! } ! } =item updateMirrorHostTable(lastTransId,lastSeqId) Updates the MirroredTransaction table to reflect the fact that --- 763,785 ---- } $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); ! if(defined($::slaveInfo->{'slaveDb'})) { ! # We talk directly to a slave database. ! # ! if($::slaveInfo->{"status"} ne 'DBOpen') ! { ! openSlaveConnection($::slaveInfo); } ! sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); ! sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); ! } ! else { ! 
$::slaveInfo->{"status"} = 'FileClosed'; } + } + =item updateMirrorHostTable(lastTransId,lastSeqId) Updates the MirroredTransaction table to reflect the fact that *************** *** 779,793 **** =cut ! sub updateMirrorHostTable($$) { my $lastTransId = shift; my $lastSeqId = shift; ! if($::slaveInfo->{"status"}==0) { my $deleteTransactionQuery; my $deleteResult; ! my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; ! $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; my $updateResult = $masterConn->exec($updateMasterQuery); --- 802,817 ---- =cut ! sub updateMirrorHostTable($$$) { my $lastTransId = shift; my $lastSeqId = shift; + my $isexpuncond = shift; ! if ($isexpuncond eq 't') { my $deleteTransactionQuery; my $deleteResult; ! my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction "; ! $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)"; $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; my $updateResult = $masterConn->exec($updateMasterQuery); *************** *** 802,811 **** #If this transaction has now been mirrored to all mirror hosts #then it can be deleted. ! $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' ! . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' ! . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' ! . ' "MirrorHost")'; $deleteResult = $masterConn->exec($deleteTransactionQuery); if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { --- 826,835 ---- #If this transaction has now been mirrored to all mirror hosts #then it can be deleted. ! $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID=' ! . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction' ! . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM' ! . 
' dbmirror_MirrorHost)'; $deleteResult = $masterConn->exec($deleteTransactionQuery); if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { *************** *** 813,821 **** $deleteTransactionQuery); die; } - } ! } --- 837,854 ---- $deleteTransactionQuery); die; } } ! else { ! my $deleteTransactionQuery; ! my $deleteResult; ! $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID='. $lastTransId.' AND slaveid='.$::slaveInfo->{"MirrorHostId"}; ! $deleteResult = $masterConn->exec($deleteTransactionQuery); ! if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { ! logErrorMessage($masterConn->errorMessage . "\n" . ! $deleteTransactionQuery); ! die; ! } ! } } *************** *** 888,891 **** --- 921,990 ---- } #while return %valuesHash; + } + + + sub openTransactionFile($$) + { + my $slaveInfo = shift; + my $XID =shift; + # my $now_str = localtime; + my $nowsec; + my $nowmin; + my $nowhour; + my $nowmday; + my $nowmon; + my $nowyear; + my $nowwday; + my $nowyday; + my $nowisdst; + ($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) = + localtime; + my $fileName=sprintf(">%s/%s_%d-%d-%d_%d:%d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'}, + $::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin, + $nowsec,$XID); + + my $xfile; + open($xfile,$fileName) or die "Can't open $fileName : $!"; + + $slaveInfo->{'TransactionFile'} = $xfile; + $slaveInfo->{'status'} = 'FileOpen'; + } + + + + sub openSlaveConnection($) { + my $slavePtr = $_[0]; + my $slaveConn; + + + my $slaveConnString; + if(defined($slavePtr->{"slaveHost"})) + { + $slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " "; + } + if(defined($slavePtr->{"slavePort"})) + { + $slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " "; + } + + $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; + $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; + $slaveConnString .= " password=" . 
$slavePtr->{"slavePassword"}; + + $slaveConn = Pg::connectdb($slaveConnString); + + if($slaveConn->status != PGRES_CONNECTION_OK) { + my $errorMessage = "Can't connect to slave database " ; + $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; + $errorMessage .= $slaveConn->errorMessage; + logErrorMessage($errorMessage); + $slavePtr->{"status"} = 'DBFailed'; + } + else { + $slavePtr->{"slaveConn"} = $slaveConn; + $slavePtr->{"status"} = 'DBOpen'; + } + + } Index: MirrorSetup.sql =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/MirrorSetup.sql,v retrieving revision 1.5 diff -c -w -r1.5 MirrorSetup.sql *** MirrorSetup.sql 14 May 2003 03:25:55 -0000 1.5 --- MirrorSetup.sql 16 Nov 2003 00:05:06 -0000 *************** *** 1,43 **** CREATE FUNCTION "recordchange" () RETURNS trigger AS ! '/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C'; - CREATE TABLE "MirrorHost" ( - "MirrorHostId" serial, - "HostName" varchar NOT NULL, - PRIMARY KEY("MirrorHostId") - ); - CREATE TABLE "Pending" ( - "SeqId" serial, - "TableName" varchar NOT NULL, - "Op" character, - "XID" int4 NOT NULL, - PRIMARY KEY ("SeqId") ); ! CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID"); ! CREATE TABLE "PendingData" ( ! "SeqId" int4 NOT NULL, ! "IsKey" bool NOT NULL, ! "Data" varchar, ! PRIMARY KEY ("SeqId", "IsKey") , ! FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE ); ! CREATE TABLE "MirroredTransaction" ( ! "XID" int4 NOT NULL, ! "LastSeqId" int4 NOT NULL, ! "MirrorHostId" int4 NOT NULL, ! PRIMARY KEY ("XID","MirrorHostId"), ! FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE, ! FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE ); --- 1,47 ---- + + SET autocommit TO 'on'; + CREATE FUNCTION "recordchange" () RETURNS trigger AS ! 
'/usr/local/postgresql-7.4/lib/pending.so', 'recordchange' LANGUAGE 'C'; + CREATE TABLE dbmirror_MirrorHost ( + MirrorHostId int4 not null, + SlaveName varchar NOT NULL, + PRIMARY KEY(MirrorHostId) + ); + CREATE TABLE dbmirror_Pending ( + SeqId serial, + TableName varchar NOT NULL, + Op character, + XID int4 NOT NULL, + SlaveId int4, + PRIMARY KEY (SeqId) ); ! CREATE INDEX "dbmirror_Pending_XID_Index" ON dbmirror_Pending (XID); ! CREATE TABLE dbmirror_PendingData ( ! SeqId int4 NOT NULL, ! IsKey bool NOT NULL, ! Data varchar, ! PRIMARY KEY (SeqId, IsKey) , ! FOREIGN KEY (SeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE ); ! CREATE TABLE dbmirror_MirroredTransaction ( ! XID int4 NOT NULL, ! LastSeqId int4 NOT NULL, ! MirrorHostId int4 NOT NULL, ! PRIMARY KEY (XID,MirrorHostId), ! FOREIGN KEY (MirrorHostId) REFERENCES dbmirror_MirrorHost (MirrorHostId) ON UPDATE CASCADE ON DELETE CASCADE, ! FOREIGN KEY (LastSeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE ); Index: pending.c =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/pending.c,v retrieving revision 1.14 diff -c -w -r1.14 pending.c *** pending.c 29 Sep 2003 18:16:48 -0000 1.14 --- pending.c 16 Nov 2003 00:05:09 -0000 *************** *** 1,6 **** /**************************************************************************** * pending.c ! * $Id: pending.c,v 1.14 2003/09/29 18:16:48 momjian Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. --- 1,6 ---- /**************************************************************************** * pending.c ! * $Id: pending.c,v 1.5 2003/11/11 02:49:50 ssinger Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. 
*************** *** 34,39 **** --- 34,40 ---- #include #include #include + #include enum FieldUsage { PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE *************** *** 42,65 **** int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupdesc, ! TriggerData *tpTrigdata, char cOp); ! int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, ! TriggerData *tpTrigdata); ! int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ! TriggerData *tpTrigData, int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); ! char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, ! TriggerData *tTrigData, enum FieldUsage eKeyUsage); #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 ! /*#define DEBUG_OUTPUT 1 */ extern Datum recordchange(PG_FUNCTION_ARGS); ! PG_FUNCTION_INFO_V1(recordchange); --- 43,85 ---- int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupdesc, ! Oid tableOid, ! char cOp,int slaveid); ! ! int handler(char *cpTableName, HeapTuple tBeforeTuple, ! HeapTuple tAfterTuple, ! TupleDesc tTupdesc, ! Oid tableOid, ! char cOp, int slaveid, char *pkxpress); ! ! int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress); ! int createAccnt(char *cpTableName, int slaveid, char *pkxpress); ! int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress); ! int deleteAccnt(char *cpTableName, char *pkxpress); ! int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress); ! char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid tableOid); ! int *getSlaves(char *cpTableName,char *pkxpress); ! ! int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc); ! ! int handleParents(HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid); ! int updateAccntParents(HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid); ! ! 
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid); ! int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tbaleOid,int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); ! char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid, enum FieldUsage eKeyUsage); + #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 ! #define DEBUG_OUTPUT 1 extern Datum recordchange(PG_FUNCTION_ARGS); ! /**#define CONDITIONAL_REPLICATION**/ PG_FUNCTION_INFO_V1(recordchange); *************** *** 81,86 **** --- 101,107 ---- char op = 0; char *schemaname; char *fullyqualtblname; + char *pkxpress=NULL; if (fcinfo->context != NULL) { *************** *** 124,131 **** beforeTuple = trigdata->tg_trigtuple; op = 'd'; } ! if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op)) { /* An error occoured. Skip the operation. */ ereport(ERROR, --- 145,164 ---- beforeTuple = trigdata->tg_trigtuple; op = 'd'; } + #if defined CONDITIONAL_REPLICATION + pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid); + if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,retTuple->t_tableOid, op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress)) + { + /* An error occoured. Skip the operation. */ + ereport(ERROR, + (errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("operation could not be mirrored"))); ! return PointerGetDatum(NULL); ! ! } ! #else ! if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, retTuple->t_tableOid, op,0)) { /* An error occoured. Skip the operation. 
*/ ereport(ERROR, *************** *** 135,144 **** --- 168,181 ---- return PointerGetDatum(NULL); } + #endif + #if defined DEBUG_OUTPUT elog(NOTICE, "returning on success"); #endif SPI_pfree(fullyqualtblname); + if(pkxpress != NULL) + SPI_pfree(pkxpress); SPI_finish(); return PointerGetDatum(retTuple); } *************** *** 160,183 **** storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, ! TriggerData *tpTrigData, char cOp) { ! char *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; int iResult = 0; HeapTuple tCurTuple; /* Points the current tuple(before or after) */ Datum saPlanData[4]; ! Oid taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; ! ! vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); if (vpPlan == NULL) elog(NOTICE, "error creating plan"); /* SPI_saveplan(vpPlan); */ --- 197,221 ---- storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, ! Oid tableOid, ! char cOp, int slaveid) { ! char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid) VALUES ($1,$2,$3,$4)"; int iResult = 0; HeapTuple tCurTuple; + char nulls[4]=" "; /* Points the current tuple(before or after) */ Datum saPlanData[4]; ! Oid taPlanArgTypes[4] = {NAMEOID, CHAROID, INT4OID, INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; ! vpPlan = SPI_prepare(cpQueryBase, 4, taPlanArgTypes); if (vpPlan == NULL) elog(NOTICE, "error creating plan"); /* SPI_saveplan(vpPlan); */ *************** *** 185,193 **** saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); ! ! 
iResult = SPI_execp(vpPlan, saPlanData, NULL, 1); if (iResult < 0) elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); --- 223,232 ---- saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); + saPlanData[3] = Int32GetDatum(slaveid); + if (slaveid <=0) nulls[3]='n'; ! iResult = SPI_execp(vpPlan, saPlanData, nulls, 1); if (iResult < 0) elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); *************** *** 202,208 **** * This is a record of a delete operation. * Just store the key data. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); } else if (cOp == 'i') { --- 241,247 ---- * This is a record of a delete operation. * Just store the key data. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid); } else if (cOp == 'i') { *************** *** 210,224 **** * An Insert operation. * Store all data */ ! iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE); } else { /* op must be an update. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); ! iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc, ! tpTrigData, TRUE); } #if defined DEBUG_OUTPUT --- 249,262 ---- * An Insert operation. * Store all data */ ! iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE); } else { /* op must be an update. */ ! iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid); ! iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE); } #if defined DEBUG_OUTPUT *************** *** 231,242 **** int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ! TupleDesc tTupleDesc, ! TriggerData *tpTrigData) { Oid saPlanArgTypes[1] = {NAMEOID}; ! 
char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; --- 269,279 ---- int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ! TupleDesc tTupleDesc, Oid tableOid) { Oid saPlanArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; *************** *** 250,256 **** } /* pplan = SPI_saveplan(pplan); */ ! cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), --- 287,293 ---- } /* pplan = SPI_saveplan(pplan); */ ! cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), *************** *** 318,329 **** * Stores a copy of the non-key data for the row. *****************************************************************************/ int ! storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ! TriggerData *tpTrigData, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; --- 355,365 ---- * Stores a copy of the non-key data for the row. *****************************************************************************/ int ! storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; ! char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; *************** *** 338,346 **** /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) ! 
cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY); else ! cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); --- 374,382 ---- /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) ! cpKeyData = packageData(tTupleData, tTupleDesc, tableOid, NONPRIMARY); else ! cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); *************** *** 376,383 **** * ALL implies include all fields. */ char * ! packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, ! TriggerData *tpTrigData, enum FieldUsage eKeyUsage) { int iNumCols; --- 412,418 ---- * ALL implies include all fields. */ char * ! packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, enum FieldUsage eKeyUsage) { int iNumCols; *************** *** 391,397 **** if (eKeyUsage != ALL) { ! tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); if (tpPKeys == NULL) return NULL; } --- 426,432 ---- if (eKeyUsage != ALL) { ! tpPKeys = getPrimaryKey(tableOid); if (tpPKeys == NULL) return NULL; } *************** *** 437,442 **** --- 472,478 ---- continue; } } /* KeyUsage!=ALL */ + #ifndef NODROPCOLUMN if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) { *************** *** 526,529 **** --- 562,1240 ---- return cpDataBlock; + } + + #define MAXKCOLS 32 + #define MAXCOL_LEN 128 + int handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char *pkxpress) { + + HeapTuple tuple; + tuple = tBeforeTuple ? 
tBeforeTuple : tAfterTuple; + #if defined DEBUG_OUTPUT + elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d and op=%c",tableOid,cpTableName,slaveid,op); + #endif + + if (slaveid == 0 && op == 'm') { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen"))); + return -1; + } + /* + ISEXPUNCOND + */ + else if (slaveid == 0) return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid); + /* + ISEXPWITHSLAVEID + */ + else if (slaveid > 0 && op =='d') { + int retval; + retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid); + return updateAccntParents(tBeforeTuple,NULL,tupleDesc,tableOid,slaveid); + } + else if (slaveid > 0 && op != 'm') { /* this is either 'i' or 'u' */ + int retval; + retval = handleParents(tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid); + retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid); + if (op == 'u') { + retval = (retval)?retval:updateAccntParents(tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid); + } + return retval; + } + /* + ISIMPL + */ + else if (slaveid >0 && op == 'm') { + if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. 
Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen"))); + } + if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0; + else { + int retval; + retval = handleParents(tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid); + retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid); + return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress); + } + } + /* + ISIMPL but called from trigger ('i','d','u') + */ + else if (slaveid < 0) { + if (op == 'i') return 0; + /* + Delete in this fashion will eventually never happen. + The deletions will be "triggered" by childern deletions/updates. + */ + else if (op == 'd') { + int retval=0; + int *slaves; + int *run; + slaves = getSlaves(cpTableName,pkxpress); + if (slaves == NULL) return 0; + for (run=slaves;*run;run++) { + retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run); + retval = retval || updateAccntParents(tBeforeTuple,NULL,tupleDesc,tableOid,*run); + } + + SPI_pfree(slaves); + return retval || deleteAccnt(cpTableName,pkxpress); + + } + else if (op == 'u') { + int retval=0; + int *slaves; + int *run; + + slaves = getSlaves(cpTableName,pkxpress); + if (slaves == NULL) return 0; + for (run=slaves;*run;run++) { + retval = retval || handleParents(tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run); + retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run); + retval = retval || updateAccntParents(tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run); + } + SPI_pfree(slaves); + return retval; + } + } + return 0; + } + + /* getSlaveId: looks up siteidkeyname for cpTableName in dbmirror_explicitreptables. Returns -1 when the lookup does not yield exactly one row (ISIMPL), 0 when siteidkeyname is NULL (ISEXPUNCOND), otherwise atoi() of that column's value in tuple (ISEXPWITHSLAVEID). */ int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) { + char *slave_siteidkeyname; + char *qb; + char *q; + char *foo; + int ret; + HeapTuple resTuple; + int attnum; + int slaveid; + SPITupleTable *SIDKEY_tupTable; + int SIDKEY_processed; + #if defined DEBUG_OUTPUT + elog(NOTICE, "in getSlaveId of %s",cpTableName); + #endif + qb = 
"SELECT siteidkeyname from dbmirror_explicitreptables where tblname="; + q = SPI_palloc(strlen(qb)+strlen(cpTableName)+3); /* room for the two quote characters and the terminating NUL that sprintf writes */ + sprintf(q,"%s'%s'",qb,cpTableName); + elog(NOTICE,"%s",q); /* the query text must be an argument, never the format string */ + ret = SPI_exec(q,1); + SPI_pfree(q); + /* + ISIMPL + */ + if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1; + SIDKEY_tupTable = SPI_tuptable; + SIDKEY_processed = SPI_processed; + resTuple = SIDKEY_tupTable->vals[0]; + slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1); + /* + ISEXPUNCOND + */ + if (slave_siteidkeyname == NULL) return 0; + attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname); + if (attnum == SPI_ERROR_NOATTRIBUTE) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("WRONG slaveidketname=%s given for table %s",slave_siteidkeyname,cpTableName))); + SPI_pfree(slave_siteidkeyname); + return 0; + } + foo=SPI_getvalue(tuple,tupleDesc,attnum); + if (foo == NULL) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("%s cannot be null for table %s",slave_siteidkeyname,cpTableName))); + SPI_pfree(slave_siteidkeyname); + return 0; + } + SPI_pfree(slave_siteidkeyname); + slaveid = atoi(foo); + SPI_pfree(foo); + /* + ISEXPWITHSLAVEID + */ + return slaveid; + } + #define MAX_WHERE_CLAUSE 512 + #define MAX_QUERY_LEN 512 + /* handleParents: reads the foreign-key constraints (contype = 'f') of tableOid from pg_constraint and, for each one it decides to handle, calls handler() on the referenced table with op 'm' for slaveid. */ int handleParents(HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) { + char *qb; + char *q; + bool isNull; + int ret; + int i; + HeapTuple resTuple; + Datum resDatum; + Oid confrelid; + char *confrelname; + int2 *thisCols; + int2 *fkCols; + int2 *run; + ArrayType *arr; + char AthisColsVals[MAXKCOLS][MAXCOL_LEN]; + char fkColsNames[MAXKCOLS][MAXCOL_LEN]; + short numOfCols; + SPITupleTable *FK_tupTable; + int FK_processed; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in handleParents of Table=%d",tableOid); + #endif + + qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = 
f.oid AND c.conrelid = "; + q = SPI_palloc(strlen(qb)+MAX_OID_LEN+1); + sprintf(q,"%s%u",qb,tableOid); /* Oid is unsigned: %u keeps OIDs above INT_MAX from being printed as negative literals */ + ret = SPI_exec(q,0); + SPI_pfree(q); + if (ret != SPI_OK_SELECT || SPI_processed < 0) { + elog(NOTICE, "no FKs"); + return 0; + } + /* + For every FK dependency we track and handle the parent table + */ + FK_tupTable = SPI_tuptable; + FK_processed = SPI_processed; + + for (i=0;ivals[i]; + + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull); + confrelid = (Oid) DatumGetObjectId(resDatum); + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull); + arr = (ArrayType *) DatumGetPointer(resDatum); + thisCols = (int2 *)ARR_DATA_PTR(arr); + numOfCols=ARR_DIMS(arr)[0]; + for (run=thisCols,colrun=0;colrun END OF ATTR LOOP"); + elog(NOTICE,"handle it = %d, ",handleit); + elog(NOTICE,"FkHasNullValue = %d, ",FkHasNullValue); + #endif + /* constraints with a NULL key column, or that were not selected for handling, are skipped */ if (FkHasNullValue || !handleit) continue; + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull); + arr = (ArrayType *) DatumGetPointer(resDatum); + fkCols = (int2 *)ARR_DATA_PTR(arr); + for (run=fkCols,colrun=0;colrunvals[0]; + value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1); + memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1); + SPI_pfree(value); + + } + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull); + confrelname = (char *) DatumGetName(resDatum); + /* + There must be at least 1 column in compound FK!! 
+ */ + WHERE = SPI_palloc(MAX_WHERE_CLAUSE); + sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]); + for (j=1;jvals[0]; + ParTableName = SPI_palloc(256); + sprintf(ParTableName,"\"public\".\"%s\"",confrelname); + ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE); + SPI_pfree(ParTableName); + SPI_pfree(WHERE); + + } + return 0; + } + + int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) { + char *q; + int ret; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName); + #endif + + q = SPI_palloc(MAX_QUERY_LEN); + /* + sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); + ret = SPI_exec(q,1); + SPI_pfree(q); + if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0; + else return 1; + */ + sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); + ret = SPI_exec(q,1); + SPI_pfree(q); + if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0; + else return 1; + } + + int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) { + char *q; + int ret; + bool isNull; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName); + #endif + + q = SPI_palloc(MAX_QUERY_LEN); + sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); + ret = SPI_exec(q,1); + SPI_pfree(q); + if (ret != SPI_OK_UPDATE || SPI_processed != 1) { + + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress))); + } + q = SPI_palloc(MAX_QUERY_LEN); + sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); + ret = SPI_exec(q,1); + SPI_pfree(q); + if (ret != 
SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress); + return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull)); + } + + int createAccnt(char *cpTableName, int slaveid, char *pkxpress) { + char *q; + int ret; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in createAccnt of Table=%s",cpTableName); + #endif + + q = SPI_palloc(MAX_QUERY_LEN); + /* + public.dbmirror_accounting.cnt DEFAULT = 1 + */ + sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid); + ret = SPI_exec(q,1); + SPI_pfree(q); + if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2; + else return 0; + } + + + char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) { + int2vector *pk; + int i; + char *WHERE = SPI_palloc(MAX_WHERE_CLAUSE); + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in getPKxpress of Table=%s",cpTableName); + #endif + + pk = getPrimaryKey(tableOid); + if (pk == NULL) return NULL; + if ((*pk)[0] != 0) { + char *keyname; + char *keyval; + keyname = SPI_fname(tupleDesc,(*pk)[0]); + if (keyname == NULL) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("FATAL ERROR: WRONG keyname key=%d given for table %s",(*pk)[0],cpTableName))); + return NULL; + } + keyval = SPI_getvalue(tuple,tupleDesc,(*pk)[0]); + if (keyval == NULL) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("FATAL ERROR: WRONG keyval key=%d given for table %s",(*pk)[0],cpTableName))); + return NULL; + } + + sprintf(WHERE,"%s=%s",keyname,keyval); + SPI_pfree(keyname); + SPI_pfree(keyval); + } + for (i=1;(*pk)[i] != 0;i++) { + char *keyname; + char *keyval; + keyname = SPI_fname(tupleDesc,(*pk)[i]); + if (keyname == NULL) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("FATAL ERROR: WRONG keyname key=%d given for table %s", + 
(*pk)[i],cpTableName))); + return NULL; + } + keyval = SPI_getvalue(tuple,tupleDesc,(*pk)[i]); + if (keyval == NULL) { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("FATAL ERROR: WRONG keyval key=%d given for table %s", + (*pk)[i],cpTableName))); + return NULL; + } + + sprintf(WHERE,"%s AND %s=%s",WHERE,keyname,keyval); + SPI_pfree(keyname); + SPI_pfree(keyval); + + } + SPI_pfree(pk); + return WHERE; + } + int *getSlaves(char *cpTableName,char *pkxpress) { + char *q; + int ret; + int SLAVE_processed; + SPITupleTable *SLAVE_tupTable; + int *slaves; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in getSlaves of Table=%s",cpTableName); + #endif + + if (pkxpress == NULL) return NULL; + + q = SPI_palloc(MAX_QUERY_LEN); + sprintf(q,"SELECT slaveid FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress); + ret = SPI_exec(q,0); + SPI_pfree(q); + if (ret != SPI_OK_SELECT || SPI_processed <= 0) return NULL; + SLAVE_tupTable = SPI_tuptable; + SLAVE_processed = SPI_processed; + slaves = SPI_palloc((SLAVE_processed+1) * sizeof(int)); + #if defined DEBUG_OUTPUT + elog(NOTICE,"SLAVE_processed=%d",SLAVE_processed); + #endif + for (ret=0;retvals[ret],SLAVE_tupTable->tupdesc,1); + *(slaves+ret) = atoi(slaveidStr); + SPI_pfree(slaveidStr); + + } + *(slaves+SLAVE_processed) = 0; + return slaves; + + + } + + int deleteAccnt (char *cpTableName, char *pkxpress) { + char *q; + int ret; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName); + #endif + + q = SPI_palloc(MAX_QUERY_LEN); + sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress); + ret = SPI_exec(q,0); + SPI_pfree(q); + if (ret == SPI_OK_DELETE) return 0; + else return -2; + } + + int deleteSlaveAccnt (char *cpTableName,int slaveid, char *pkxpress) { + char *q; + int ret; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName); + #endif + + q = 
SPI_palloc(MAX_QUERY_LEN); + sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); + ret = SPI_exec(q,0); + SPI_pfree(q); + if (ret == SPI_OK_DELETE) return 0; + else return -2; + } + + int updateAccntParents(HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) { + char *qb; + char *q; + bool isNull; + int ret; + int i; + HeapTuple resTuple; + Datum resDatum; + Oid confrelid; + char *confrelname; + int2 *thisCols; + int2 *fkCols; + int2 *run; + ArrayType *arr; + char BthisColsVals[MAXKCOLS][MAXCOL_LEN]; + char fkColsNames[MAXKCOLS][MAXCOL_LEN]; + short numOfCols; + SPITupleTable *FK_tupTable; + int FK_processed; + + #if defined DEBUG_OUTPUT + elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid); + #endif + + qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = "; + q = SPI_palloc(strlen(qb)+MAX_OID_LEN+1); + sprintf(q,"%s%d",qb,tableOid); + ret = SPI_exec(q,0); + SPI_pfree(q); + if (ret != SPI_OK_SELECT || SPI_processed < 0) { + elog(NOTICE, "no FKs"); + return 0; + } + /* + For every FK dependency we track and handle the parent table + */ + FK_tupTable = SPI_tuptable; + FK_processed = SPI_processed; + + for (i=0;ivals[i]; + + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull); + confrelid = (Oid) DatumGetObjectId(resDatum); + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull); + arr = (ArrayType *) DatumGetPointer(resDatum); + thisCols = (int2 *)ARR_DATA_PTR(arr); + numOfCols=ARR_DIMS(arr)[0]; + for (run=thisCols,colrun=0;colrun END OF ATTR LOOP"); + elog(NOTICE,"decrease it = %d, ",decreaseit); + elog(NOTICE,"FkHasNullValue = %d, ",FkHasNullValue); + #endif + + if (FkHasNullValue || !decreaseit) continue; + + + + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull); + arr = (ArrayType *) 
DatumGetPointer(resDatum); + fkCols = (int2 *)ARR_DATA_PTR(arr); + for (run=fkCols,colrun=0;colrunvals[0]; + value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1); + memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1); + SPI_pfree(value); + + } + + resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull); + confrelname = (char *) DatumGetName(resDatum); + /* + there must be at least 1 column in compound FK!! + */ + WHERE = SPI_palloc(MAX_WHERE_CLAUSE); + sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]); + for (j=1;jvals[0]; + ParTableName = SPI_palloc(256); + sprintf(ParTableName,"\"public\".\"%s\"",confrelname); + if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) { + ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid); + ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE); + ret = updateAccntParents(RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid); + + } + SPI_pfree(ParTableName); + SPI_pfree(WHERE); + + } + return 0; } Index: slaveDatabase.conf =================================================================== RCS file: /projects/cvsroot/pgsql-server/contrib/dbmirror/slaveDatabase.conf,v retrieving revision 1.1 diff -c -w -r1.1 slaveDatabase.conf *** slaveDatabase.conf 23 Jun 2002 21:58:08 -0000 1.1 --- slaveDatabase.conf 16 Nov 2003 00:05:09 -0000 *************** *** 14,20 **** # Where to email Error messages to # $errorEmailAddr = "me@mydomain.com"; ! $slaveInfo->{"slaveHost"} = "backupMachine.mydomain.com"; $slaveInfo->{"slaveDb"} = "myDatabase"; $slaveInfo->{"slaveUser"} = "postgres"; --- 14,20 ---- # Where to email Error messages to # $errorEmailAddr = "me@mydomain.com"; ! $slaveInfo->{"slaveName"} = "SLAVE_1"; $slaveInfo->{"slaveHost"} = "backupMachine.mydomain.com"; $slaveInfo->{"slaveDb"} = "myDatabase"; $slaveInfo->{"slaveUser"} = "postgres";