diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..c9c4223b22e 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
  * invoking table_tuple_lock.
  */
 static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
 {
     bool        refetch = false;

@@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
             break;
         case TM_Updated:
             /* XXX: Improve handling here */
-            if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
-                ereport(LOG,
-                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                         errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-            else
-                ereport(LOG,
-                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                         errmsg("concurrent update, retrying")));
-            refetch = true;
+            if (lockmode != LockTupleTryExclusive)
+            {
+                if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+                    ereport(LOG,
+                            (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                             errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+                else
+                    ereport(LOG,
+                            (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                             errmsg("concurrent update, retrying")));
+                refetch = true;
+            }
             break;
         case TM_Deleted:
-            /* XXX: Improve handling here */
-            ereport(LOG,
-                    (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                     errmsg("concurrent delete, retrying")));
-            refetch = true;
+            if (lockmode != LockTupleTryExclusive)
+            {
+                /* XXX: Improve handling here */
+                ereport(LOG,
+                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                         errmsg("concurrent delete, retrying")));
+                refetch = true;
+            }
             break;
         case TM_Invisible:
             elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +242,16 @@ retry:
          */
         if (TransactionIdIsValid(xwait))
         {
-            XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-            goto retry;
+            if (lockmode == LockTupleTryExclusive)
+            {
+                found = false;
+                break;
+            }
+            else if (lockmode != LockTupleNoLock)
+            {
+                XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+                goto retry;
+            }
         }

         /* Found our tuple and it's not locked */
@@ -246,7 +260,7 @@ retry:
     }

     /* Found tuple, try to lock it in the lockmode. */
-    if (found)
+    if (found && lockmode != LockTupleNoLock)
     {
         TM_FailureData tmfd;
         TM_Result   res;
@@ -256,14 +270,14 @@ retry:
         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
                                outslot,
                                GetCurrentCommandId(false),
-                               lockmode,
+                               lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
                                LockWaitBlock,
                                0 /* don't follow updates */ ,
                                &tmfd);

         PopActiveSnapshot();

-        if (should_refetch_tuple(res, &tmfd))
+        if (should_refetch_tuple(res, &tmfd, lockmode))
             goto retry;
     }

@@ -395,16 +409,23 @@ retry:
          */
         if (TransactionIdIsValid(xwait))
         {
-            XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-            goto retry;
+            if (lockmode == LockTupleTryExclusive)
+            {
+                found = false;
+                break;
+            }
+            else if (lockmode != LockTupleNoLock)
+            {
+                XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+                goto retry;
+            }
         }

-        /* Found our tuple and it's not locked */
         break;
     }

     /* Found tuple, try to lock it in the lockmode. */
-    if (found)
+    if (found && lockmode != LockTupleNoLock)
     {
         TM_FailureData tmfd;
         TM_Result   res;
@@ -414,14 +435,14 @@ retry:
         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
                                outslot,
                                GetCurrentCommandId(false),
-                               lockmode,
+                               lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
                                LockWaitBlock,
                                0 /* don't follow updates */ ,
                                &tmfd);

         PopActiveSnapshot();

-        if (should_refetch_tuple(res, &tmfd))
+        if (should_refetch_tuple(res, &tmfd, lockmode))
             goto retry;
     }

@@ -508,7 +529,7 @@ retry:

     PopActiveSnapshot();

-    if (should_refetch_tuple(res, &tmfd))
+    if (should_refetch_tuple(res, &tmfd, LockTupleShare))
         goto retry;

     return true;
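The execReplication.c changes above give RelationFindReplTupleByIndex() and RelationFindReplTupleSeq() two extra lock modes that only the prefetch path uses (see FindReplTupleInLocalRel() in worker.c further down in this patch). A minimal sketch of the intended calling pattern; the variable names are illustrative only, mirroring the existing callers:

    /*
     * Prefetch-only probe: locate the tuple (pulling its heap and index
     * pages into cache) but skip table_tuple_lock() entirely.
     */
    found = RelationFindReplTupleByIndex(localrel, localidxoid,
                                         LockTupleNoLock,
                                         remoteslot, localslot);

    /*
     * Lock the tuple as LockTupleExclusive would, but give up silently
     * instead of waiting for a concurrent transaction and retrying.
     */
    found = RelationFindReplTupleByIndex(localrel, localidxoid,
                                         LockTupleTryExclusive,
                                         remoteslot, localslot);
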
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
  */
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
 pa_launch_parallel_worker(void)
 {
     MemoryContext oldcontext;
@@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void)
     }
 }

+
+static void
+pa_apply_dispatch(StringInfo s)
+{
+    if (MyParallelShared->do_prefetch)
+    {
+        PG_TRY();
+        {
+            apply_dispatch(s);
+        }
+        PG_CATCH();
+        {
+            HOLD_INTERRUPTS();
+
+            elog(DEBUG1, "failed to prefetch LR operation");
+
+            /* TODO: should we somehow dump the error or just silently ignore it? */
+            /* EmitErrorReport(); */
+            FlushErrorState();
+
+            RESUME_INTERRUPTS();
+
+            lr_prefetch_errors += 1;
+        }
+        PG_END_TRY();
+        if (!prefetch_replica_identity_only)
+        {
+            /* We need to abort the transaction to undo the insert */
+            AbortCurrentTransaction();
+        }
+    }
+    else
+    {
+        apply_dispatch(s);
+    }
+}
+
 /* Parallel apply worker main loop. */
 static void
 LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
              */
             s.cursor += SIZE_STATS_MESSAGE;

-            apply_dispatch(&s);
+            pa_apply_dispatch(&s);
         }
         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
         {
@@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg)

     InitializingApplyWorker = false;

-    /* Setup replication origin tracking. */
-    StartTransactionCommand();
-    ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+    if (!MyParallelShared->do_prefetch)
+    {
+        /* Setup replication origin tracking. */
+        StartTransactionCommand();
+        ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
                                        originname, sizeof(originname));
-    originid = replorigin_by_name(originname, false);
-
-    /*
-     * The parallel apply worker doesn't need to monopolize this replication
-     * origin which was already acquired by its leader process.
-     */
-    replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-    replorigin_session_origin = originid;
-    CommitTransactionCommand();
+        originid = replorigin_by_name(originname, false);
+        /*
+         * The parallel apply worker doesn't need to monopolize this replication
+         * origin which was already acquired by its leader process.
+         */
+        replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+        replorigin_session_origin = originid;
+        CommitTransactionCommand();
+    }
+    else
+    {
+        /* Do not write WAL for prefetch */
+        wal_level = WAL_LEVEL_MINIMAL;
+    }

     /*
      * Setup callback for syscache so that we know when something changes in
      * the subscription relation state.
@@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
     shm_mq_result result;
     TimestampTz startTime = 0;

-    Assert(!IsTransactionState());
-    Assert(!winfo->serialize_changes);
+    if (!winfo->shared->do_prefetch)
+    {
+        Assert(!IsTransactionState());
+        Assert(!winfo->serialize_changes);
+    }

     /*
      * We don't try to send data to parallel worker for 'immediate' mode. This
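The do_prefetch flag added to ParallelApplyWorkerShared (declared in worker_internal.h at the end of this patch) is what turns a pooled parallel apply worker into a prefetch worker: it then skips replication origin setup, runs with wal_level forced to WAL_LEVEL_MINIMAL, and routes every message through pa_apply_dispatch(), which catches and counts errors instead of failing. A minimal sketch of how the leader is expected to flip the flag, mirroring the launch loop added to LogicalRepApplyLoop() in worker.c below:

    ParallelApplyWorkerInfo *winfo = pa_launch_parallel_worker();

    if (winfo != NULL)
    {
        winfo->in_use = true;               /* keep it out of the regular apply pool */
        winfo->shared->do_prefetch = true;  /* worker now only prefetches, never commits */
    }
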
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int         max_logical_replication_workers = 4;
 int         max_sync_workers_per_subscription = 2;
 int         max_parallel_apply_workers_per_subscription = 2;
+int         max_parallel_prefetch_workers_per_subscription = 2;

 LogicalRepWorker *MyLogicalRepWorker = NULL;

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8ff0076dad3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool        InitializingApplyWorker = false;

+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo *prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int  prefetch_worker_rr = 0;
+static int  n_prefetch_workers;
+
+bool        prefetch_replica_identity_only = true;
+
+size_t      lr_prefetch_hits;
+size_t      lr_prefetch_misses;
+size_t      lr_prefetch_errors;
+size_t      lr_prefetch_inserts;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool InitializingApplyWorker = false;
 static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

+/*
+ * Is this operation being performed by a parallel prefetch worker?
+ */
+#define is_prefetching() (am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;

@@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     TransApplyAction apply_action;
     StringInfoData original_msg;

+    if (is_prefetching())
+    {
+        return false;
+    }
+
     apply_action = get_transaction_apply_action(stream_xid, &winfo);

     /* not in streaming mode */
@@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
                 RelationGetRelationName(rel))));
 }

+#define SAFE_APPLY(call) \
+    if (is_prefetching()) \
+    { \
+        PG_TRY(); \
+        { \
+            call; \
+        } \
+        PG_CATCH(); \
+        { \
+            HOLD_INTERRUPTS(); \
+            elog(DEBUG1, "failed to prefetch LR operation");\
+            FlushErrorState(); \
+            RESUME_INTERRUPTS(); \
+            lr_prefetch_errors += 1; \
+        } \
+        PG_END_TRY(); \
+    } else { \
+        call; \
+    }
+
+
 /*
  * Handle INSERT message.
  */
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
         ResultRelInfo *relinfo = edata->targetRelInfo;

         ExecOpenIndices(relinfo, false);
-        apply_handle_insert_internal(edata, relinfo, remoteslot);
+        SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
         ExecCloseIndices(relinfo);
     }

@@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
            !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
            RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);

-    /* Caller will not have done this bit. */
-    Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-    InitConflictIndexes(relinfo);
+    if (is_prefetching() && prefetch_replica_identity_only)
+    {
+        TupleTableSlot *localslot = NULL;
+        LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+        Relation    localrel = relinfo->ri_RelationDesc;
+        EPQState    epqstate;

-    /* Do the insert. */
-    TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-    ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+        EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+
+        (void) FindReplTupleInLocalRel(edata, localrel,
+                                       &relmapentry->remoterel,
+                                       relmapentry->localindexoid,
+                                       remoteslot, &localslot);
+    }
+    else
+    {
+        /* Caller will not have done this bit. */
+        Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+        InitConflictIndexes(relinfo);
+
+        /* Do the insert. */
+        TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+        ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+    }
+    if (is_prefetching())
+    {
+        lr_prefetch_inserts += 1;
+    }
 }

 /*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
         apply_handle_tuple_routing(edata,
                                    remoteslot, &newtup, CMD_UPDATE);
     else
-        apply_handle_update_internal(edata, edata->targetRelInfo,
-                                     remoteslot, &newtup, rel->localindexoid);
+        SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+                                                remoteslot, &newtup, rel->localindexoid));

     finish_edata(edata);

@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                                     localindexoid,
                                     remoteslot, &localslot);

+    if (is_prefetching())
+    {
+        if (found)
+            lr_prefetch_hits += 1;
+        else
+            lr_prefetch_misses += 1;
+        if (prefetch_replica_identity_only)
+            goto Cleanup;
+    }
+
     /*
      * Tuple found.
      *
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                                   remoteslot, newslot, list_make1(&conflicttuple));
     }

-    /* Cleanup. */
+Cleanup:
     ExecCloseIndices(relinfo);
     EvalPlanQualEnd(&epqstate);
 }
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
         ResultRelInfo *relinfo = edata->targetRelInfo;

         ExecOpenIndices(relinfo, false);
-        apply_handle_delete_internal(edata, relinfo,
-                                     remoteslot, rel->localindexoid);
+        SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+                                                remoteslot, rel->localindexoid));
         ExecCloseIndices(relinfo);
     }

@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
     found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
                                     remoteslot, &localslot);

+    if (is_prefetching())
+    {
+        if (found)
+            lr_prefetch_hits += 1;
+        else
+            lr_prefetch_misses += 1;
+        goto Cleanup;
+    }
+
     /* If found delete it. */
     if (found)
     {
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
                                   remoteslot, NULL, list_make1(&conflicttuple));
     }

-    /* Cleanup. */
+Cleanup:
     EvalPlanQualEnd(&epqstate);
 }

@@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
     EState     *estate = edata->estate;
     bool        found;

+    LockTupleMode lockmode = is_prefetching() ? (prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive) : LockTupleExclusive;
+
     /*
      * Regardless of the top-level operation, we're performing a read here, so
      * check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 #endif

         found = RelationFindReplTupleByIndex(localrel, localidxoid,
-                                             LockTupleExclusive,
+                                             lockmode,
                                              remoteslot, *localslot);
     }
     else
-        found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+        found = RelationFindReplTupleSeq(localrel, lockmode,
                                          remoteslot, *localslot);

     return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
     switch (operation)
     {
         case CMD_INSERT:
-            apply_handle_insert_internal(edata, partrelinfo,
-                                         remoteslot_part);
+            SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+                                                    remoteslot_part));
             break;

         case CMD_DELETE:
-            apply_handle_delete_internal(edata, partrelinfo,
-                                         remoteslot_part,
-                                         part_entry->localindexoid);
+            SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+                                                    remoteslot_part,
+                                                    part_entry->localindexoid));
             break;

         case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                 {
                     TupleTableSlot *newslot = localslot;

+                    if (is_prefetching())
+                        return;
+
                     /* Store the new tuple for conflict reporting */
                     slot_store_data(newslot, part_entry, newtup);

@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                 {
                     TupleTableSlot *newslot;

+                    if (is_prefetching())
+                        return;
+
                     /* Store the new tuple for conflict reporting */
                     newslot = table_slot_create(partrel, &estate->es_tupleTable);
                     slot_store_data(newslot, part_entry, newtup);
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                         slot_getallattrs(remoteslot);
                     }
                     MemoryContextSwitchTo(oldctx);
-                    apply_handle_insert_internal(edata, partrelinfo_new,
-                                                 remoteslot_part);
+                    SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+                                                            remoteslot_part));
                 }

                 EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
     MemoryContextSwitchTo(ApplyMessageContext);
 }

-
 /* Update statistics of the worker. */
 static void
 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
     }
 }

+#define MSG_CODE_OFFSET (1 + 8*3)
+
+static void
+lr_do_prefetch(char *buf, int len)
+{
+    ParallelApplyWorkerInfo *winfo;
+
+    if (buf[0] != 'w')
+        return;
+
+    switch (buf[MSG_CODE_OFFSET])
+    {
+        case LOGICAL_REP_MSG_INSERT:
+        case LOGICAL_REP_MSG_UPDATE:
+        case LOGICAL_REP_MSG_DELETE:
+            /* Round robin prefetch worker */
+            winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+            pa_send_data(winfo, len, buf);
+            break;
+
+        case LOGICAL_REP_MSG_TYPE:
+        case LOGICAL_REP_MSG_RELATION:
+            /* broadcast to all prefetch workers */
+            for (int i = 0; i < n_prefetch_workers; i++)
+            {
+                winfo = prefetch_worker[i];
+                pa_send_data(winfo, len, buf);
+            }
+            break;

+        default:
+            /* Ignore other messages */
+            break;
+    }
+}
+
 /*
  * Apply main loop.
  */
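lr_do_prefetch() peeks at buf[MSG_CODE_OFFSET] to decide which prefetch worker gets a message. The offset follows from the streaming-protocol framing of a 'w' (XLogData) message, which is also what the pq_getmsg* calls in LogicalRepApplyLoop() below decode; a sketch of the assumed layout:

    /*
     * 'w' copy-data message as handed to lr_do_prefetch():
     *
     *   buf[0]        'w'
     *   buf[1..8]     start_lsn  (int64)
     *   buf[9..16]    end_lsn    (int64)
     *   buf[17..24]   send_time  (int64)
     *   buf[25]       logical replication message type ('I', 'U', 'D', ...)
     *
     * hence MSG_CODE_OFFSET = 1 + 8*3 = 25.
     */
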
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
     bool        ping_sent = false;
     TimeLineID  tli;
     ErrorContextCallback errcallback;
+    char       *prefetch_buf = NULL;
+    size_t      prefetch_buf_pos = 0;
+    size_t      prefetch_buf_used = 0;
+    size_t      prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;

     /*
      * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                                 "LogicalStreamingContext",
                                                 ALLOCSET_DEFAULT_SIZES);

+    if (max_parallel_prefetch_workers_per_subscription != 0)
+    {
+        int         i;
+        for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+        {
+            prefetch_worker[i] = pa_launch_parallel_worker();
+            if (!prefetch_worker[i])
+            {
+                elog(LOG, "launched only %d prefetch workers out of %d",
+                     i, max_parallel_prefetch_workers_per_subscription);
+                break;
+            }
+            prefetch_worker[i]->in_use = true;
+            prefetch_worker[i]->shared->do_prefetch = true;
+        }
+        n_prefetch_workers = i;
+        prefetch_buf = palloc(prefetch_buf_size);
+    }
+
     /* mark as idle, before starting to loop */
     pgstat_report_activity(STATE_IDLE, NULL);

@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
     {
         pgsocket    fd = PGINVALID_SOCKET;
         int         rc;
-        int         len;
+        int32       len;
         char       *buf = NULL;
         bool        endofstream = false;
+        bool        no_more_data = false;
         long        wait_time;

         CHECK_FOR_INTERRUPTS();

@@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
         len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

-        if (len != 0)
+        /* Loop to process all available data (without blocking). */
+        for (;;)
         {
-            /* Loop to process all available data (without blocking). */
-            for (;;)
-            {
-                CHECK_FOR_INTERRUPTS();
+            CHECK_FOR_INTERRUPTS();

-                if (len == 0)
+            if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+            {
+                prefetch_buf_used = 0;
+                do
                 {
-                    break;
-                }
-                else if (len < 0)
+                    if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+                    {
+                        prefetch_buf_size *= 2;
+                        elog(DEBUG1, "increasing prefetch buffer size to %zu", prefetch_buf_size);
+                        prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+                    }
+                    memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+                    memcpy(&prefetch_buf[prefetch_buf_used + 4], buf, len);
+                    prefetch_buf_used += 4 + len;
+                    if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+                        break;
+                    len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+                } while (len > 0);
+
+                no_more_data = len <= 0;
+
+                for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
                 {
-                    ereport(LOG,
-                            (errmsg("data stream from publisher has ended")));
-                    endofstream = true;
-                    break;
+                    memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+                    lr_do_prefetch(&prefetch_buf[prefetch_buf_pos + 4], len);
                 }
-                else
-                {
-                    int         c;
-                    StringInfoData s;
+                memcpy(&len, prefetch_buf, 4);
+                buf = &prefetch_buf[4];
+                prefetch_buf_pos = len + 4;
+            }

-                    if (ConfigReloadPending)
-                    {
-                        ConfigReloadPending = false;
-                        ProcessConfigFile(PGC_SIGHUP);
-                    }
+            if (len == 0)
+            {
+                break;
+            }
+            else if (len < 0)
+            {
+                ereport(LOG,
+                        (errmsg("data stream from publisher has ended")));
+                endofstream = true;
+                break;
+            }
+            else
+            {
+                int         c;
+                StringInfoData s;

-                    /* Reset timeout. */
-                    last_recv_timestamp = GetCurrentTimestamp();
-                    ping_sent = false;
+                if (ConfigReloadPending)
+                {
+                    ConfigReloadPending = false;
+                    ProcessConfigFile(PGC_SIGHUP);
+                }

-                    /* Ensure we are reading the data into our memory context. */
-                    MemoryContextSwitchTo(ApplyMessageContext);
+                /* Reset timeout. */
+                last_recv_timestamp = GetCurrentTimestamp();
+                ping_sent = false;

-                    initReadOnlyStringInfo(&s, buf, len);
+                /* Ensure we are reading the data into our memory context. */
+                MemoryContextSwitchTo(ApplyMessageContext);

-                    c = pq_getmsgbyte(&s);
+                initReadOnlyStringInfo(&s, buf, len);

-                    if (c == 'w')
-                    {
-                        XLogRecPtr  start_lsn;
-                        XLogRecPtr  end_lsn;
-                        TimestampTz send_time;
+                c = pq_getmsgbyte(&s);

-                        start_lsn = pq_getmsgint64(&s);
-                        end_lsn = pq_getmsgint64(&s);
-                        send_time = pq_getmsgint64(&s);
+                if (c == 'w')
+                {
+                    XLogRecPtr  start_lsn;
+                    XLogRecPtr  end_lsn;
+                    TimestampTz send_time;

-                        if (last_received < start_lsn)
-                            last_received = start_lsn;
+                    start_lsn = pq_getmsgint64(&s);
+                    end_lsn = pq_getmsgint64(&s);
+                    send_time = pq_getmsgint64(&s);

-                        if (last_received < end_lsn)
-                            last_received = end_lsn;
+                    if (last_received < start_lsn)
+                        last_received = start_lsn;

-                        UpdateWorkerStats(last_received, send_time, false);
+                    if (last_received < end_lsn)
+                        last_received = end_lsn;

-                        apply_dispatch(&s);
-                    }
-                    else if (c == 'k')
-                    {
-                        XLogRecPtr  end_lsn;
-                        TimestampTz timestamp;
-                        bool        reply_requested;
+                    UpdateWorkerStats(last_received, send_time, false);

-                        end_lsn = pq_getmsgint64(&s);
-                        timestamp = pq_getmsgint64(&s);
-                        reply_requested = pq_getmsgbyte(&s);
+                    apply_dispatch(&s);
+                }
+                else if (c == 'k')
+                {
+                    XLogRecPtr  end_lsn;
+                    TimestampTz timestamp;
+                    bool        reply_requested;

-                        if (last_received < end_lsn)
-                            last_received = end_lsn;
+                    end_lsn = pq_getmsgint64(&s);
+                    timestamp = pq_getmsgint64(&s);
+                    reply_requested = pq_getmsgbyte(&s);

-                        send_feedback(last_received, reply_requested, false);
-                        UpdateWorkerStats(last_received, timestamp, true);
-                    }
-                    /* other message types are purposefully ignored */
+                    if (last_received < end_lsn)
+                        last_received = end_lsn;

-                    MemoryContextReset(ApplyMessageContext);
+                    send_feedback(last_received, reply_requested, false);
+                    UpdateWorkerStats(last_received, timestamp, true);
                 }
+                /* other message types are purposefully ignored */
+                MemoryContextReset(ApplyMessageContext);
+            }
+            if (prefetch_buf_pos < prefetch_buf_used)
+            {
+                memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+                buf = &prefetch_buf[prefetch_buf_pos + 4];
+                prefetch_buf_pos += 4 + len;
+            }
+            else if (prefetch_buf_used != 0 && no_more_data)
+            {
+                break;
+            }
+            else
+            {
                 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+            }
             }
         }
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
         NULL, NULL, NULL
     },

+    {
+        {"prefetch_replica_identity_only",
+            PGC_SIGHUP,
+            REPLICATION_SUBSCRIBERS,
+            gettext_noop("Whether the LR prefetch worker should prefetch only the replica identity index or all other indexes too."),
+            NULL,
+        },
+        &prefetch_replica_identity_only,
+        true,
+        NULL, NULL, NULL
+    },
+
     /* End-of-list marker */
     {
         {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },

+    {
+        {"max_parallel_prefetch_workers_per_subscription",
+            PGC_SIGHUP,
+            REPLICATION_SUBSCRIBERS,
+            gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+            NULL,
+        },
+        &max_parallel_prefetch_workers_per_subscription,
+        2, 0, MAX_LR_PREFETCH_WORKERS,
+        NULL, NULL, NULL
+    },
+
     {
         {"max_active_replication_origins",
             PGC_POSTMASTER,
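The two GUCs above are what a test setup would tune: max_parallel_prefetch_workers_per_subscription (default 2, capped at MAX_LR_PREFETCH_WORKERS) enables prefetching when set above zero, and prefetch_replica_identity_only (default on) restricts prefetch workers to probing the replica identity index instead of executing the full insert, update, or delete. Both are declared PGC_SIGHUP. Since the prefetch workers are obtained through pa_launch_parallel_worker(), they remain subject to the existing parallel apply worker limits, which is presumably why the launch loop in LogicalRepApplyLoop() tolerates getting fewer workers than requested.
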
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
     LockTupleNoKeyExclusive,
     /* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
     LockTupleExclusive,
+    /* Do not lock the tuple */
+    LockTupleNoLock,
+    /* Try to take an exclusive lock, silently give up in case of conflict */
+    LockTupleTryExclusive,
 } LockTupleMode;

 #endif                          /* LOCKOPTIONS_H */
diff --git a/src/include/port/pg_iovec.h b/src/include/port/pg_iovec.h
index 90be3af449d..8fefeb8c245 100644
--- a/src/include/port/pg_iovec.h
+++ b/src/include/port/pg_iovec.h
@@ -53,6 +53,7 @@ struct iovec
 static inline ssize_t
 pg_preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset)
 {
+    pg_usleep(100);
 #if HAVE_DECL_PREADV
     /*
      * Avoid a small amount of argument copying overhead in the kernel if
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;

 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
      */
     PartialFileSetState fileset_state;
     FileSet     fileset;
+
+    /*
+     * Is this worker a prefetch worker?
+     */
+    bool        do_prefetch;
 } ParallelApplyWorkerShared;

 /*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;

 extern PGDLLIMPORT bool InitializingApplyWorker;

+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                 bool only_running);
@@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                            XLogRecPtr remote_lsn);

-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action);
+
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
                                        (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
                                    (worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo *pa_launch_parallel_worker(void);

 static inline bool
 am_tablesync_worker(void)
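The counters exported here (lr_prefetch_hits, lr_prefetch_misses, lr_prefetch_errors, lr_prefetch_inserts) are incremented by the patch but not reported anywhere yet. They are plain per-backend globals updated inside the prefetch workers, so any reporting would have to happen in those processes; a purely hypothetical debugging sketch, not part of the patch:

    elog(LOG, "LR prefetch: hits=%zu misses=%zu errors=%zu inserts=%zu",
         lr_prefetch_hits, lr_prefetch_misses,
         lr_prefetch_errors, lr_prefetch_inserts);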