/**************************************************************************** * pending.c * $Id: pending.c,v 1.8 2006/03/02 14:31:29 achill4 Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. * All tables that should be mirrored should have this trigger hooked up to it. * * Written by Steven Singer (ssinger@navtechinc.com) * (c) 2001-2002 Navtech Systems Support Inc. * ALL RIGHTS RESERVED * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement * is hereby granted, provided that the above copyright notice and this * paragraph and the following two paragraphs appear in all copies. * * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 
 *
 *
 ***************************************************************************/
/*
 * NOTE(review): the header name of the first #include below is missing in
 * this copy of the file (the text between '<' and '>' appears to have been
 * stripped). Restore it from the original source before building.
 */
#include
#include "postgres.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/rel.h"

/* Size in bytes of an int2vector holding n int16 values. */
#define Int2VectorSize(n) (offsetof(int2vector, values) + (n) * sizeof(int16))
#define TRUE 1
#define FALSE 0

PG_MODULE_MAGIC;

/*
 * Selects which columns packageData() serialises:
 * primary-key columns only, non-key columns only, or every column.
 */
enum FieldUsage
{
	PRIMARY = 0,
	NONPRIMARY,
	ALL,
	NUM_FIELDUSAGE
};

/* Recording of a change into dbmirror_pending / dbmirror_xactions. */
int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupdesc, Oid tableOid, char cOp,int slaveid, char origOp);
int storexid(void);

/* Dispatch on the table's replication class (slaveid) and the operation. */
int handler(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupdesc, Oid tableOid, char cOp, int slaveid, char *pkxpress);

/* Per-slave accounting helpers; their definitions are not part of the code visible here. */
int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress);
int createAccnt(char *cpTableName, int slaveid, char *pkxpress);
int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress);
int deleteAccnt(char *cpTableName, char *pkxpress);
int deleteSlaveAccnt(char *cpTableName,int slaveid, char *pkxpress);
char *getPKxpress(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc,Oid tableOid);
int *getSlaves(char *cpTableName,char *pkxpress);

/* Slave-id resolution for a tuple of a given table. */
int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);
/*char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc);*/

/* Parent-row handling; definitions are not part of the code visible here. */
int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);
int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid);

/* Writers for dbmirror_pendingdata rows (key data and full row data). */
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tbaleOid,int iIncludeKeyData);
int2vector *getPrimaryKey(Oid tblOid); char *packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid, enum FieldUsage eKeyUsage); bool isExcluded(char *cpTableName,TupleDesc tTupleDesc,int iColumnCounter); char *get_namespace_name(Oid nspid); #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 //#define DEBUG_OUTPUT 1 extern Datum recordchange(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(recordchange); /***************************************************************************** * The entry point for the trigger function. * The Trigger takes a single SQL 'text' argument indicating the name of the * table the trigger was applied to. If this name is incorrect so will the * mirroring. ****************************************************************************/ Datum recordchange(PG_FUNCTION_ARGS) { TriggerData *trigdata; TupleDesc tupdesc; HeapTuple beforeTuple = NULL; HeapTuple afterTuple = NULL; HeapTuple retTuple = NULL; char *tblname; char op = 0; char *schemaname; char *fullyqualtblname; char *pkxpress; if (fcinfo->context != NULL) { if (SPI_connect() < 0) { elog(NOTICE, "recordchange could not connect to SPI"); return -1; } trigdata = (TriggerData *) fcinfo->context; /* Extract the table name */ tblname = SPI_getrelname(trigdata->tg_relation); #ifndef NOSCHEMAS schemaname = get_namespace_name(RelationGetNamespace(trigdata->tg_relation)); fullyqualtblname = palloc(strlen(tblname) + strlen(schemaname) + 6); sprintf(fullyqualtblname,"\"%s\".\"%s\"", schemaname,tblname); #else fullyqualtblname = palloc(strlen(tblname) + 3); sprintf(fullyqualtblname,"\"%s\"",tblname); #endif tupdesc = trigdata->tg_relation->rd_att; if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) { retTuple = trigdata->tg_newtuple; beforeTuple = trigdata->tg_trigtuple; afterTuple = trigdata->tg_newtuple; op = 'u'; } else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) { retTuple = trigdata->tg_trigtuple; afterTuple = trigdata->tg_trigtuple; op = 'i'; } else if 
(TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) { retTuple = trigdata->tg_trigtuple; beforeTuple = trigdata->tg_trigtuple; op = 'd'; } if (storexid()) { elog(ERROR, "Operation could not be mirrored. storexid problem"); return PointerGetDatum(NULL); } pkxpress=getPKxpress(fullyqualtblname,retTuple,tupdesc,retTuple->t_tableOid); if (handler(fullyqualtblname, beforeTuple, afterTuple, tupdesc,retTuple->t_tableOid, op,getSlaveId(fullyqualtblname,retTuple,tupdesc),pkxpress)) { /* An error occoured. Skip the operation. */ elog(ERROR, "Operation could not be mirrored"); return PointerGetDatum(NULL); } #if defined DEBUG_OUTPUT elog(NOTICE, "Returning on success"); #endif pfree(fullyqualtblname); pfree(pkxpress); SPI_finish(); return PointerGetDatum(retTuple); } else { /* * Not being called as a trigger. */ return PointerGetDatum(NULL); } } /***************************************************************************** * stores the current xid in dbmirror_xactions *****************************************************************************/ int storexid(void) { //char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) VALUES ($1)"; char *cpQueryBase = "INSERT INTO dbmirror_xactions (XID) SELECT $1 WHERE NOT EXISTS (SELECT 1 FROM dbmirror_xactions WHERE xid=$1)"; int iResult = 0; Datum saPlanData[1]; Oid taPlanArgTypes[1] = {INT4OID}; void *vpPlan; vpPlan = SPI_prepare(cpQueryBase, 1, taPlanArgTypes); if (vpPlan == NULL) elog(NOTICE, " storexid Error creating plan"); saPlanData[0] = Int32GetDatum(GetCurrentTransactionId()); iResult = SPI_execp(vpPlan, saPlanData, NULL , 1); if (iResult < 0) { elog(NOTICE, "storexid fired (%s) returned %d", cpQueryBase, iResult); return -1; } #if defined DEBUG_OUTPUT elog(NOTICE, "row successfully stored in dbmirror_xactions"); #endif return 0; } /***************************************************************************** * Constructs and executes an SQL query to write a record of this tuple change * to the pending table. 
*****************************************************************************/ int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, Oid tableOid, char cOp, int slaveid, char origOp) { char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID,slaveid,origop) VALUES ($1,$2,$3,$4,$5)"; int iResult = 0; HeapTuple tCurTuple; char nulls[5]=" "; //Points the current tuple(before or after) Datum saPlanData[5]; Oid taPlanArgTypes[5] = {NAMEOID, CHAROID, INT4OID, INT4OID, CHAROID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; vpPlan = SPI_prepare(cpQueryBase, 5, taPlanArgTypes); if (vpPlan == NULL) elog(NOTICE, "Error creating plan"); /* SPI_saveplan(vpPlan); */ saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); saPlanData[3] = Int32GetDatum(slaveid); if (slaveid <=0) nulls[3]='n'; saPlanData[4] = CharGetDatum(origOp); iResult = SPI_execp(vpPlan, saPlanData, nulls, 1); if (iResult < 0) elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); #if defined DEBUG_OUTPUT elog(NOTICE, "row successfully stored in pending table"); #endif if (cOp == 'd') { /** * This is a record of a delete operation. * Just store the key data. */ iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid); } else if (cOp == 'i') { /** * An Insert operation. * Store all data */ iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tableOid,TRUE); } else { /* op must be an update. */ iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tableOid); iResult = iResult ? 
iResult : storeData(cpTableName, tAfterTuple, tTupDesc,tableOid,TRUE); } #if defined DEBUG_OUTPUT elog(NOTICE, "Done storing keyinfo/data"); #endif return iResult; } int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid) { Oid saPlanArgTypes[1] = {VARCHAROID}; char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; char *cpKeyData_tmp; int iRetCode; pplan = SPI_prepare(insQuery, 1, saPlanArgTypes); if (pplan == NULL) { elog(NOTICE, "Could not prepare INSERT plan"); return -1; } /* pplan = SPI_saveplan(pplan); */ cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, PRIMARY); if (cpKeyData == NULL) { elog(ERROR,"Could not determine primary key data"); return -1; } #if defined DEBUG_OUTPUT elog(NOTICE, "KeyData: %s", cpKeyData); #endif cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData)); memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData)); SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData)); saPlanData[0] = PointerGetDatum(cpKeyData_tmp); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); if (cpKeyData != NULL) pfree(cpKeyData); if (cpKeyData_tmp != 0) pfree(cpKeyData_tmp); if (iRetCode != SPI_OK_INSERT) { elog(NOTICE, "Error inserting row in storeKeyInfo"); return -1; } #if defined DEBUG_OUTPUT elog(NOTICE, "Insert successful"); #endif return 0; } int2vector * getPrimaryKey(Oid tblOid) { char *queryBase; char *query; bool isNull; int2vector *resultKey; int2vector *tpResultKey; HeapTuple resTuple; Datum resDatum; int ret; queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid="; query = palloc(strlen(queryBase) + MAX_OID_LEN + 1); sprintf(query, "%s%d", queryBase, tblOid); ret = SPI_exec(query, 1); if (ret != SPI_OK_SELECT || SPI_processed != 1) { elog(NOTICE, "Could not select primary index key"); return NULL; } resTuple = SPI_tuptable->vals[0]; resDatum = 
SPI_getbinval(resTuple, SPI_tuptable->tupdesc, 1, &isNull); if (isNull) { elog(NOTICE, "PKey is NULL"); return NULL; } tpResultKey = (int2vector *) DatumGetPointer(PG_DETOAST_DATUM(resDatum)); int n=tpResultKey->dim1; resultKey = palloc(Int2VectorSize(n)); if (n > 0) memcpy(resultKey->values, tpResultKey->values, n * sizeof(int16)); SET_VARSIZE(resultKey, Int2VectorSize(n)); resultKey->ndim = 1; resultKey->dataoffset = 0; resultKey->elemtype = INT2OID; resultKey->dim1 = n; resultKey->lbound1 = 0; pfree(query); return resultKey; } /****************************************************************************** * Stores a copy of the non-key data for the row. *****************************************************************************/ int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData) { Oid planArgTypes[1] = {VARCHAROID}; char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)"; SPIPlanPtr pplan; Datum planData[1]; char *cpKeyData; char *cpKeyData_tmp; int iRetValue; pplan = SPI_prepare(insQuery, 1, planArgTypes); if (pplan == NULL) { elog(NOTICE, "Could not prepare INSERT plan"); return -1; } /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc, tableOid, NONPRIMARY); else cpKeyData = packageData(cpTableName,tTupleData, tTupleDesc,tableOid, ALL); cpKeyData_tmp = palloc(VARHDRSZ+strlen(cpKeyData)); memcpy((cpKeyData_tmp+VARHDRSZ), cpKeyData, strlen(cpKeyData)); SET_VARSIZE(cpKeyData_tmp, VARHDRSZ+strlen(cpKeyData)); planData[0] = PointerGetDatum(cpKeyData_tmp); iRetValue = SPI_execp(pplan, planData, NULL, 1); if (cpKeyData != 0) pfree(cpKeyData); if (cpKeyData_tmp != 0) pfree(cpKeyData_tmp); if (iRetValue != SPI_OK_INSERT) { elog(NOTICE, "Error inserting row in storeData"); return -1; } #if defined DEBUG_OUTPUT elog(NOTICE, "Insert successful"); #endif return 0; } /** * 
 * Packages the data in tTupleData into a string of the format
 * "FieldName"='value text' where any quotes inside of value text
 * are escaped with a backslash and any backslashes in value text
 * are escaped by a second backslash.  A NULL column value is written
 * as the field name followed by "=" and a single space, with no quotes.
 *
 * tTupleDesc should be a description of the tuple stored in
 * tTupleData.
 *
 * eKeyUsage specifies which fields to use.
 * PRIMARY implies include only primary key fields.
 * NONPRIMARY implies include only non-primary key fields.
 * ALL implies include all fields.
 *
 * Returns a palloc'd, NUL-terminated string, or NULL when eKeyUsage is not
 * ALL and getPrimaryKey() fails.
 */
char *
packageData(char *cpTableName,HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, enum FieldUsage eKeyUsage)
{
	int iNumCols;
	int2vector *tpPKeys = NULL;
	int iColumnCounter;
	char *cpDataBlock;      /* output buffer, grown in BUFFER_SIZE steps */
	int iDataBlockSize;     /* allocated size of cpDataBlock */
	int iUsedDataBlock;     /* bytes of cpDataBlock written so far */

	iNumCols = tTupleDesc->natts;

	/* The primary key is only needed when filtering by key membership. */
	if (eKeyUsage != ALL)
	{
		tpPKeys = getPrimaryKey(tableOid);
		if (tpPKeys == NULL)
			return NULL;
	}
#if defined DEBUG_OUTPUT
	if (tpPKeys != NULL)
		elog(NOTICE, "Have primary keys");
#endif
	cpDataBlock = palloc(BUFFER_SIZE);
	iDataBlockSize = BUFFER_SIZE;
	iUsedDataBlock = 0; /* To account for the null */

	/* Attribute numbers are 1-based. */
	for (iColumnCounter = 1; iColumnCounter <= iNumCols; iColumnCounter++)
	{
		int iIsPrimaryKey;
		int iPrimaryKeyIndex;
		char *cpUnFormatedPtr;  /* read cursor in the raw column text */
		char *cpFormatedPtr;    /* write cursor in cpDataBlock */
		char *cpFieldName;
		char *cpFieldData;

		if (eKeyUsage != ALL)
		{
			/* Determine if this is a primary key or not. */
			iIsPrimaryKey = 0;
			int16 *tpPKeysV = tpPKeys->values;
			int tpPKeysSize = tpPKeys->dim1;
			/*
			 * NOTE(review): the statement below is damaged in this copy of
			 * the file: everything between a '<' and the next '>' is missing,
			 * so the loop condition, the loop body, the key-usage filter, the
			 * end of this "if" block and the opening "#if..." that pairs with
			 * the "#endif" further down are not present.  Presumably the loop
			 * compared tpPKeysV[] entries with iColumnCounter and the lost
			 * text ended in a test of attisdropped — restore from the
			 * original source and confirm before building.
			 */
			for (iPrimaryKeyIndex = 0; iPrimaryKeyIndexattrs[iColumnCounter-1].attisdropped)
			//if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
			{
				/**
				 * This column has been dropped.
				 * Do not mirror it.
				 */
				continue;
			}
#endif
		/***************************************
		 * Here goes the check against dbmirror_exclude_attributes
		 * for columns that must be excluded.
		 ****************************************/
		if (isExcluded(cpTableName,tTupleDesc,iColumnCounter))
		{
			continue;
		}
		// attrs[] element accessed with '.' here; the '->' form is kept below
		// (the original note reads "this comment is for 11", presumably PostgreSQL 11)
		cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1].attname));
		//cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT
		elog(NOTICE, "FieldName: %s", cpFieldName);
#endif
		/* Make room for the quoted name, '=', and a few following bytes. */
		while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
		{
			cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
			iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
		}
		sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
		/* two quotes plus '=' were written around the name */
		iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;

		/* Text form of the column value; NULL for an SQL NULL. */
		cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);
		cpUnFormatedPtr = cpFieldData;
		cpFormatedPtr = cpDataBlock + iUsedDataBlock;
		if (cpFieldData != NULL)
		{
			/* opening quote of the value */
			*cpFormatedPtr = '\'';
			iUsedDataBlock++;
			cpFormatedPtr++;
		}
		else
		{
			/* SQL NULL: a single space and no quotes, then next column */
			sprintf(cpFormatedPtr," ");
			iUsedDataBlock++;
			cpFormatedPtr++;
			continue;
		}
#if defined DEBUG_OUTPUT
		elog(NOTICE, "FieldData: %s", cpFieldData);
		elog(NOTICE, "Starting format loop");
#endif
		/* Copy the value, prefixing backslashes and quotes with a backslash. */
		while (*cpUnFormatedPtr != 0)
		{
			/* up to two bytes are written per input character */
			while (iDataBlockSize - iUsedDataBlock < 2)
			{
				cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
				iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
				/* repalloc may move the block: recompute the write cursor */
				cpFormatedPtr = cpDataBlock + iUsedDataBlock;
			}
			if (*cpUnFormatedPtr == '\\' || *cpUnFormatedPtr == '\'')
			{
				*cpFormatedPtr = '\\';
				cpFormatedPtr++;
				iUsedDataBlock++;
			}
			*cpFormatedPtr = *cpUnFormatedPtr;
			cpFormatedPtr++;
			cpUnFormatedPtr++;
			iUsedDataBlock++;
		}
		pfree(cpFieldData);

		/* room for the closing quote, the separator space and the NUL */
		while (iDataBlockSize - iUsedDataBlock < 3)
		{
			cpDataBlock = repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
			iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
			cpFormatedPtr = cpDataBlock + iUsedDataBlock;
		}
		sprintf(cpFormatedPtr, "' ");
		iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
		elog(NOTICE, "DataBlock: %s", cpDataBlock);
#endif
	} /* for iColumnCounter */

	if (tpPKeys != NULL)
		pfree(tpPKeys);
#if defined DEBUG_OUTPUT
	elog(NOTICE, "Returning: DataBlockSize:%d iUsedDataBlock:%d",iDataBlockSize, iUsedDataBlock);
#endif
	/* Zero the unused tail so the result is NUL-terminated. */
	memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);
	return cpDataBlock;
}

/*
 * Reports whether column iColumnNumber of the table is listed in
 * dbmirror_exclude_attributes.attnames for tblname = cpTableName.
 *
 * Returns TRUE only when the lookup query yields exactly one row whose
 * value starts with 't' or 'T'; every other outcome (query failure, no
 * row, NULL result) yields FALSE.
 */
bool
isExcluded(char *cpTableName, TupleDesc tupleDesc, int iColumnNumber)
{
	char *qb1;
	char *qb2;
	char *q;
	char *fieldName;
	int ret;
	HeapTuple resTuple;
	SPITupleTable *SIDKEY_tupTable;
	int SIDKEY_processed;   /* assigned below but not read afterwards */
	char *value;
#if defined DEBUG_OUTPUT
	elog(NOTICE, "in isExcluded of %s",cpTableName);
#endif
	fieldName = SPI_fname(tupleDesc,iColumnNumber);
#if defined DEBUG_OUTPUT
	elog(NOTICE, "fieldName=%s",fieldName);
#endif
	/* The query is "SELECT '<field>' = any(attnames) from ... where tblname='<table>'". */
	qb1 = "SELECT ";
	qb2 = " = any(attnames) from dbmirror_exclude_attributes where tblname=";
#if defined DEBUG_OUTPUT
	elog(NOTICE, "%s q size=%d",fieldName,strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2);
#endif
	/* the +1 below is for the terminating 0 (null); each +2 is a pair of quotes */
	q = palloc(strlen(qb1)+strlen(qb2)+strlen(cpTableName)+2+strlen(fieldName)+2+1);
	sprintf(q,"%s'%s'%s'%s'",qb1,fieldName,qb2,cpTableName);
	ret = SPI_exec(q,1);
	pfree(q);
	pfree(fieldName);
	if (ret != SPI_OK_SELECT || SPI_processed != 1)
		return FALSE;
	SIDKEY_tupTable = SPI_tuptable;
	SIDKEY_processed = SPI_processed;
	resTuple = SIDKEY_tupTable->vals[0];
	value = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1);
	if (value == NULL)
		return FALSE;
	if (strncmp(value,"t",1) == 0 || strncmp(value,"T",1) == 0)
	{
		pfree(value);
		return TRUE;
	}
	else
	{
		pfree(value);
		return FALSE;
	}
}

#define MAXKCOLS 32
#define MAXCOL_LEN 128

/*
 * Decides what to record for one row change, based on the operation and on
 * the slaveid classification of the table.  Per the comments in this
 * function and in getSlaveId: 0 = ISEXPUNCOND, > 0 = ISEXPWITHSLAVEID,
 * -1 = ISIMPLBACKWARD, -2 = ISIMPLFORWARD, -3 = ISEXPWITHSLAVEID whose
 * slave-id column is NULL.  op is 'i', 'u', 'd', or 'm' for the implicit
 * call the comments attribute to handleParents.
 *
 * Returns 0 on success and non-zero on failure.
 */
int
handler (char *cpTableName,HeapTuple tBeforeTuple,HeapTuple tAfterTuple,TupleDesc tupleDesc,Oid tableOid,char op,int slaveid,char *pkxpress)
{
	HeapTuple tuple;

	/* Prefer the old row image when there is one. */
	tuple = tBeforeTuple ?
tBeforeTuple : tAfterTuple; #if defined DEBUG_OUTPUT elog(NOTICE,"---->IN handler with tableid=%d,tablename=%s,slaveid=%d, op=%c,pkxpress=%s",tableOid,cpTableName,slaveid,op,pkxpress); #endif if (slaveid == 0 && op == 'm') { elog(ERROR,"Found an explicit table ISEXPUNCOND with the 'm' (implicit) operation. Real slave ids ARE NEVER 0. That shouldnt happen"); return -1; } /* ISEXPUNCOND */ else if (slaveid == 0) return storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op); /* ISEXPWITHSLAVEID */ else if (slaveid > 0 && op =='d') { int retval; retval = storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op); return updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,slaveid); } else if (slaveid > 0 && op == 'i') { int retval; retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid); retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op); return retval; } else if (slaveid > 0 && op == 'u') { int retval; int old_slaveid; old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc); retval = handleParents(cpTableName,(slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid); if (old_slaveid != -3 && old_slaveid != slaveid) { retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op); } if (old_slaveid != slaveid) { retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op); } else { retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,slaveid,op); } if (old_slaveid != -3) { retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid); } return retval; } else if (slaveid == -3 && (op =='d' || op =='i')) { return 0; } else if (slaveid == -3 && op =='u') { int retval=0; int 
old_slaveid; old_slaveid = getSlaveId(cpTableName,tBeforeTuple,tupleDesc); if (old_slaveid > 0) { retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op); retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid); } return retval; } /* ISIMPLBACKWARD KALESMENO apo handleParents */ else if (slaveid >0 && op == 'm') { if (getSlaveId(cpTableName,tuple,tupleDesc) >= 0) { return 0; /* SE SXOLIO Giati twra mporei enas pateras ISEXPUNCOND/ISEXPWITHSLAVEID na exei paidi ISIMPLBACKWARD */ /* elog(ERROR,"Found an explicit table ISEXPUNCOND or ISEXPWITHSLAVEID with the 'm' (implicit) operation. .Fathers of ISEXPUNCOND must BE ALWAYS ISEXPUNCOND. Fathers of ISEXPWITHSLAVEID must be always ISIMPL.That shouldnt happen"); */ } if (getSlaveId(cpTableName,tuple,tupleDesc) == -2) { return 0; /* */ } if (getSlaveId(cpTableName,tuple,tupleDesc) == -3) { return 0; } if (existsInAccnt(cpTableName,slaveid,pkxpress)) return 0; else { int retval; retval = handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,slaveid); retval = (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',slaveid,op); return (retval)?retval:createAccnt(cpTableName,slaveid,pkxpress); } } /* ISIMPL* but called from trigger ('i','d','u') */ /* ISIMPLBACKWARD */ else if (slaveid == -1) { if (op == 'i') return 0; /* Delete in this fashion may happen for "orphan" rows. The deletions will be "triggered" by childern deletions/updates. 
*/ else if (op == 'd') { int retval=0; int *slaves; int *run; slaves = getSlaves(cpTableName,pkxpress); if (slaves == NULL) return 0; for (run=slaves;*run;run++) { retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op); retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,*run); } pfree(slaves); return retval || deleteAccnt(cpTableName,pkxpress); } else if (op == 'u') { int retval=0; int *slaves; int *run; slaves = getSlaves(cpTableName,pkxpress); #if defined DEBUG_OUTPUT elog(NOTICE,"in slaveid=-1, op='u', slaves=%d",(int)slaves); #endif if (slaves == NULL) return 0; for (run=slaves;*run;run++) { #if defined DEBUG_OUTPUT elog(NOTICE,"in slaveid=-1, op='u', runslave=%d",*run); #endif retval = retval || handleParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run); retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,*run,op); retval = retval || updateAccntParents(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,*run); } pfree(slaves); return retval; } } /* ISIMPLFORWARD */ else if (slaveid == -2) { if (op == 'i') { int retval=0; int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc); if (forw_slaveid>0) { retval = retval || handleParents(cpTableName,NULL,tAfterTuple,tupleDesc,tableOid,forw_slaveid); retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op); } return retval; } else if (op == 'd') { int retval=0; int forw_slaveid = getComputedSlaveId(cpTableName,tuple,tupleDesc); if (forw_slaveid>0) { retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op); retval = retval || updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,forw_slaveid); } return retval ; } else if (op == 'u') { /* POTE ALLAGH TOU VALUE TOU COLUMN NAME TOU PATH dhl tou COLUMN ME NAME PATHCOL */ int retval=0; int forw_slaveid = 
getComputedSlaveId(cpTableName,tAfterTuple,tupleDesc); int old_slaveid = getOldComputedSlaveId(cpTableName,tBeforeTuple,tupleDesc); //char origop = getForwardParentOrigOp(cpTableName,tBeforeTuple,tupleDesc); //if (origop == 'i') { // old_slaveid = forw_slaveid; //} //elog(NOTICE, "in handler ISIMPLFWD nu, op='%c' , forw_slaveid= %d , old_slaveid= %d ", op,forw_slaveid,old_slaveid); // LEGACY CODE as of 2016-03-18 //retval = retval || handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid); //retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid); //retval = retval || updateAccntParents(cpTableName,tBeforeTuple,(forw_slaveid!=old_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid); if (forw_slaveid>0) { retval = handleParents(cpTableName,(forw_slaveid!=old_slaveid)?NULL:tBeforeTuple,tAfterTuple,tupleDesc,tableOid,forw_slaveid); if (old_slaveid != -3 && old_slaveid != forw_slaveid) { retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op); } if (old_slaveid != forw_slaveid) { retval = retval || storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'i',forw_slaveid,op); } else { retval= (retval)?retval:storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,op,forw_slaveid,op); } if (old_slaveid != -3) { retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,(old_slaveid!=forw_slaveid)?NULL:tAfterTuple,tupleDesc,tableOid,old_slaveid); } } else { // ISIMPLFORWARD me vslid IS NULL --> -3 if (old_slaveid > 0) { retval=storePending(cpTableName,tBeforeTuple,tAfterTuple,tupleDesc,tableOid,'d',old_slaveid,op); retval = (retval)?retval:updateAccntParents(cpTableName,tBeforeTuple,NULL,tupleDesc,tableOid,old_slaveid); } } return retval; } } return 0; } int getSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) { char *slave_siteidkeyname; char *qb; char *q; 
char *foo; int ret; HeapTuple resTuple; int attnum; int slaveid; SPITupleTable *SIDKEY_tupTable; int SIDKEY_processed; #if defined DEBUG_OUTPUT elog(NOTICE, "in getSlaveId of %s",cpTableName); #endif qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname="; q = palloc(strlen(qb)+strlen(cpTableName)+2+1); sprintf(q,"%s'%s'",qb,cpTableName); ret = SPI_exec(q,1); pfree(q); /* ISIMPLBACKWARD */ if (ret != SPI_OK_SELECT || SPI_processed != 1) return -1; SIDKEY_tupTable = SPI_tuptable; SIDKEY_processed = SPI_processed; resTuple = SIDKEY_tupTable->vals[0]; slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1); /* ISEXPUNCOND */ if (slave_siteidkeyname == NULL) return 0; /* ISIMPLFORWARD */ if (strncmp(slave_siteidkeyname,"___",3) == 0) { pfree(slave_siteidkeyname); return -2; } attnum = SPI_fnumber(tupleDesc,slave_siteidkeyname); if (attnum == SPI_ERROR_NOATTRIBUTE) { elog(ERROR,"WRONG slaveidkeyname=%s given for table %s",slave_siteidkeyname,cpTableName); pfree(slave_siteidkeyname); return 0; } foo=SPI_getvalue(tuple,tupleDesc,attnum); /* ISEXPWITHSLAVEID BUT siteidkeyname is null */ if (foo == NULL) { pfree(slave_siteidkeyname); return -3; } pfree(slave_siteidkeyname); slaveid = atoi(foo); pfree(foo); /* ISEXPWITHSLAVEID */ return slaveid; } int getComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) { char *slave_siteidkeyname; char *path; char *forw_table; char *forw_table_id; char *pathcol; char *pathcolval; char *qb; char *q; HeapTuple resTuple; int ret; char *run; int attnum; SPITupleTable *SIDKEY_tupTable; int SIDKEY_processed; HeapTuple RealPar_tuple; SPITupleTable *RealPar_tupTable; char *ParTableName; #if defined DEBUG_OUTPUT elog(NOTICE, "in getComputedSlaveId of %s",cpTableName); #endif qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname="; q = palloc(strlen(qb)+strlen(cpTableName)+2+1); sprintf(q,"%s'%s'",qb,cpTableName); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_SELECT || 
SPI_processed != 1) { elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } SIDKEY_tupTable = SPI_tuptable; SIDKEY_processed = SPI_processed; resTuple = SIDKEY_tupTable->vals[0]; slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1); if (slave_siteidkeyname == NULL) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } if (strncmp(slave_siteidkeyname,"___",3) != 0) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } // elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname); path = slave_siteidkeyname + 3; // elog(NOTICE,"path=-%s-",path); run = index(path,'_'); if (run == NULL) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? 
CONTACT IT DEPT IMMEDIATELY.",cpTableName); } pathcol = palloc(MAXCOL_LEN); // elog(NOTICE,"path=-%d-",(unsigned)path); // elog(NOTICE,"run=-%d-",(unsigned)run); // elog(NOTICE,"run-path=-%d-",run-path); *run = 0; strncpy(pathcol,path,run-path+1); // elog(NOTICE,"pathcol=-%s-",pathcol); forw_table = run+1; attnum = SPI_fnumber(tupleDesc,pathcol); if (attnum == SPI_ERROR_NOATTRIBUTE) { pfree(slave_siteidkeyname); elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName); return 0; } pathcolval=SPI_getvalue(tuple,tupleDesc,attnum); /* forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id" */ run = index(forw_table,'@'); if (run == NULL) { forw_table_id = palloc(MAXCOL_LEN); strncpy(forw_table_id,"id",3); } else { forw_table_id = palloc(MAXCOL_LEN); *run=0; strncpy(forw_table_id,forw_table,run-forw_table+1); forw_table=run+1; } q = palloc(512); sprintf(q,"SELECT * FROM public.%s where %s='%s'",forw_table,forw_table_id,pathcolval); ret = SPI_exec(q,1); if (ret != SPI_OK_SELECT || SPI_processed != 1) { pfree(slave_siteidkeyname); elog(ERROR, "Fatal error: ComputedSlaveId: par table %s should have row with %s=%s",forw_table,forw_table_id,pathcolval); return -2; } pfree(q); RealPar_tupTable = SPI_tuptable; RealPar_tuple = RealPar_tupTable->vals[0]; ParTableName = palloc(256); sprintf(ParTableName,"\"public\".\"%s\"",forw_table); ret = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc); if (ret == -2) { pfree(slave_siteidkeyname); ret = getComputedSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc); pfree(ParTableName); return ret; } if (ret <= 0) { pfree(slave_siteidkeyname); pfree(ParTableName); // elog(ERROR, "Fatal error: par table %s with slaveid!=-2 should have slaveid>0",forw_table); /*** apopiera xeirismou NULL slaveid, dhl isodynamo ISEXPWITHSLAVEID kai slaveid IS NULL ***/ return -3; } pfree(ParTableName); pfree(slave_siteidkeyname); return ret; /* PROSOXH: To swsto einai apla sto slave_siteidkeyname na fylame MONO to 
parent table, kai meta me query na briskoume to tuple tou parent table. */ } int getOldComputedSlaveId(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) { char *slave_siteidkeyname; char *path; char *forw_table; char *forw_table_id; char *pathcol; char *pathcolval; char *qb; char *q; HeapTuple resTuple; int ret; char *run; int attnum; SPITupleTable *SIDKEY_tupTable; int SIDKEY_processed; HeapTuple RealPar_tuple; SPITupleTable *RealPar_tupTable; char *ParTableName; char *foo; int slaveid_d=-3; #if defined DEBUG_OUTPUT elog(NOTICE, "in getOldComputedSlaveId of %s",cpTableName); #endif qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname="; q = palloc(strlen(qb)+strlen(cpTableName)+2+1); sprintf(q,"%s'%s'",qb,cpTableName); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_SELECT || SPI_processed != 1) { elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } SIDKEY_tupTable = SPI_tuptable; SIDKEY_processed = SPI_processed; resTuple = SIDKEY_tupTable->vals[0]; slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1); if (slave_siteidkeyname == NULL) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } if (strncmp(slave_siteidkeyname,"___",3) != 0) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } // elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname); path = slave_siteidkeyname + 3; // elog(NOTICE,"path=-%s-",path); run = index(path,'_'); if (run == NULL) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? 
CONTACT IT DEPT IMMEDIATELY.",cpTableName); } pathcol = palloc(MAXCOL_LEN); // elog(NOTICE,"path=-%d-",(unsigned)path); // elog(NOTICE,"run=-%d-",(unsigned)run); // elog(NOTICE,"run-path=-%d-",run-path); *run = 0; strncpy(pathcol,path,run-path+1); // elog(NOTICE,"pathcol=-%s-",pathcol); forw_table = run+1; attnum = SPI_fnumber(tupleDesc,pathcol); if (attnum == SPI_ERROR_NOATTRIBUTE) { pfree(slave_siteidkeyname); elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName); return 0; } pathcolval=SPI_getvalue(tuple,tupleDesc,attnum); /* forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id" */ run = index(forw_table,'@'); if (run == NULL) { forw_table_id = palloc(MAXCOL_LEN); strncpy(forw_table_id,"id",3); } else { forw_table_id = palloc(MAXCOL_LEN); *run=0; strncpy(forw_table_id,forw_table,run-forw_table+1); forw_table=run+1; } /* consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing an parent's op=='d' tote sigoura parent's origop=='u' */ q = palloc(512); sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='d' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId()); ret = SPI_exec(q,1); if (ret == SPI_OK_SELECT && SPI_processed == 1) { RealPar_tupTable = SPI_tuptable; RealPar_tuple = RealPar_tupTable->vals[0]; foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1); if (foo == NULL) { slaveid_d = -3; } else { slaveid_d = atoi(foo); pfree(foo); } pfree(q); pfree(pathcolval); return slaveid_d; } /* dirty patch code to handle copy supplycase. 
it was supposed to solve the problem of not turning 'u' to 'i' for e.g supplycasesquotes */ /* but it doesnt work for fb_ , as it turns 'i' to 'u' as per : dbmirror issues when changing vslid (null->int, int->null, intX->intY) */ /* following code must be here (not commended) as of 2018-12-20 */ /* the aim is make everything work correctly, supplies + fb_ + all type ISIMPLFORWARD (-2) tables */ /* q = palloc(512); sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='%s' AND op='i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",cpTableName,GetCurrentTransactionId()); ret = SPI_exec(q,1); if (ret == SPI_OK_SELECT && SPI_processed == 1) { RealPar_tupTable = SPI_tuptable; RealPar_tuple = RealPar_tupTable->vals[0]; foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1); if (foo == NULL) { slaveid_d = -3; } else { slaveid_d = atoi(foo); pfree(foo); } pfree(q); pfree(pathcolval); return slaveid_d; } */ /** pls consult : https://docs.google.com/document/d/1gEXoF5Rm-9ieYgEtgSVQpRPCven4LoMEU_taNXOF6Rc/edit?usp=sharing if (parent op=='i' and origop <> 'i' exist) -pou simainei oti origop='u' kai (eite slaveid NULL -> NOT NULL eite X -> Y, X!=Y) - we consider this as an insert and return -3 in order to communicate this to handler(). NOTE, this means that it is ILLEGAL for the app to do UPDATE on the parent, then update on the kid, and then perform a second UPDATE on the kid, this will result in TWO inserts and produce an duplicate ERROR. else - diladi origop == 'i' or no parent op=='i' exists - pou simainei : ( eite parent origop == 'i' (synepagetai oti parent op == 'i') eite parent op != 'i' : (synepagetai : eite parent op == 'd' opote den tha ftasei edo o kwdikas, logw tou 1ou statement pio pano eite parent op == 'u' (case NOT NULL (X) -> NOT NULL (Y) me X==Y ) ) ) then we let the code fallback to return getComputedSlaveId(cpTableName,tuple,tupleDesc) which will return forw_slaveid . 
**/ q = palloc(512); sprintf(q," SELECT slaveid FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND op='i' AND origop <> 'i' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId()); ret = SPI_exec(q,1); if (ret == SPI_OK_SELECT && SPI_processed == 1) { pfree(slave_siteidkeyname); pfree(pathcolval); return -3; } pfree(slave_siteidkeyname); pfree(pathcolval); return getComputedSlaveId(cpTableName,tuple,tupleDesc) ; /* ISA 'u' */ /* * PROYPOTHETOUME OTI O AMESOS BABAS exei entry sto dbmirror_pending me to idio xid kai op='d' */ } /* char getForwardParentOrigOp(char *cpTableName,HeapTuple tuple, TupleDesc tupleDesc) { char *slave_siteidkeyname; char *path; char *forw_table; char *forw_table_id; char *pathcol; char *pathcolval; char *qb; char *q; HeapTuple resTuple; int ret; char *run; int attnum; SPITupleTable *SIDKEY_tupTable; int SIDKEY_processed; HeapTuple RealPar_tuple; SPITupleTable *RealPar_tupTable; char *ParTableName; char *foo; int slaveid_d=-3; char origop; #if defined DEBUG_OUTPUT elog(NOTICE, "in getForwardParentOrigOp of %s",cpTableName); #endif qb = "SELECT siteidkeyname from dbmirror_explicitreptables where tblname="; q = palloc(strlen(qb)+strlen(cpTableName)+2+1); sprintf(q,"%s'%s'",qb,cpTableName); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_SELECT || SPI_processed != 1) { elog(ERROR,"ISIMPLFORWARD Table %s NOT IN dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } SIDKEY_tupTable = SPI_tuptable; SIDKEY_processed = SPI_processed; resTuple = SIDKEY_tupTable->vals[0]; slave_siteidkeyname = SPI_getvalue(resTuple,SIDKEY_tupTable->tupdesc,1); if (slave_siteidkeyname == NULL) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has NULL siteidkeyname in dbmirror_explicitreptables. How did we get here? 
CONTACT IT DEPT IMMEDIATELY.",cpTableName); } if (strncmp(slave_siteidkeyname,"___",3) != 0) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has siteidkeyname not starting with \"___\" in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } // elog(NOTICE,"slave_siteidkeyname=-%s-",slave_siteidkeyname); path = slave_siteidkeyname + 3; // elog(NOTICE,"path=-%s-",path); run = index(path,'_'); if (run == NULL) { pfree(slave_siteidkeyname); elog(ERROR,"ISIMPLFORWARD Table %s has not the right \"___localcol_forwtable\" format siteidkeyname in dbmirror_explicitreptables. How did we get here? CONTACT IT DEPT IMMEDIATELY.",cpTableName); } pathcol = palloc(MAXCOL_LEN); // elog(NOTICE,"path=-%d-",(unsigned)path); // elog(NOTICE,"run=-%d-",(unsigned)run); // elog(NOTICE,"run-path=-%d-",run-path); *run = 0; strncpy(pathcol,path,run-path+1); // elog(NOTICE,"pathcol=-%s-",pathcol); forw_table = run+1; attnum = SPI_fnumber(tupleDesc,pathcol); if (attnum == SPI_ERROR_NOATTRIBUTE) { pfree(slave_siteidkeyname); elog(ERROR,"WRONG pathcol=%s given for table %s",slave_siteidkeyname,cpTableName); return 0; } pathcolval=SPI_getvalue(tuple,tupleDesc,attnum); */ /* forw_table PRIMARY KEY PREPEI PANTA NA EINAI "id" */ /* run = index(forw_table,'@'); if (run == NULL) { forw_table_id = palloc(MAXCOL_LEN); strncpy(forw_table_id,"id",3); } else { forw_table_id = palloc(MAXCOL_LEN); *run=0; strncpy(forw_table_id,forw_table,run-forw_table+1); forw_table=run+1; } q = palloc(512); sprintf(q," SELECT origop FROM public.dbmirror_pending WHERE tablename='\"public\".\"%s\"' AND xid=%d ORDER BY seqid DESC LIMIT 1 ",forw_table,GetCurrentTransactionId()); ret = SPI_exec(q,1); if (ret == SPI_OK_SELECT && SPI_processed == 1) { RealPar_tupTable = SPI_tuptable; RealPar_tuple = RealPar_tupTable->vals[0]; foo = SPI_getvalue(RealPar_tuple,RealPar_tupTable->tupdesc,1); if (foo == NULL) { pfree(q); pfree(pathcolval); elog(ERROR,"WRONG forward parent table 
%s has null origop in this transaction",forw_table); } else { origop = foo[0]; pfree(foo); } pfree(q); pfree(pathcolval); return origop; } return ' '; } */ #define MAX_WHERE_CLAUSE 512 #define MAX_QUERY_LEN 512 int handleParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) { char *qb; char *q; bool isNull; int ret; int i; HeapTuple resTuple; Datum resDatum; Oid confrelid; char *confrelname; int16 *thisCols; int16 *fkCols; int16 *run; ArrayType *arr; char AthisColsVals[MAXKCOLS][MAXCOL_LEN]; char fkColsNames[MAXKCOLS][MAXCOL_LEN]; char thisColsTypes[MAXKCOLS][100]; short numOfCols; SPITupleTable *FK_tupTable; int FK_processed; #if defined DEBUG_OUTPUT elog(NOTICE, "in handleParents of Table=%d",tableOid); #endif qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = "; q = palloc(strlen(qb)+MAX_OID_LEN+1); sprintf(q,"%s%d",qb,tableOid); ret = SPI_exec(q,0); pfree(q); if (ret != SPI_OK_SELECT || SPI_processed < 0) { elog(NOTICE, "no FKs"); return 0; } /* For every FK dependency we track and handle the parent table */ FK_tupTable = SPI_tuptable; FK_processed = SPI_processed; for (i=0;ivals[i]; resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull); confrelid = (Oid) DatumGetObjectId(resDatum); resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull); arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum)); thisCols = (int16 *)ARR_DATA_PTR(arr); numOfCols=ARR_DIMS(arr)[0]; #if defined DEBUG_OUTPUT int DIM_0 = ARR_DIMS(arr)[0]; int LBOUND_0 = ARR_LBOUND(arr)[0]; // resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull); // confrelname = (char *) DatumGetName(resDatum); // elog(NOTICE, "in handleParents of Table=%d, IN fetching FK for table=%s, DIM=%u, LBOUND=%u",tableOid,confrelname,DIM_0,LBOUND_0); #endif for (run=thisCols,colrun=0;colrunvals[0]; fooname = 
SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1); char *value = SPI_getvalue(Atuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname)); char *coltype = SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname)); #if defined DEBUG_OUTPUT elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s",fooname,*run,thiscolrun,value); #endif if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun)) { FkHasNullValueORXcluded=1; break; } memcpy(&(AthisColsVals[colrun][0]),value,strlen(value)+1); memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1); if (Btuple != NULL) { char *Bvalue = SPI_getvalue(Btuple,tupleDesc,thiscolrun); #if defined DEBUG_OUTPUT elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with After value=%s,Before value=%s",fooname,*run,thiscolrun,value,Bvalue); #endif if (Bvalue == NULL) handleit=1; else if (strcmp(Bvalue,value)) { handleit=1; pfree(Bvalue); } } else handleit=1; pfree(value); } #if defined DEBUG_OUTPUT elog(NOTICE,"in handleParents--> END OF ATTR LOOP"); elog(NOTICE,"handle it = %d, ",handleit); elog(NOTICE,"FkHasNullValueORXcluded = %d, ",FkHasNullValueORXcluded); #endif if (FkHasNullValueORXcluded || !handleit) continue; resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull); arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum)); fkCols = (int16 *)ARR_DATA_PTR(arr); for (run=fkCols,colrun=0;colrunvals[0]; value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1); memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1); pfree(value); } resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull); confrelname = (char *) DatumGetName(resDatum); /* There must be at least 1 column in compound FK!! 
*/ WHERE = palloc(MAX_WHERE_CLAUSE); if (!strncmp(thisColsTypes[0],"int",3)) sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],AthisColsVals[0]); else sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],AthisColsVals[0]); for (j=1;jvals[0]; ParTableName = palloc(256); sprintf(ParTableName,"\"public\".\"%s\"",confrelname); ret = handler(ParTableName,NULL,RealPar_tuple,RealPar_tupTable->tupdesc,confrelid,'m',slaveid,WHERE); pfree(ParTableName); pfree(WHERE); } return 0; } int existsInAccnt(char *cpTableName, int slaveid, char *pkxpress) { char *q; int ret; #if defined DEBUG_OUTPUT elog(NOTICE, "in existsInAccnt of Table=%s",cpTableName); #endif q = palloc(MAX_QUERY_LEN); /* sprintf(q,"SELECT 1 FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_SELECT || SPI_processed != 1) return 0; else return 1; */ sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt+1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_UPDATE || SPI_processed != 1) return 0; else return 1; } int decreaseAccnt(char *cpTableName, int slaveid, char *pkxpress) { char *q; int ret; bool isNull; #if defined DEBUG_OUTPUT elog(NOTICE, "in decreaseAccnt of Table=%s",cpTableName); #endif q = palloc(MAX_QUERY_LEN); sprintf(q,"UPDATE dbmirror_accounting set cnt=cnt-1 WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_UPDATE || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, pkxpress=%s",cpTableName,slaveid,pkxpress); q = palloc(MAX_QUERY_LEN); sprintf(q,"SELECT cnt FROM dbmirror_accounting WHERE tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_SELECT || SPI_processed != 1) elog(ERROR,"problem in decreaseAccnt for table %s, slaveid=%d, 
pkxpress=%s",cpTableName,slaveid,pkxpress); return DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],SPI_tuptable->tupdesc,1,&isNull)); } int createAccnt(char *cpTableName, int slaveid, char *pkxpress) { char *q; int ret; #if defined DEBUG_OUTPUT elog(NOTICE, "in createAccnt of Table=%s",cpTableName); #endif q = palloc(MAX_QUERY_LEN); /* public.dbmirror_accounting.cnt DEFAULT = 1 */ sprintf(q,"INSERT INTO dbmirror_accounting (tblname,pkxpress,slaveid) VALUES('%s','%s',%d)",cpTableName,pkxpress,slaveid); ret = SPI_exec(q,1); pfree(q); if (ret != SPI_OK_INSERT || SPI_processed != 1) return -2; else return 0; } char *getPKxpress(char *cpTableName, HeapTuple tuple, TupleDesc tupleDesc, Oid tableOid) { int2vector *pk; int i; char *WHERE = palloc(MAX_WHERE_CLAUSE); #if defined DEBUG_OUTPUT elog(NOTICE, "in getPKxpress of Table=%s",cpTableName); #endif pk = getPrimaryKey(tableOid); if (pk == NULL) return NULL; int16 *pkV = pk->values; int pkSize = pk->dim1; if (pkSize>0) { char *keyname; char *keyval; keyname = SPI_fname(tupleDesc,pkV[0]); if (keyname == NULL) { elog(ERROR,"FATAL ERROR: WRONG keyname key=%d given for table %s",pkV[0],cpTableName); return NULL; } keyval = SPI_getvalue(tuple,tupleDesc,pkV[0]); if (keyval == NULL) { elog(ERROR,"FATAL ERROR: WRONG keyval key=%d given for table %s",pkV[0],cpTableName); return NULL; } sprintf(WHERE,"\"%s\"=%s",keyname,keyval); pfree(keyname); pfree(keyval); } for (i=1;ivals[ret],SLAVE_tupTable->tupdesc,1); *(slaves+ret) = atoi(slaveidStr); pfree(slaveidStr); } *(slaves+SLAVE_processed) = 0; return slaves; } int deleteAccnt (char *cpTableName, char *pkxpress) { char *q; int ret; #if defined DEBUG_OUTPUT elog(NOTICE, "in deleteAccnt of Table=%s",cpTableName); #endif q = palloc(MAX_QUERY_LEN); sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s'",cpTableName,pkxpress); ret = SPI_exec(q,0); pfree(q); if (ret == SPI_OK_DELETE) return 0; else return -2; } int deleteSlaveAccnt (char *cpTableName,int slaveid, 
char *pkxpress) { char *q; int ret; #if defined DEBUG_OUTPUT elog(NOTICE, "in deleteSlaveAccnt of Table=%s",cpTableName); #endif q = palloc(MAX_QUERY_LEN); sprintf(q,"DELETE FROM dbmirror_accounting where tblname='%s' AND pkxpress='%s' AND slaveid=%d",cpTableName,pkxpress,slaveid); ret = SPI_exec(q,0); pfree(q); if (ret == SPI_OK_DELETE) return 0; else return -2; } int updateAccntParents(char *cpTableName,HeapTuple Btuple,HeapTuple Atuple, TupleDesc tupleDesc,Oid tableOid,int slaveid) { char *qb; char *q; bool isNull; int ret; int i; HeapTuple resTuple; Datum resDatum; Oid confrelid; char *confrelname; int16 *thisCols; int16 *fkCols; int16 *run; ArrayType *arr; char BthisColsVals[MAXKCOLS][MAXCOL_LEN]; char fkColsNames[MAXKCOLS][MAXCOL_LEN]; char thisColsTypes[MAXKCOLS][100]; short numOfCols; SPITupleTable *FK_tupTable; int FK_processed; int ParSlaveId; #if defined DEBUG_OUTPUT elog(NOTICE, "in updateAccnt parents of Table=%d",tableOid); #endif qb = "SELECT c.confrelid,c.conkey,c.confkey,f.relname FROM pg_catalog.pg_constraint c,pg_catalog.pg_class f WHERE c.contype = 'f' AND c.confrelid = f.oid AND c.conrelid = "; q = palloc(strlen(qb)+MAX_OID_LEN+1); sprintf(q,"%s%d",qb,tableOid); ret = SPI_exec(q,0); pfree(q); if (ret != SPI_OK_SELECT || SPI_processed < 0) { elog(NOTICE, "no FKs"); return 0; } /* For every FK dependency we track and handle the parent table */ FK_tupTable = SPI_tuptable; FK_processed = SPI_processed; for (i=0;ivals[i]; resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,1,&isNull); confrelid = (Oid) DatumGetObjectId(resDatum); resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,2,&isNull); arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum)); thisCols = (int16 *)ARR_DATA_PTR(arr); numOfCols=ARR_DIMS(arr)[0]; for (run=thisCols,colrun=0;colrunvals[0]; fooname = SPI_getvalue(thisatt_tuple,thisatt_tupTable->tupdesc,1); char *value = SPI_getvalue(Btuple,tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname)); char *coltype = 
SPI_gettype(tupleDesc,thiscolrun=SPI_fnumber(tupleDesc,fooname)); #if defined DEBUG_OUTPUT elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s",fooname,*run,thiscolrun,value); #endif if (value == NULL || isExcluded(cpTableName,tupleDesc,thiscolrun)) { FkHasNullValueORXcluded=1; break; /* Nothing is done regarding "Before" status of parent since there is not one */ } memcpy(&(BthisColsVals[colrun][0]),value,strlen(value)+1); memcpy(&(thisColsTypes[colrun][0]),coltype,strlen(coltype)+1); if (Atuple != NULL) { char *Avalue = SPI_getvalue(Atuple,tupleDesc,thiscolrun); #if defined DEBUG_OUTPUT elog(NOTICE,"Found colname=%s syscol=%d thiscol=%d with Before value=%s,After value=%s",fooname,*run,thiscolrun,value,Avalue); #endif if (Avalue == NULL) decreaseit=1; else if (strcmp(Avalue,value)) { decreaseit=1; pfree(Avalue); } } else decreaseit=1; /* is delete operation*/ pfree(value); } #if defined DEBUG_OUTPUT elog(NOTICE,"in updateAccnParents--> END OF ATTR LOOP"); elog(NOTICE,"decrease it = %d, ",decreaseit); elog(NOTICE,"FkHasNullValueORXcluded = %d, ",FkHasNullValueORXcluded); #endif if (FkHasNullValueORXcluded || !decreaseit) continue; resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,3,&isNull); arr = (ArrayType *) DatumGetPointer(PG_DETOAST_DATUM(resDatum)); fkCols = (int16 *)ARR_DATA_PTR(arr); for (run=fkCols,colrun=0;colrunvals[0]; value = SPI_getvalue(PAR_resTuple,PAR_tupTable->tupdesc,1); memcpy(&(fkColsNames[colrun][0]),value,strlen(value)+1); pfree(value); } resDatum = SPI_getbinval(resTuple,FK_tupTable->tupdesc,4,&isNull); confrelname = (char *) DatumGetName(resDatum); /* there must be at least 1 column in compound FK!! 
*/ WHERE = palloc(MAX_WHERE_CLAUSE); if (!strncmp(thisColsTypes[0],"int",3)) sprintf(WHERE,"\"%s\"=%s",fkColsNames[0],BthisColsVals[0]); else sprintf(WHERE,"\"%s\"='%s'",fkColsNames[0],BthisColsVals[0]); for (j=1;jvals[0]; ParTableName = palloc(256); sprintf(ParTableName,"\"public\".\"%s\"",confrelname); ParSlaveId = getSlaveId(ParTableName,RealPar_tuple,RealPar_tupTable->tupdesc); if (ParSlaveId >= 0 || ParSlaveId == -2 || ParSlaveId == -3) { pfree(ParTableName); continue; } if (decreaseAccnt(ParTableName,slaveid,WHERE)==0) { ret = storePending(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,'d',slaveid,'f'); ret = deleteSlaveAccnt(ParTableName,slaveid,WHERE); ret = updateAccntParents(ParTableName,RealPar_tuple,NULL,RealPar_tupTable->tupdesc,confrelid,slaveid); } pfree(ParTableName); pfree(WHERE); } return 0; }