diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3a4c98b..8bd44af 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3046,6 +3046,64 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
+ xact_commit bigint
+
+
+ Number of transactions successfully applied in this subscription
+
+
+
+
+
+ xact_commit_bytes bigint
+
+
+ Amount of transactions data successfully applied in this subscription
+
+
+
+
+
+ xact_error bigint
+
+
+ Number of transactions unsuccessfully applied in this subscription
+
+
+
+
+
+ xact_error_bytes bigint
+
+
+ Amount of transactions data unsuccessfully applied in this subscription
+
+
+
+
+
+ xact_abort bigint
+
+
+ Number of transactions aborted in this subscription
+
+
+
+
+
+ xact_abort_bytes bigint
+
+
+ Amount of transactions data aborted in this subscription.
+ Increase logical_decoding_work_mem on the publisher
+ to suppress unnecessary consumed network bandwidth or change in memory
+ of the subscriber, if unexpected amount of rollbacked transactions are
+ streamed.
+
+
+
+
+
latest_end_time timestamp with time zone
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b0cd8d2..3a2c69b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -909,10 +909,37 @@ CREATE VIEW pg_stat_subscription AS
st.last_msg_send_time,
st.last_msg_receipt_time,
st.latest_end_lsn,
+ pg_stat_get_subscription_xact_commit(su.oid, st.relid) AS xact_commit,
+ pg_stat_get_subscription_xact_commit_bytes(su.oid, st.relid) AS xact_commit_bytes,
+ pg_stat_get_subscription_xact_error(su.oid, st.relid) AS xact_error,
+ pg_stat_get_subscription_xact_error_bytes(su.oid, st.relid) AS xact_error_bytes,
+ pg_stat_get_subscription_xact_abort(su.oid, st.relid) AS xact_abort,
+ pg_stat_get_subscription_xact_abort_bytes(su.oid, st.relid) AS xact_abort_bytes,
st.latest_end_time
FROM pg_subscription su
- LEFT JOIN pg_stat_get_subscription(NULL) st
- ON (st.subid = su.oid);
+ LEFT JOIN
+ (SELECT
+ s.subid,
+ s.relid,
+ s.pid,
+ s.received_lsn,
+ s.last_msg_send_time,
+ s.last_msg_receipt_time,
+ s.latest_end_lsn,
+ s.latest_end_time
+ FROM pg_stat_get_subscription(NULL) s
+ UNION -- acquire relid of table sync worker
+ SELECT
+ r.srsubid,
+ r.srrelid,
+ NULL as pid,
+ NULL as received_lsn,
+ NULL as last_msg_send_time,
+ NULL as last_msg_receipt_time,
+ NULL as latest_end_lsn,
+ NULL as latest_end_time
+ FROM pg_subscription_rel r WHERE r.srsubstate <> 'r') st
+ ON (st.subid = su.oid);
CREATE VIEW pg_stat_ssl AS
SELECT
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 5c723bc..107a7aa 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -194,6 +194,17 @@ static void find_matching_subplans_recurse(PartitionPruningData *prunedata,
/*
+ * PartitionTupleRoutingSize - exported to calculate total data size
+ * of logical replication mesage apply, because this is one of the
+ * ApplyExecutionData struct members.
+ */
+size_t
+PartitionTupleRoutingSize(void)
+{
+ return sizeof(PartitionTupleRouting);
+}
+
+/*
* ExecSetupPartitionTupleRouting - sets up information needed during
* tuple routing for partitioned tables, encapsulates it in
* PartitionTupleRouting, and returns it.
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index da4c493..c91ecf3 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -56,6 +56,7 @@
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/walsender.h"
+#include "replication/logicalworker.h"
#include "storage/backendid.h"
#include "storage/dsm.h"
#include "storage/fd.h"
@@ -381,6 +382,7 @@ static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len
static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_success(PgStat_MsgSubscriptionErr *msg, int len);
static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len);
static void pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg,
int len);
@@ -2039,6 +2041,39 @@ pgstat_report_replslot_drop(const char *slotname)
}
/* ----------
+ * pgstat_report_subscription_success() -
+ *
+ * Tell the collector about the subscription success.
+ * ----------
+ */
+void
+pgstat_report_subscription_success(Oid subid, Oid subrel,
+ LogicalRepMsgType command, PgStat_Counter bytes)
+{
+ PgStat_MsgSubscriptionErr msg;
+
+ Assert(command == 0 /* table sync worker */ ||
+ command == LOGICAL_REP_MSG_COMMIT ||
+ command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+ command == LOGICAL_REP_MSG_STREAM_ABORT);
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS);
+ msg.m_subid = subid;
+ msg.m_subrelid = subrel;
+ msg.m_clear = false;
+ msg.m_reset = false;
+ msg.m_relid = InvalidOid;
+ msg.m_command = command;
+ msg.m_xid = InvalidTransactionId;
+ msg.m_bytes = bytes;
+ msg.m_failure_time = GetCurrentTimestamp();
+ msg.m_errmsg[0] = '\0';
+ pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionErr));
+
+ reset_apply_error_context_xact_size();
+}
+
+/* ----------
* pgstat_report_subscription_error() -
*
* Tell the collector about the subscription error.
@@ -2063,10 +2098,13 @@ pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
msg.m_relid = relid;
msg.m_command = command;
msg.m_xid = xid;
+ msg.m_bytes = get_apply_error_context_xact_size();
msg.m_failure_time = GetCurrentTimestamp();
strlcpy(msg.m_errmsg, errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
pgstat_send(&msg, len);
+
+ reset_apply_error_context_xact_size();
}
/* ----------
@@ -3730,6 +3768,10 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_connstat(&msg.msg_conn, len);
break;
+ case PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS:
+ pgstat_recv_subscription_success(&msg.msg_subscriptionerr, len);
+ break;
+
case PGSTAT_MTYPE_SUBSCRIPTIONERR:
pgstat_recv_subscription_error(&msg.msg_subscriptionerr, len);
break;
@@ -6172,6 +6214,45 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
}
/* ----------
+ * pgstat_recv_subscription_success() -
+ *
+ * Process a SUBSCRIPTIONERR message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_success(PgStat_MsgSubscriptionErr *msg, int len)
+{
+ PgStat_StatSubErrEntry *errent;
+ bool create = !(msg->m_reset || msg->m_clear);
+
+ errent = pgstat_get_subscription_error_entry(msg->m_subid,
+ msg->m_subrelid,
+ create);
+
+ /* msg from table sync worker */
+ if (msg->m_command == 0)
+ errent->xact_commit++;
+ else
+ {
+ switch(msg->m_command)
+ {
+ case LOGICAL_REP_MSG_COMMIT:
+ case LOGICAL_REP_MSG_COMMIT_PREPARED:
+ errent->xact_commit++;
+ errent->xact_commit_bytes += msg->m_bytes;
+ break;
+ case LOGICAL_REP_MSG_STREAM_ABORT:
+ errent->xact_abort++;
+ errent->xact_abort_bytes = msg->m_bytes;
+ break;
+ default:
+ elog(ERROR, "unexpected command type");
+ }
+ }
+ errent->last_failure = msg->m_failure_time;
+}
+
+/* ----------
* pgstat_recv_subscription_error() -
*
* Process a SUBSCRIPTIONERR message.
@@ -6232,6 +6313,8 @@ pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len)
/* update the error entry */
errent->command = msg->m_command;
errent->xid = msg->m_xid;
+ errent->xact_error++;
+ errent->xact_error_bytes += msg->m_bytes;
errent->failure_count++;
errent->last_failure = msg->m_failure_time;
strlcpy(errent->last_errmsg, msg->m_errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
@@ -6533,6 +6616,12 @@ pgstat_reset_subscription_error_entry(PgStat_StatSubErrEntry *errent,
{
errent->command = 0;
errent->xid = InvalidTransactionId;
+ errent->xact_commit = 0;
+ errent->xact_commit_bytes = 0;
+ errent->xact_error = 0;
+ errent->xact_error_bytes = 0;
+ errent->xact_abort = 0;
+ errent->xact_abort_bytes = 0;
errent->failure_count = 0;
errent->last_failure = 0;
errent->last_errmsg[0] = '\0';
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b..2e1fc94 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -17,6 +17,7 @@
#include "catalog/pg_type.h"
#include "libpq/pqformat.h"
#include "replication/logicalproto.h"
+#include "replication/logicalworker.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a..7ba7486 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1150,6 +1150,15 @@ copy_table_done:
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
+ * Update the stats for table sync. We don't record the bytes of
+ * table synchronization.
+ */
+ pgstat_report_subscription_success(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ 0, /* no corresponding logical message type */
+ 0);
+
+ /*
* Finally, wait until the main apply worker tells us to catch up and then
* return to let LogicalRepApplyLoop do it.
*/
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b20de59..1cd1d26 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -222,6 +222,20 @@ typedef struct ApplyErrorCallbackArg
LogicalRepMsgType command; /* 0 if invalid */
LogicalRepRelMapEntry *rel;
+ /*
+ * Store data size of this transaction.
+ *
+ * The byte size of transaction on the publisher is calculated
+ * by ReorderBufferChangeSize() based on the ReorderBufferChange
+ * structure. But on the subscriber, consumed resources are
+ * not same as the publisher's decoding process and necessary
+ * to compute those in different way. Then, the exact same byte
+ * size is not restored on the subscriber usually. Further, in
+ * order to give accurate bytes even in the case of error, add
+ * byte size to this value step by step.
+ */
+ PgStat_Counter bytes;
+
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
@@ -232,6 +246,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
{
.command = 0,
.rel = NULL,
+ .bytes = 0,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.ts = 0,
@@ -319,6 +334,7 @@ static void maybe_reread_subscription(void);
/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
+void update_apply_change_size(LogicalRepMsgType action, void *data);
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
@@ -852,6 +868,10 @@ apply_handle_commit(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
+ pgstat_report_subscription_success(MySubscription->oid,
+ InvalidOid,
+ LOGICAL_REP_MSG_COMMIT,
+ apply_error_callback_arg.bytes);
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
@@ -965,6 +985,7 @@ apply_handle_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ update_apply_change_size(LOGICAL_REP_MSG_PREPARE, NULL);
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -1006,6 +1027,12 @@ apply_handle_commit_prepared(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ update_apply_change_size(LOGICAL_REP_MSG_COMMIT_PREPARED, NULL);
+ pgstat_report_subscription_success(MySubscription->oid,
+ InvalidOid,
+ LOGICAL_REP_MSG_COMMIT_PREPARED,
+ apply_error_callback_arg.bytes);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -1193,6 +1220,7 @@ apply_handle_stream_start(StringInfo s)
MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
FileSetInit(MyLogicalRepWorker->stream_fileset);
+ update_apply_change_size(LOGICAL_REP_MSG_STREAM_START, NULL);
MemoryContextSwitchTo(oldctx);
}
@@ -1345,6 +1373,10 @@ apply_handle_stream_abort(StringInfo s)
if (is_skipping_changes())
stop_skipping_changes(InvalidXLogRecPtr, 0);
+ pgstat_report_subscription_success(MySubscription->oid,
+ InvalidOid,
+ LOGICAL_REP_MSG_STREAM_ABORT,
+ apply_error_callback_arg.bytes);
reset_apply_error_context_info();
}
@@ -1494,6 +1526,10 @@ apply_handle_stream_commit(StringInfo s)
stop_skipping_changes(commit_data.end_lsn, commit_data.committime);
store_flush_position(commit_data.end_lsn);
+
+ /* Update the size of change in memory of this transaction */
+ update_apply_change_size(LOGICAL_REP_MSG_STREAM_COMMIT, NULL);
+
in_remote_transaction = false;
}
else
@@ -1646,6 +1682,8 @@ apply_handle_insert(StringInfo s)
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
+ update_apply_change_size(LOGICAL_REP_MSG_INSERT, rel);
+
/* For a partitioned table, insert the tuple into a partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(edata,
@@ -1803,6 +1841,8 @@ apply_handle_update(StringInfo s)
has_oldtup ? &oldtup : &newtup);
MemoryContextSwitchTo(oldctx);
+ update_apply_change_size(LOGICAL_REP_MSG_UPDATE, rel);
+
/* For a partitioned table, apply update to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(edata,
@@ -1937,6 +1977,7 @@ apply_handle_delete(StringInfo s)
slot_store_data(remoteslot, rel, &oldtup);
MemoryContextSwitchTo(oldctx);
+ update_apply_change_size(LOGICAL_REP_MSG_DELETE, rel);
/* For a partitioned table, apply delete to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
apply_handle_tuple_routing(edata,
@@ -2498,6 +2539,133 @@ apply_dispatch(StringInfo s)
apply_error_callback_arg.command = saved_command;
}
+
+/*
+ * Subscriber side implementation equivalent to ReorderBufferChangeSize
+ * of the publisher.
+ *
+ * According to the logical replication message type, record major
+ * resource consumptions of this subscription for each message.
+ * At present, do not collect data from generic functions to keep
+ * code simplicity. Also, by adding multiple values at once,
+ * reduce number of function calls.
+ *
+ * 'data' controls detail handling of data size calculation.
+ */
+void
+update_apply_change_size(LogicalRepMsgType action, void *data)
+{
+ int64 size = 0;
+ LogicalRepRelMapEntry *relmapentry;
+ LogicalRepRelation *reprelation;
+ int *stream_write_len;
+
+ /*
+ * In streaming mode, stream_write_change is called
+ * instead of immediate apply. List up the messages types
+ * that can be caught by handle_streamed_transaction and
+ * treat the write length as the size of transaction so
+ * that we can export it as part of pg_stat_subscription.
+ */
+ if (in_streamed_transaction &&
+ (action == LOGICAL_REP_MSG_INSERT ||
+ action == LOGICAL_REP_MSG_UPDATE ||
+ action == LOGICAL_REP_MSG_DELETE ||
+ action == LOGICAL_REP_MSG_TRUNCATE ||
+ action == LOGICAL_REP_MSG_RELATION ||
+ action == LOGICAL_REP_MSG_TYPE))
+ {
+ stream_write_len = (int *) data;
+ size += *stream_write_len;
+ add_apply_error_context_xact_size(size);
+ return;
+ }
+
+ switch (action)
+ {
+ /* Follow the same order as in the apply_dispatch */
+ case LOGICAL_REP_MSG_BEGIN:
+ case LOGICAL_REP_MSG_COMMIT:
+ break;
+
+ case LOGICAL_REP_MSG_INSERT:
+ case LOGICAL_REP_MSG_UPDATE:
+ case LOGICAL_REP_MSG_DELETE:
+ Assert(data != NULL);
+
+ /*
+ * Compute size based on ApplyExecutionData.
+ * The size of LogicalRepRelMapEntry can be skipped because
+ * it is obtained from hash_search in logicalrep_rel_open.
+ */
+ size += sizeof(ApplyExecutionData) + sizeof(EState) +
+ sizeof(ResultRelInfo) + sizeof(ResultRelInfo);
+
+ /*
+ * Add some extra size if the target relation is partitioned.
+ * PartitionTupleRouting isn't exported. Therefore, call the
+ * function that returns its size instead.
+ */
+ relmapentry = (LogicalRepRelMapEntry *) data;
+ if (relmapentry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ size += sizeof(ModifyTableState) + PartitionTupleRoutingSize();
+ break;
+
+ case LOGICAL_REP_MSG_TRUNCATE:
+ /* No special consumption except for generic functions */
+ break;
+
+ case LOGICAL_REP_MSG_RELATION:
+ Assert(data != NULL);
+
+ reprelation = (LogicalRepRelation *) data;
+ /* See logicalrep_read_attrs for the last two */
+ size += sizeof(LogicalRepRelation) +
+ reprelation->natts * sizeof(char *) +
+ reprelation->natts * sizeof(Oid);
+ break;
+ case LOGICAL_REP_MSG_TYPE:
+ case LOGICAL_REP_MSG_ORIGIN:
+ case LOGICAL_REP_MSG_MESSAGE:
+ break;
+
+ case LOGICAL_REP_MSG_STREAM_START:
+ size += sizeof(FileSet);
+ break;
+
+ case LOGICAL_REP_MSG_STREAM_STOP:
+ case LOGICAL_REP_MSG_STREAM_ABORT:
+ break;
+
+ case LOGICAL_REP_MSG_STREAM_COMMIT:
+ size += sizeof(FlushPosition);
+ break;
+
+ case LOGICAL_REP_MSG_BEGIN_PREPARE:
+ case LOGICAL_REP_MSG_PREPARE:
+ break;
+
+ case LOGICAL_REP_MSG_COMMIT_PREPARED:
+ size += sizeof(FlushPosition);
+ break;
+
+ case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+ break;
+
+ case LOGICAL_REP_MSG_STREAM_PREPARE:
+ size += sizeof(FlushPosition);
+ break;
+
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid logical replication message type \"%c\"", action)));
+ }
+ /* update the total size of consumption */
+ add_apply_error_context_xact_size(size);
+}
+
+
/*
* Figure out which write/flush positions to report to the walsender process.
*
@@ -3352,6 +3520,7 @@ static void
stream_write_change(char action, StringInfo s)
{
int len;
+ int total_len;
Assert(in_streamed_transaction);
Assert(TransactionIdIsValid(stream_xid));
@@ -3370,6 +3539,9 @@ stream_write_change(char action, StringInfo s)
len = (s->len - s->cursor);
BufFileWrite(stream_fd, &s->data[s->cursor], len);
+
+ total_len = (s->len - s->cursor) * 2 + sizeof(char) + sizeof(action);
+ update_apply_change_size(action, &total_len);
}
/*
@@ -3734,6 +3906,27 @@ set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
apply_error_callback_arg.ts = ts;
}
+/* Exported so that stats collector can utilize this value */
+int64
+get_apply_error_context_xact_size(void)
+{
+ return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply error callback bytes */
+void
+add_apply_error_context_xact_size(int64 size)
+{
+ apply_error_callback_arg.bytes += size;
+}
+
+/* Reset bytes information of apply error callback */
+void
+reset_apply_error_context_xact_size(void)
+{
+ apply_error_callback_arg.bytes = 0;
+}
+
/* Reset all information of apply error callback */
static inline void
reset_apply_error_context_info(void)
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 51f693c..540de86 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2491,3 +2491,136 @@ pg_stat_get_subscription_error(PG_FUNCTION_ARGS)
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
+
+Datum
+pg_stat_get_subscription_xact_commit(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid;
+ int64 result;
+ PgStat_StatSubErrEntry *errent;
+
+ if (PG_ARGISNULL(1))
+ relid = InvalidOid;
+ else
+ relid = PG_GETARG_OID(1);
+
+ /* Get subscription error entry */
+ if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+ result = 0;
+ else
+ result = (int64) (errent->xact_commit);
+
+ PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_commit_bytes(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid;
+ int64 result;
+ PgStat_StatSubErrEntry *errent;
+
+ if (PG_ARGISNULL(1))
+ relid = InvalidOid;
+ else
+ relid = PG_GETARG_OID(1);
+
+ /* Get subscription error entry */
+ if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+ result = 0;
+ else
+ result = (int64) (errent->xact_commit_bytes);
+
+ PG_RETURN_INT64(result);
+}
+
+
+Datum
+pg_stat_get_subscription_xact_error(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid;
+ int64 result;
+ PgStat_StatSubErrEntry *errent;
+
+ if (PG_ARGISNULL(1))
+ relid = InvalidOid;
+ else
+ relid = PG_GETARG_OID(1);
+
+ /* Get subscription error entry */
+ if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+ result = 0;
+ else
+ result = (int64) (errent->xact_error);
+
+ PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_error_bytes(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid;
+ int64 result;
+ PgStat_StatSubErrEntry *errent;
+
+ if (PG_ARGISNULL(1))
+ relid = InvalidOid;
+ else
+ relid = PG_GETARG_OID(1);
+
+ /* Get subscription error entry */
+ if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+ result = 0;
+ else
+ result = (int64) (errent->xact_error_bytes);
+
+ PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_abort(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid;
+ int64 result;
+ PgStat_StatSubErrEntry *errent;
+
+ if (PG_ARGISNULL(1))
+ relid = InvalidOid;
+ else
+ relid = PG_GETARG_OID(1);
+
+ /* Get subscription error entry */
+ if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+ result = 0;
+ else
+ result = (int64) (errent->xact_abort);
+
+ PG_RETURN_INT64(result);
+}
+
+Datum
+pg_stat_get_subscription_xact_abort_bytes(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid;
+ int64 result;
+ PgStat_StatSubErrEntry *errent;
+
+ if (PG_ARGISNULL(1))
+ relid = InvalidOid;
+ else
+ relid = PG_GETARG_OID(1);
+
+ /* Get subscription error entry */
+ if ((errent = pgstat_fetch_subscription_error(subid, relid)) == NULL)
+ result = 0;
+ else
+ result = (int64) (errent->xact_abort_bytes);
+
+ PG_RETURN_INT64(result);
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ac02061..83969a5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5393,6 +5393,30 @@
proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
proargnames => '{subid,relid,subid,relid,command,xid,failure_source,failure_count,last_failure,last_failure_message,stats_reset}',
prosrc => 'pg_stat_get_subscription_error' },
+{ oid => '8525', descr => 'statistics: number of transactions commit for a subscription',
+ proname => 'pg_stat_get_subscription_xact_commit', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_get_subscription_xact_commit' },
+{ oid => '8526', descr => 'statistics: bytes of transactions commit for a subscription',
+ proname => 'pg_stat_get_subscription_xact_commit_bytes', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_get_subscription_xact_commit_bytes' },
+{ oid => '8527', descr => 'statistics: number of transactions error for a subscription',
+ proname => 'pg_stat_get_subscription_xact_error', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_get_subscription_xact_error' },
+{ oid => '8528', descr => 'statistics: bytes of transactions error for a subscription',
+ proname => 'pg_stat_get_subscription_xact_error_bytes', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_get_subscription_xact_error_bytes' },
+{ oid => '8529', descr => 'statistics: number of transactions abort for a subscription',
+ proname => 'pg_stat_get_subscription_xact_abort', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_get_subscription_xact_abort' },
+{ oid => '8530', descr => 'statistics: bytes of transactions abort for a subscription',
+ proname => 'pg_stat_get_subscription_xact_abort_bytes', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'int8', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_get_subscription_xact_abort_bytes' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 694e38b..773e46c 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -110,6 +110,7 @@ typedef struct PartitionPruneState
PartitionPruningData *partprunedata[FLEXIBLE_ARRAY_MEMBER];
} PartitionPruneState;
+extern size_t PartitionTupleRoutingSize(void);
extern PartitionTupleRouting *ExecSetupPartitionTupleRouting(EState *estate,
Relation rel);
extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5ed1319..6109933 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -67,6 +67,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RESETSINGLECOUNTER,
PGSTAT_MTYPE_RESETSLRUCOUNTER,
PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+ PGSTAT_MTYPE_SUBSCRIPTIONSUCCESS,
PGSTAT_MTYPE_SUBSCRIPTIONERR,
PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE,
PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
@@ -564,6 +565,7 @@ typedef struct PgStat_MsgSubscriptionErr
Oid m_relid;
LogicalRepMsgType m_command;
TransactionId m_xid;
+ PgStat_Counter m_bytes;
TimestampTz m_failure_time;
char m_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
} PgStat_MsgSubscriptionErr;
@@ -990,13 +992,28 @@ typedef struct PgStat_StatReplSlotEntry
* Subscription error statistics kept in the stats collector, representing
* an error that occurred during application of logical replicatoin or
* initial table synchronization.
+ *
+ * Other general stats for transaction on subscription are stored in this entry
+ * as well, aligned with transaction error stats. Holding the size of computed
+ * result of bytes during replay depends on the apply error callback not to be
+ * lost and having general stats in the same place reduces code complexity
+ * and redundancy.
*/
typedef struct PgStat_StatSubErrEntry
{
Oid relid; /* hash table key */
LogicalRepMsgType command;
TransactionId xid;
- PgStat_Counter failure_count;
+
+ /* transaction stats of subscription */
+ PgStat_Counter xact_commit;
+ PgStat_Counter xact_commit_bytes;
+ PgStat_Counter xact_error;
+ PgStat_Counter xact_error_bytes; /* total error counts of this subscription */
+ PgStat_Counter xact_abort;
+ PgStat_Counter xact_abort_bytes;
+ PgStat_Counter failure_count; /* total error counts of one specific error
+ continuously happening */
TimestampTz last_failure;
char last_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
TimestampTz stat_reset_timestamp;
@@ -1129,6 +1146,8 @@ extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subscription_success(Oid subid, Oid subrel,
+ LogicalRepMsgType command, PgStat_Counter bytes);
extern void pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
LogicalRepMsgType command,
TransactionId xid, const char *errmsg);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..923c8ce 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -15,5 +15,9 @@
extern void ApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
+extern int64 get_apply_error_context_xact_size(void);
+extern void add_apply_error_context_xact_size(int64 size);
+extern void reset_apply_error_context_xact_size(void);
+
#endif /* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 3719b8a..7a323c6 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2091,9 +2091,34 @@ pg_stat_subscription| SELECT su.oid AS subid,
st.last_msg_send_time,
st.last_msg_receipt_time,
st.latest_end_lsn,
+ pg_stat_get_subscription_xact_commit(su.oid, st.relid) AS xact_commit,
+ pg_stat_get_subscription_xact_commit_bytes(su.oid, st.relid) AS xact_commit_bytes,
+ pg_stat_get_subscription_xact_error(su.oid, st.relid) AS xact_error,
+ pg_stat_get_subscription_xact_error_bytes(su.oid, st.relid) AS xact_error_bytes,
+ pg_stat_get_subscription_xact_abort(su.oid, st.relid) AS xact_abort,
+ pg_stat_get_subscription_xact_abort_bytes(su.oid, st.relid) AS xact_abort_bytes,
st.latest_end_time
FROM (pg_subscription su
- LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+ LEFT JOIN ( SELECT s.subid,
+ s.relid,
+ s.pid,
+ s.received_lsn,
+ s.last_msg_send_time,
+ s.last_msg_receipt_time,
+ s.latest_end_lsn,
+ s.latest_end_time
+ FROM pg_stat_get_subscription(NULL::oid) s(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time)
+ UNION
+ SELECT r.srsubid,
+ r.srrelid,
+ NULL::integer AS pid,
+ NULL::pg_lsn AS received_lsn,
+ NULL::timestamp with time zone AS last_msg_send_time,
+ NULL::timestamp with time zone AS last_msg_receipt_time,
+ NULL::pg_lsn AS latest_end_lsn,
+ NULL::timestamp with time zone AS latest_end_time
+ FROM pg_subscription_rel r
+ WHERE (r.srsubstate <> 'r'::"char")) st ON ((st.subid = su.oid)));
pg_stat_subscription_errors| SELECT d.datname,
sr.subid,
s.subname,