diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 52ac1183e44..8cf90eda787 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -526,12 +526,11 @@ CREATE SUBSCRIPTION subscription_namemax_retention_duration (integer) - Maximum duration for which this subscription's apply worker is allowed - to retain the information useful for conflict detection when - retain_dead_tuples is enabled for the associated - subscriptions. The default value is 0, indicating - that the information is retained until it is no longer needed for - detection purposes. This value is taken as milliseconds. + Maximum duration in milliseconds for which this subscription's apply worker + is allowed to retain the information useful for conflict detection when + retain_dead_tuples is enabled. The default value + is 0, indicating that the information is retained + until it is no longer needed for detection purposes. The information useful for conflict detection is no longer retained if @@ -541,12 +540,13 @@ CREATE SUBSCRIPTION subscription_namemax_retention_duration set within the corresponding subscription. To re-enable retention manually, you can disable retain_dead_tuples for all subscriptions and - re-enable it after confirming this replication slot has been dropped. + re-enable it after confirming that the replication slot + pg_conflict_detection has been dropped. - Note that overall retention will not stop if other subscriptions - specify a greater value and have not exceeded it, or if they set this - option to 0. + Note that overall retention will not stop if other subscriptions that + have a value greater than 0 for this parameter have not exceeded it, + or if they set this option to 0. 
This option is effective only when diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fbd3d1900f4..c77fa0234bb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1389,7 +1389,7 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subretaindeadtuples, submaxretention, subretentionactive, + subretaindeadtuples, submaxretention, subretentionactive, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f9d92702434..aeb0c413067 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -662,8 +662,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } /* - * Ensure that the configurations for retain_dead_tuples and - * max_retention_duration is appropriate. + * Ensure that system configuration parameters are set appropriately to + * support retain_dead_tuples and max_retention_duration. */ CheckSubDeadTupleRetention(true, !opts.enabled, WARNING, opts.retaindeadtuples, opts.retaindeadtuples, @@ -1490,8 +1490,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Ensure that the configurations for retain_dead_tuples and - * max_retention_duration is appropriate. + * Ensure that system configuration parameters are set appropriately to + * support retain_dead_tuples and max_retention_duration. 
*/ if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) || IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION)) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index c0211867881..969d48a1d5d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -23,7 +23,6 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/pg_subscription.h" -#include "catalog/pg_subscription_d.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" #include "lib/dshash.h" @@ -1258,7 +1257,7 @@ ApplyLauncherMain(Datum main_arg) /* * Compute the minimum xmin required to protect dead tuples * required for conflict detection among all running apply - * workers that enables retain_dead_tuples. + * workers. */ if (sub->retaindeadtuples && sub->retentionactive && @@ -1328,11 +1327,10 @@ ApplyLauncherMain(Datum main_arg) * advance the slot's xmin to protect dead tuples required for the * conflict detection. * - * However, if all apply workers for subscriptions with - * retain_dead_tuples enabled have requested to cease retention, - * marking it as inactive, the new xmin will be set to - * InvalidTransactionId. We then update slot.xmin accordingly to - * permit the removal of dead tuples. + * Additionally, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to stop retention, + * the slot's xmin will be set to InvalidTransactionId allowing the + * removal of dead tuples. */ if (MyReplicationSlot) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d378eb08c71..df16cbe3716 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,6 +173,14 @@ * Advance the non-removable transaction ID if the current flush location has * reached or surpassed the last received WAL position. 
* + * - RDT_STOP_CONFLICT_INFO_RETENTION: + * This phase is required only when max_retention_duration is defined. We + * enter this phase if the wait time in either the + * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds + * the configured max_retention_duration. In this phase, + * pg_subscription.subretentionactive is updated to false within a new + * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. + * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> @@ -190,16 +198,6 @@ * update_deleted is necessary, as the UPDATEs in remote transactions should be * ignored if their timestamp is earlier than that of the dead tuples. * - * If max_retention_duration is defined, one additional phase is - * involved: - * - * - RDT_STOP_CONFLICT_INFO_RETENTION: - * This phase is triggered when the wait time in either the - * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds - * max_retention_duration. During this phase, - * pg_subscription.subretentionactive is updated to false within a new - * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. - * * Note that advancing the non-removable transaction ID is not supported if the * publisher is also a physical standby. 
This is because the logical walsender * on the standby can only get the WAL replay position but there may be more @@ -569,9 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData* rdt_data); static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); -static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3247,16 +3245,16 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, /* * For conflict detection, we use the leader worker's * oldest_nonremovable_xid value instead of invoking - * GetOldestNonRemovableTransactionId() or relying on the conflict - * detection slot's xmin. The oldest_nonremovable_xid acts as a threshold - * to identify tuples that were recently deleted. These tuples are not - * visible to concurrent transactions, but we log an update_deleted - * conflict if such a tuple matches the remote update being applied. + * GetOldestNonRemovableTransactionId() or using the conflict detection + * slot's xmin. The oldest_nonremovable_xid acts as a threshold to identify + * tuples that were recently deleted. These deleted tuples are no longer + * visible to concurrent transactions. However, if a remote update matches + * such a tuple, we log an update_deleted conflict. * - * Although GetOldestNonRemovableTransactionId() and slot.xmin can return - * a value older than the oldest_nonremovable_xid, for our current purpose - * it is acceptable to treat tuples deleted by transactions prior to - * oldest_nonremovable_xid as update_missing conflicts. 
+ * While GetOldestNonRemovableTransactionId() and slot.xmin may return + * transaction IDs older than oldest_nonremovable_xid, for our current + * purpose, it is acceptable to treat tuples deleted by transactions prior + * to oldest_nonremovable_xid as update_missing conflicts. */ if (am_leader_apply_worker()) { @@ -4669,6 +4667,55 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) process_rdt_phase_transition(rdt_data, false); } +/* + * Check whether conflict information retention should be stopped due to + * exceeding the maximum wait time (max_retention_duration). + * + * If retention should be stopped, transition to the + * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return + * false. + * + * Note: Retention won't be resumed automatically. The user must manually + * disable retain_dead_tuples and re-enable it after confirming that the + * replication slot maintained by the launcher has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData* rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!MySubscription->maxretention) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return early if the wait time has not exceeded the configured maximum + * (max_retention_duration). Time spent waiting for table synchronization + * is excluded from this calculation, as it occurs infrequently. 
+ */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + MySubscription->maxretention + + rdt_data->table_sync_wait_time)) + return false; + + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); + + return true; +} + /* * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase. */ @@ -4732,67 +4779,22 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time = 0; } -/* - * Check whether conflict information retention should be stopped due to - * exceeding the maximum wait time (max_retention_duration). - * - * If retention should be stopped, transition to the - * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return - * false. - * - * Note: Retention won't be resumed automatically. The user must manually - * disable retain_dead_tuples and re-enable it after confirming that the - * replication slot maintained by the launcher has been dropped. - */ -static bool -should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) -{ - TimestampTz now; - - Assert(TransactionIdIsValid(rdt_data->candidate_xid)); - Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || - rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); - - if (!MySubscription->maxretention) - return false; - - /* - * Use last_recv_time when applying changes in the loop to avoid - * unnecessary system time retrieval. If last_recv_time is not available, - * obtain the current timestamp. - */ - now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); - - /* - * Return early if the wait time has not exceeded the configured maximum - * (max_retention_duration). Time spent waiting for table synchronization - * is excluded from this calculation, as it occurs infrequently. 
- */ - if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, - MySubscription->maxretention + - rdt_data->table_sync_wait_time)) - return false; - - rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; - - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); - - return true; -} - /* * Adjust the interval for advancing non-removable transaction IDs. * - * We double the interval to try advancing the non-removable transaction IDs if - * there is no activity on the node. The maximum value of the interval is capped - * by wal_receiver_status_interval if it is not zero, otherwise to a 3 minutes - * which should be sufficient to avoid using CPU or network resources without - * much benefit. However, this maximum interval will not exceed - * max_retention_duration. + * If there is no activity on the node, we progressively double the interval + * used to advance the non-removable transaction ID. This helps conserve CPU + * and network resources when there's little benefit to frequent updates. + * + * The interval is capped by the lowest of the following: + * - wal_receiver_status_interval (if set), + * - a default maximum of 3 minutes, + * - max_retention_duration. * - * The interval is reset to the lesser of 100ms and - * max_retention_duration once there is some activity on the node. + * This ensures the interval never exceeds the retention boundary, even if + * other limits are higher. Once activity resumes on the node, the interval + * is reset to the lesser of 100ms and max_retention_duration, allowing timely + * advancement of the non-removable transaction ID. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises.