diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42..3c50f17227 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -275,6 +275,47 @@ retry:
 	return found;
 }
 
+/*
+ * Prefetch the index and heap pages needed to locate in 'rel' the tuple
+ * matching 'searchslot' via the index 'idxoid'.  Returns true if it is found.
+ */
+bool
+RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+                      TupleTableSlot *outslot)
+{
+    ScanKeyData skey[INDEX_MAX_KEYS];
+    int         skey_attoff;
+    IndexScanDesc scan;
+    SnapshotData snap;
+    Relation    idxrel;
+    bool        found;
+
+    /* Do not prefetch when there is no index. */
+    if (!OidIsValid(idxoid))
+        return false;
+
+    /* Open the index. */
+    idxrel = index_open(idxoid, AccessShareLock);
+
+    InitDirtySnapshot(snap);
+
+    /* Build scan key. */
+    skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+    /* Start an index scan. */
+    scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+    index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+    /* Try to find the tuple. */
+    found = index_getnext_slot(scan, ForwardScanDirection, outslot);
+
+    /* Cleanup. */
+    index_endscan(scan);
+    index_close(idxrel, AccessShareLock);
+
+    return found;
+}
+
 /*
  * Compare the tuples in the slots by checking if they have equal values.
  */
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d351..a207a4acdc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -396,6 +396,57 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	return true;
 }
 
+/*
+ * Launch a new parallel prefetch worker.
+ */
+ParallelApplyWorkerInfo *
+pa_launch_prefetch_worker(void)
+{
+    MemoryContext oldcontext;
+    bool        launched;
+    ParallelApplyWorkerInfo *winfo;
+
+    /*
+     * Start a new parallel prefetch worker.
+     *
+     * The worker info can be used for the lifetime of the worker process, so
+     * create it in a permanent context.
+     */
+    oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+    winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
+
+    /* Setup shared memory. */
+    if (!pa_setup_dsm(winfo))
+    {
+        MemoryContextSwitchTo(oldcontext);
+        pfree(winfo);
+        return NULL;
+    }
+
+    launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_PREFETCH,
+                                        MyLogicalRepWorker->dbid,
+                                        MySubscription->oid,
+                                        MySubscription->name,
+                                        MyLogicalRepWorker->userid,
+                                        InvalidOid,
+                                        dsm_segment_handle(winfo->dsm_seg));
+
+    if (launched)
+    {
+        winfo->do_prefetch = true;
+    }
+    else
+    {
+        pa_free_worker_info(winfo);
+        winfo = NULL;
+    }
+
+    MemoryContextSwitchTo(oldcontext);
+
+    return winfo;
+}
+
 /*
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
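For context, the leader apply worker is expected to call pa_launch_prefetch_worker() at startup, building a small pool of prefetch workers and running with however many could actually be started. The sketch below is illustrative only; it condenses the LogicalRepApplyLoop() changes later in this patch, and the helper name launch_prefetch_pool is made up (the array and counter come from worker.c):

/* Illustrative sketch, not part of the patch: condensed from LogicalRepApplyLoop(). */
static void
launch_prefetch_pool(void)
{
    int         i;

    for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
    {
        prefetch_worker[i] = pa_launch_prefetch_worker();
        if (!prefetch_worker[i])
            break;              /* proceed with the workers that did start */
    }
    n_prefetch_workers = i;
}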
@@ -943,20 +994,22 @@ ParallelApplyWorkerMain(Datum main_arg)
 
 	InitializingApplyWorker = false;
 
-	/* Setup replication origin tracking. */
-	StartTransactionCommand();
-	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+	if (am_parallel_apply_worker())
+	{
+		/* Setup replication origin tracking. */
+		StartTransactionCommand();
+		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
 										   originname, sizeof(originname));
-	originid = replorigin_by_name(originname, false);
-
-	/*
-	 * The parallel apply worker doesn't need to monopolize this replication
-	 * origin which was already acquired by its leader process.
-	 */
-	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-	replorigin_session_origin = originid;
-	CommitTransactionCommand();
+		originid = replorigin_by_name(originname, false);
+		/*
+		 * The parallel apply worker doesn't need to monopolize this replication
+		 * origin which was already acquired by its leader process.
+		 */
+		replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+		replorigin_session_origin = originid;
+		CommitTransactionCommand();
+	}
 
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
@@ -1149,8 +1202,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
-	Assert(!winfo->serialize_changes);
+	if (!winfo->do_prefetch)
+	{
+		Assert(!IsTransactionState());
+		Assert(!winfo->serialize_changes);
+	}
 
 	/*
 	 * We don't try to send data to parallel worker for 'immediate' mode. This
@@ -1519,6 +1575,9 @@ pa_get_fileset_state(void)
 {
 	PartialFileSetState fileset_state;
 
+	if (am_parallel_prefetch_worker())
+		return FS_EMPTY;
+
 	Assert(am_parallel_apply_worker());
 
 	SpinLockAcquire(&MyParallelShared->mutex);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfceb..ed6c057cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
+int			max_parallel_prefetch_workers_per_subscription = 2;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -257,7 +258,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		/* Skip parallel apply workers. */
-		if (isParallelApplyWorker(w))
+		/* Skip parallel apply and prefetch workers. */
+		if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
 			continue;
 
 		if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -322,6 +323,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	TimestampTz now;
 	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
 	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+	bool		is_parallel_prefetch_worker = (wtype == WORKERTYPE_PARALLEL_PREFETCH);
 
 	/*----------
 	 * Sanity checks:
@@ -331,7 +333,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	 */
 	Assert(wtype != WORKERTYPE_UNKNOWN);
 	Assert(is_tablesync_worker == OidIsValid(relid));
-	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+	Assert((is_parallel_apply_worker || is_parallel_prefetch_worker) == (subworker_dsm != DSM_HANDLE_INVALID));
 
 	ereport(DEBUG1,
 			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -452,8 +454,8 @@ retry:
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
-	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
-	worker->parallel_apply = is_parallel_apply_worker;
+	worker->leader_pid = (is_parallel_apply_worker || is_parallel_prefetch_worker) ? MyProcPid : InvalidPid;
+	worker->parallel_apply = is_parallel_apply_worker || is_parallel_prefetch_worker;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -492,6 +494,16 @@ retry:
 			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
 			break;
 
+		case WORKERTYPE_PARALLEL_PREFETCH:
+			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+			snprintf(bgw.bgw_name, BGW_MAXLEN,
+					 "logical replication parallel prefetch worker for subscription %u",
+					 subid);
+			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+			break;
+
 		case WORKERTYPE_TABLESYNC:
 			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
 			snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -626,7 +638,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 	if (worker)
 	{
-		Assert(!isParallelApplyWorker(worker));
+		Assert(!isParallelApplyWorker(worker) && !isParallelPrefetchWorker(worker));
 
 		logicalrep_worker_stop_internal(worker, SIGTERM);
 	}
@@ -774,7 +786,7 @@ logicalrep_worker_detach(void)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-			if (isParallelApplyWorker(w))
+			if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
 				logicalrep_worker_stop_internal(w, SIGTERM);
 		}
 
@@ -1369,6 +1381,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 			case WORKERTYPE_PARALLEL_APPLY:
 				values[9] = CStringGetTextDatum("parallel apply");
 				break;
+			case WORKERTYPE_PARALLEL_PREFETCH:
+				values[9] = CStringGetTextDatum("parallel prefetch");
+				break;
 			case WORKERTYPE_TABLESYNC:
 				values[9] = CStringGetTextDatum("table synchronization");
 				break;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c90f23ee5b..f965317529 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -681,6 +681,9 @@ process_syncing_tables(XLogRecPtr current_lsn)
 			 */
 			break;
 
+		case WORKERTYPE_PARALLEL_PREFETCH:
+			break;
+
 		case WORKERTYPE_TABLESYNC:
 			process_syncing_tables_for_sync(current_lsn);
 			break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44..db1f8bcebd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool		InitializingApplyWorker = false;
 
+#define INIT_PREFETCH_BUF_SIZE (128 * 1024)
+static ParallelApplyWorkerInfo *prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int	prefetch_worker_rr = 0;
+static int	n_prefetch_workers;
+
+bool		prefetch_replica_identity_only = false;
+
+size_t		lr_prefetch_hits;
+size_t		lr_prefetch_misses;
+size_t		lr_prefetch_errors;
+size_t		lr_prefetch_inserts;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -482,6 +494,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 					(rel->state == SUBREL_STATE_SYNCDONE &&
 					 rel->statelsn <= remote_final_lsn));
 
+		case WORKERTYPE_PARALLEL_PREFETCH:
+			return true;
+
 		case WORKERTYPE_UNKNOWN:
 			/* Should never happen. */
 			elog(ERROR, "Unknown worker type");
 	}
@@ -556,6 +571,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
+	if (am_parallel_prefetch_worker())
+	{
+		return false;
+	}
+
 	apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
 	/* not in streaming mode */
@@ -2487,13 +2507,36 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
 		   !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
 		   RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
 
-	/* Caller will not have done this bit. */
-	Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-	InitConflictIndexes(relinfo);
+	if (am_parallel_prefetch_worker())
+	{
+		Relation	localrel = relinfo->ri_RelationDesc;
+		TupleTableSlot *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+		LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+		if (prefetch_replica_identity_only)
+		{
+			(void) RelationPrefetchIndex(localrel, relmapentry->localindexoid, remoteslot, localslot);
+		}
+		else
+		{
+			for (int i = 0; i < relinfo->ri_NumIndices; i++)
+			{
+				Oid			sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+				(void) RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+			}
+		}
+		lr_prefetch_inserts += 1;
+	}
+	else
+	{
+		/* Caller will not have done this bit. */
+		Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+		InitConflictIndexes(relinfo);
 
-	/* Do the insert. */
-	TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-	ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+		/* Do the insert. */
+		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+		ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+	}
 }
 
 /*
@@ -2677,6 +2720,32 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
 	ExecOpenIndices(relinfo, false);
 
+	if (am_parallel_prefetch_worker())
+	{
+		/*
+		 * While it may be reasonable to prefetch indexes for both the old and
+		 * new tuples, we do it only for one of them (the old if it exists,
+		 * the new otherwise), assuming the index key rarely changes.
+		 */
+		localslot = table_slot_create(localrel, &estate->es_tupleTable);
+		found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		if (!prefetch_replica_identity_only)
+		{
+			for (int i = 0; i < relinfo->ri_NumIndices; i++)
+			{
+				Oid			sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+				if (sec_index_oid != localindexoid)
+				{
+					(void) RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+				}
+			}
+		}
+		goto Cleanup;
+	}
 	found = FindReplTupleInLocalRel(edata, localrel,
 									&relmapentry->remoterel,
 									localindexoid,
@@ -2739,7 +2808,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 							  remoteslot, newslot, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+Cleanup:
 	ExecCloseIndices(relinfo);
 	EvalPlanQualEnd(&epqstate);
 }
@@ -2864,6 +2933,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		   !localrel->rd_rel->relhasindex ||
 		   RelationGetIndexList(localrel) == NIL);
 
+	if (am_parallel_prefetch_worker())
+	{
+		localslot = table_slot_create(localrel, &estate->es_tupleTable);
+		found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+		if (found)
+			lr_prefetch_hits += 1;
+		else
+			lr_prefetch_misses += 1;
+		/* No need to prefetch other indexes because they are not touched during delete. */
+		goto Cleanup;
+	}
 	found = FindReplTupleInLocalRel(edata, localrel, remoterel,
 									localindexoid,
 									remoteslot, &localslot);
@@ -2900,7 +2980,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 							  remoteslot, NULL, list_make1(&conflicttuple));
 	}
 
-	/* Cleanup. */
+Cleanup:
 	EvalPlanQualEnd(&epqstate);
 }
 
@@ -3567,6 +3647,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 	}
 }
 
+#define MSG_CODE_OFFSET (1 + 8 * 3)
+
+static void
+lr_do_prefetch(char *buf, int len)
+{
+	ParallelApplyWorkerInfo *winfo;
+
+	if (buf[0] != 'w')
+		return;
+
+	switch (buf[MSG_CODE_OFFSET])
+	{
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			/* Round-robin row changes across prefetch workers */
+			winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+			pa_send_data(winfo, len, buf);
+			break;
+
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_RELATION:
+			/* Broadcast to all prefetch workers */
+			for (int i = 0; i < n_prefetch_workers; i++)
+			{
+				winfo = prefetch_worker[i];
+				pa_send_data(winfo, len, buf);
+			}
+			break;

+		default:
+			/* Ignore other messages */
+			break;
+	}
+}
+
 /*
  * Apply main loop.
  */
@@ -3577,6 +3693,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	char	   *prefetch_buf = NULL;
+	size_t		prefetch_buf_pos = 0;
+	size_t		prefetch_buf_used = 0;
+	size_t		prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3714,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 												"LogicalStreamingContext",
 												ALLOCSET_DEFAULT_SIZES);
 
+	if (max_parallel_prefetch_workers_per_subscription != 0)
+	{
+		int			i;
+		for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+		{
+			prefetch_worker[i] = pa_launch_prefetch_worker();
+			if (!prefetch_worker[i])
+			{
+				elog(LOG, "launched only %d of %d prefetch workers",
					 i, max_parallel_prefetch_workers_per_subscription);
+				break;
+			}
+		}
+		n_prefetch_workers = i;
+		prefetch_buf = palloc(prefetch_buf_size);
+	}
+
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -3611,9 +3748,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
 		int			rc;
-		int			len;
-		char	   *buf = NULL;
+		int32		len;
+		char	   *buf = NULL;
 		bool		endofstream = false;
+		bool		no_more_data = false;
 		long		wait_time;
 
 		CHECK_FOR_INTERRUPTS();
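The MSG_CODE_OFFSET used by lr_do_prefetch() relies on the layout of the streaming copy-data ('w') message that the loop below already parses: a one-byte 'w' tag, then three 64-bit integers (start LSN, end LSN, send timestamp), after which the logical replication payload begins with its action byte. A hedged sketch using the same pqformat calls as the loop (the helper name is hypothetical):

/* Illustrative only: why the action byte sits at offset 1 + 8 * 3. */
static char
peek_logicalrep_action(StringInfo s)
{
    char        msgtype = pq_getmsgbyte(s);     /* 'w' for copy data */
    XLogRecPtr  start_lsn = pq_getmsgint64(s);
    XLogRecPtr  end_lsn = pq_getmsgint64(s);
    TimestampTz send_time = pq_getmsgint64(s);

    Assert(msgtype == 'w');
    (void) start_lsn;
    (void) end_lsn;
    (void) send_time;

    /* Next byte is LOGICAL_REP_MSG_INSERT / UPDATE / DELETE / ... */
    return pq_getmsgbyte(s);
}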
@@ -3622,87 +3760,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
-		if (len != 0)
+		/* Loop to process all available data (without blocking). */
+		for (;;)
 		{
-			/* Loop to process all available data (without blocking). */
-			for (;;)
-			{
-				CHECK_FOR_INTERRUPTS();
+			CHECK_FOR_INTERRUPTS();
 
-				if (len == 0)
+			if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+			{
+				prefetch_buf_used = 0;
+				do
 				{
-					break;
-				}
-				else if (len < 0)
+					if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+					{
+						prefetch_buf_size *= 2;
+						elog(DEBUG1, "increasing prefetch buffer size to %zu", prefetch_buf_size);
+						prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+					}
+					memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+					memcpy(&prefetch_buf[prefetch_buf_used + 4], buf, len);
+					prefetch_buf_used += 4 + len;
+					if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+						break;
+					len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				} while (len > 0);
+
+				no_more_data = len <= 0;
+
+				for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
 				{
-					ereport(LOG,
-							(errmsg("data stream from publisher has ended")));
-					endofstream = true;
-					break;
+					memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+					lr_do_prefetch(&prefetch_buf[prefetch_buf_pos + 4], len);
 				}
-				else
-				{
-					int			c;
-					StringInfoData s;
+				memcpy(&len, prefetch_buf, 4);
+				buf = &prefetch_buf[4];
+				prefetch_buf_pos = len + 4;
+			}
 
-					if (ConfigReloadPending)
-					{
-						ConfigReloadPending = false;
-						ProcessConfigFile(PGC_SIGHUP);
-					}
+			if (len == 0)
+			{
+				break;
+			}
+			else if (len < 0)
+			{
+				ereport(LOG,
+						(errmsg("data stream from publisher has ended")));
+				endofstream = true;
+				break;
+			}
+			else
+			{
+				int			c;
+				StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+				if (ConfigReloadPending)
+				{
+					ConfigReloadPending = false;
+					ProcessConfigFile(PGC_SIGHUP);
+				}
 
-					/* Ensure we are reading the data into our memory context. */
-					MemoryContextSwitchTo(ApplyMessageContext);
+				/* Reset timeout. */
+				last_recv_timestamp = GetCurrentTimestamp();
+				ping_sent = false;
 
-					initReadOnlyStringInfo(&s, buf, len);
+				/* Ensure we are reading the data into our memory context. */
+				MemoryContextSwitchTo(ApplyMessageContext);
 
-					c = pq_getmsgbyte(&s);
+				initReadOnlyStringInfo(&s, buf, len);
 
-					if (c == 'w')
-					{
-						XLogRecPtr	start_lsn;
-						XLogRecPtr	end_lsn;
-						TimestampTz send_time;
+				c = pq_getmsgbyte(&s);
 
-						start_lsn = pq_getmsgint64(&s);
-						end_lsn = pq_getmsgint64(&s);
-						send_time = pq_getmsgint64(&s);
+				if (c == 'w')
+				{
+					XLogRecPtr	start_lsn;
+					XLogRecPtr	end_lsn;
+					TimestampTz send_time;
 
-						if (last_received < start_lsn)
-							last_received = start_lsn;
+					start_lsn = pq_getmsgint64(&s);
+					end_lsn = pq_getmsgint64(&s);
+					send_time = pq_getmsgint64(&s);
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					if (last_received < start_lsn)
+						last_received = start_lsn;
 
-						UpdateWorkerStats(last_received, send_time, false);
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-						apply_dispatch(&s);
-					}
-					else if (c == 'k')
-					{
-						XLogRecPtr	end_lsn;
-						TimestampTz timestamp;
-						bool		reply_requested;
+					UpdateWorkerStats(last_received, send_time, false);
 
-						end_lsn = pq_getmsgint64(&s);
-						timestamp = pq_getmsgint64(&s);
-						reply_requested = pq_getmsgbyte(&s);
+					apply_dispatch(&s);
+				}
+				else if (c == 'k')
+				{
+					XLogRecPtr	end_lsn;
+					TimestampTz timestamp;
+					bool		reply_requested;
 
-						if (last_received < end_lsn)
-							last_received = end_lsn;
+					end_lsn = pq_getmsgint64(&s);
+					timestamp = pq_getmsgint64(&s);
+					reply_requested = pq_getmsgbyte(&s);
 
-						send_feedback(last_received, reply_requested, false);
-						UpdateWorkerStats(last_received, timestamp, true);
-					}
-					/* other message types are purposefully ignored */
+					if (last_received < end_lsn)
+						last_received = end_lsn;
 
-					MemoryContextReset(ApplyMessageContext);
+					send_feedback(last_received, reply_requested, false);
+					UpdateWorkerStats(last_received, timestamp, true);
 				}
+				/* other message types are purposefully ignored */
 
+				MemoryContextReset(ApplyMessageContext);
+			}
+
+			if (prefetch_buf_pos < prefetch_buf_used)
+			{
+				memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+				buf = &prefetch_buf[prefetch_buf_pos + 4];
+				prefetch_buf_pos += 4 + len;
+			}
+			else if (prefetch_buf_used != 0 && no_more_data)
+			{
+				break;
+			}
+			else
+			{
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
@@ -3926,6 +4104,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 static void
 apply_worker_exit(void)
 {
+	/* Don't restart prefetch workers */
+	if (am_parallel_prefetch_worker())
+		return;
+
 	if (am_parallel_apply_worker())
 	{
 		/*
@@ -4729,6 +4911,10 @@ InitializeLogRepWorker(void)
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
 						MySubscription->name,
 						get_rel_name(MyLogicalRepWorker->relid))));
+	else if (am_parallel_prefetch_worker())
+		ereport(LOG,
+				(errmsg("logical replication prefetch worker for subscription \"%s\" has started",
+						MySubscription->name)));
 	else
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" has started",
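The prefetch buffer filled in the loop above is just a sequence of length-prefixed frames: a 4-byte int32 holding the payload length, followed by the raw copy-data message. A self-contained sketch of that framing, independent of the PostgreSQL sources (all names here are hypothetical):

#include <stdint.h>
#include <string.h>

/*
 * Append one [int32 length][payload] frame; the caller must have ensured
 * that buf has at least sizeof(int32_t) + len bytes of free space.
 * Returns the new used size.
 */
static size_t
frame_append(char *buf, size_t used, const char *msg, int32_t len)
{
    memcpy(buf + used, &len, sizeof(len));
    memcpy(buf + used + sizeof(len), msg, len);
    return used + sizeof(len) + len;
}

/* Walk the frames, invoking cb(payload, length) for each one. */
static void
frame_foreach(const char *buf, size_t used, void (*cb) (const char *, int32_t))
{
    size_t      pos = 0;

    while (pos < used)
    {
        int32_t     len;

        memcpy(&len, buf + pos, sizeof(len));
        cb(buf + pos + sizeof(len), len);
        pos += sizeof(len) + len;
    }
}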
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d51..b3812d7e0e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"prefetch_replica_identity_only",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Prefetch only the replica identity index, rather than all indexes, in logical replication prefetch workers."),
+			NULL,
+		},
+		&prefetch_replica_identity_only,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_parallel_prefetch_workers_per_subscription",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+			NULL,
+		},
+		&max_parallel_prefetch_workers_per_subscription,
+		2, 0, MAX_LR_PREFETCH_WORKERS,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_active_replication_origins",
 			PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544..3403128977 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -759,6 +759,8 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 										 TupleTableSlot *outslot);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 									 TupleTableSlot *searchslot, TupleTableSlot *outslot);
+extern bool RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+								  TupleTableSlot *outslot);
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f330..19d1a8d466 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952..7f5b4fa51b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -32,6 +32,7 @@ typedef enum LogicalRepWorkerType
 	WORKERTYPE_TABLESYNC,
 	WORKERTYPE_APPLY,
 	WORKERTYPE_PARALLEL_APPLY,
+	WORKERTYPE_PARALLEL_PREFETCH,
 } LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
@@ -214,6 +215,12 @@ typedef struct ParallelApplyWorkerInfo
 	 */
 	bool		in_use;
+
+	/*
+	 * True if this worker prefetches pages rather than applying changes.
+	 */
+	bool		do_prefetch;
+
 	ParallelApplyWorkerShared *shared;
 } ParallelApplyWorkerInfo;
@@ -237,6 +244,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern PGDLLIMPORT bool prefetch_replica_identity_only;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
@@ -326,10 +341,13 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+#define isParallelApplyWorker(worker)	((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isParallelPrefetchWorker(worker) ((worker)->in_use && \
+										  (worker)->type == WORKERTYPE_PARALLEL_PREFETCH)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
 								   (worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo *pa_launch_prefetch_worker(void);
 
 static inline bool
 am_tablesync_worker(void)
@@ -351,4 +369,11 @@ am_parallel_apply_worker(void)
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+static inline bool
+am_parallel_prefetch_worker(void)
+{
+	Assert(MyLogicalRepWorker->in_use);
+	return isParallelPrefetchWorker(MyLogicalRepWorker);
+}
+
 #endif							/* WORKER_INTERNAL_H */
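Nothing in this patch reads lr_prefetch_hits, lr_prefetch_misses, or lr_prefetch_inserts back. One possible way to surface them, not included in the patch and shown only as a hedged sketch, is a periodic log line emitted from the prefetch worker:

/* Hypothetical, not part of this patch: report prefetch effectiveness. */
static void
report_prefetch_stats(void)
{
    size_t      total = lr_prefetch_hits + lr_prefetch_misses;

    if (total > 0)
        elog(DEBUG1, "prefetch: %zu hits, %zu misses, %zu inserts (hit ratio %.1f%%)",
             lr_prefetch_hits, lr_prefetch_misses, lr_prefetch_inserts,
             100.0 * lr_prefetch_hits / total);
}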