*** a/src/backend/utils/adt/trigfuncs.c --- b/src/backend/utils/adt/trigfuncs.c *************** *** 13,21 **** */ #include "postgres.h" ! #include "access/htup.h" #include "commands/trigger.h" #include "utils/builtins.h" /* --- 13,25 ---- */ #include "postgres.h" ! #include "executor/spi.h" ! #include "catalog/indexing.h" ! #include "commands/async.h" #include "commands/trigger.h" #include "utils/builtins.h" + #include "utils/fmgroids.h" + #include "utils/tqual.h" /* *************** *** 93,95 **** suppress_redundant_updates_trigger(PG_FUNCTION_ARGS) --- 97,261 ---- return PointerGetDatum(rettuple); } + + + /* + * Copy from s (for source) to r (for result), wrapping with q (quote) + * characters and doubling any quote characters found. + */ + static char * + strcpy_quoted(char *r, const char *s, const char q) + { + *r++ = q; + while (*s) + { + if (*s == q) + *r++ = q; + *r++ = *s; + s++; + } + *r++ = q; + return r; + } + + /* + * triggered_change_notification + * + * This trigger function will send a notification of data modification with + * primary key values. The channel will be "tcn" unless the trigger is + * created with a parameter, in which case that parameter will be used. + */ + Datum + triggered_change_notification(PG_FUNCTION_ARGS) + { + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Trigger *trigger; + int nargs; + HeapTuple trigtuple, + newtuple; + HeapTupleHeader trigheader, + newheader; + Relation rel; + TupleDesc tupdesc; + Relation indexRelation; + ScanKeyData skey; + SysScanDesc scan; + HeapTuple indexTuple; + char *channel; + char operation; + char payload[200]; + char *p; + bool foundPK; + + /* make sure it's called as a trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called as trigger"))); + + /* and that it's called after the change */ + if (!TRIGGER_FIRED_AFTER(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called after the change"))); + + /* and that it's called for each row */ + if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("suppress_redundant_updates_trigger: must be called for each row"))); + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + operation = 'I'; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + operation = 'U'; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + operation = 'D'; + else + { + elog(ERROR, "suppress_redundant_updates_trigger: trigger fired by unrecognized operation"); + operation = 'X'; /* silence compiler warning */ + } + + trigger = trigdata->tg_trigger; + nargs = trigger->tgnargs; + if (nargs > 1) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("suppress_redundant_updates_trigger: must not be called with more than one parameter"))); + + if (nargs == 0) + channel = "tcn"; + else + channel = trigger->tgargs[0]; + + /* get tuple data, set default result */ + trigtuple = trigdata->tg_trigtuple; + newtuple = trigdata->tg_newtuple; + + trigheader = trigtuple->t_data; + newheader = newtuple->t_data; + + rel = trigdata->tg_relation; + tupdesc = rel->rd_att; + + foundPK = false; + + /* Prepare to scan pg_index for entries having indrelid = this rel. */ + indexRelation = heap_open(IndexRelationId, AccessShareLock); + ScanKeyInit(&skey, + Anum_pg_index_indrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true, + SnapshotNow, 1, &skey); + + while (HeapTupleIsValid(indexTuple = systable_getnext(scan))) + { + Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); + + /* we're only interested if it is the primary key */ + if (index->indisprimary) + { + int numatts = index->indnatts; + + if (numatts > 0) + { + int i; + + foundPK = true; + + p = strcpy_quoted(payload, SPI_getrelname(rel), '"'); + *p++ = ','; + *p++ = operation; + + for (i = 0; i < numatts; i++) + { + int colno = index->indkey.values[i]; + + /* TODO: for UPDATE, check that the value wasn't updated? */ + + *p++ = ','; + p = strcpy_quoted(p, SPI_fname(tupdesc, colno), '"'); + *p++ = '='; + p = strcpy_quoted(p, SPI_getvalue(trigtuple, tupdesc, colno), '\''); + } + *p = '\0'; + + Async_Notify(channel, payload); + } + break; + } + } + + systable_endscan(scan); + heap_close(indexRelation, AccessShareLock); + + if (!foundPK) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called on a table with a primary key"))); + + return PointerGetDatum(NULL); /* after trigger; value doesn't matter */ + } *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** *** 1638,1643 **** DESCR("convert oid to int8"); --- 1638,1645 ---- DATA(insert OID = 1291 ( suppress_redundant_updates_trigger PGNSP PGUID 12 1 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ suppress_redundant_updates_trigger _null_ _null_ _null_ )); DESCR("trigger to suppress updates when new and old records match"); + DATA(insert OID = 2650 ( triggered_change_notification PGNSP PGUID 12 1 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ triggered_change_notification _null_ _null_ _null_ )); + DESCR("trigger function to send change notification with primary key in payload"); DATA(insert OID = 1292 ( tideq PGNSP PGUID 12 1 0 0 f f f t f i 2 0 16 "27 27" _null_ _null_ _null_ _null_ tideq _null_ _null_ _null_ )); DESCR("equal"); *** a/src/include/utils/builtins.h --- b/src/include/utils/builtins.h *************** *** 959,964 **** extern Datum RI_FKey_setdefault_upd(PG_FUNCTION_ARGS); --- 959,965 ---- /* trigfuncs.c */ extern Datum suppress_redundant_updates_trigger(PG_FUNCTION_ARGS); + extern Datum triggered_change_notification(PG_FUNCTION_ARGS); /* encoding support functions */ extern Datum getdatabaseencoding(PG_FUNCTION_ARGS);