diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 86a7b4c5c8..0d0c556b7c 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -90,8 +90,8 @@ XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size) state->decode_buffer = buffer; state->decode_buffer_size = size; - state->decode_buffer_head = buffer; state->decode_buffer_tail = buffer; + state->decode_buffer_head = buffer; } /* @@ -271,7 +271,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* * See if we can release the last record that was returned by - * XLogNextRecord(), to free up space. + * XLogNextRecord(), if any, to free up space. */ void XLogReleasePreviousRecord(XLogReaderState *state) @@ -283,16 +283,16 @@ XLogReleasePreviousRecord(XLogReaderState *state) /* * Remove it from the decoded record queue. It must be the oldest item - * decoded, decode_queue_tail. + * decoded, decode_queue_head. */ record = state->record; - Assert(record == state->decode_queue_tail); + Assert(record == state->decode_queue_head); state->record = NULL; - state->decode_queue_tail = record->next; + state->decode_queue_head = record->next; - /* It might also be the newest item decoded, decode_queue_head. */ - if (state->decode_queue_head == record) - state->decode_queue_head = NULL; + /* It might also be the newest item decoded, decode_queue_tail. */ + if (state->decode_queue_tail == record) + state->decode_queue_tail = NULL; /* Release the space. */ if (unlikely(record->oversized)) @@ -302,11 +302,11 @@ XLogReleasePreviousRecord(XLogReaderState *state) } else { - /* It must be the tail record in the decode buffer. */ - Assert(state->decode_buffer_tail == (char *) record); + /* It must be the head (oldest) record in the decode buffer. 
*/ + Assert(state->decode_buffer_head == (char *) record); /* - * We need to update tail to point to the next record that is in the + * We need to update head to point to the next record that is in the * decode buffer, if any, being careful to skip oversized ones * (they're not in the decode buffer). */ @@ -316,8 +316,8 @@ XLogReleasePreviousRecord(XLogReaderState *state) if (record) { - /* Adjust tail to release space up to the next record. */ - state->decode_buffer_tail = (char *) record; + /* Adjust head to release space up to the next record. */ + state->decode_buffer_head = (char *) record; } else { @@ -327,8 +327,8 @@ XLogReleasePreviousRecord(XLogReaderState *state) * we'll keep overwriting the same piece of memory if we're not * doing any prefetching. */ - state->decode_buffer_tail = state->decode_buffer; state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; } } } @@ -351,7 +351,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) /* Release the last record returned by XLogNextRecord(). */ XLogReleasePreviousRecord(state); - if (state->decode_queue_tail == NULL) + if (state->decode_queue_head == NULL) { *errormsg = NULL; if (state->errormsg_deferred) @@ -376,7 +376,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) * XLogRecXXX(xlogreader) macros, which work with the decoder rather than * the record for historical reasons. */ - state->record = state->decode_queue_tail; + state->record = state->decode_queue_head; /* * Update the pointers to the beginning and one-past-the-end of this @@ -428,12 +428,12 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) if (!XLogReaderHasQueuedRecordOrError(state)) XLogReadAhead(state, false /* nonblocking */ ); - /* Consume the tail record or error. */ + /* Consume the head record or error. 
*/ decoded = XLogNextRecord(state, errormsg); if (decoded) { /* - * XLogReadRecord() returns a pointer to the record's header, not the + * This function returns a pointer to the record's header, not the * actual decoded record. The caller will access the decoded record * through the XLogRecGetXXX() macros, which reach the decoded * recorded as xlogreader->record. @@ -451,6 +451,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) * decoded record wouldn't fit in the decode buffer and must eventually be * freed explicitly. * + * The caller is responsible for adjusting decode_buffer_tail with the real + * size after successfully decoding a record into this space. This way, if + * decoding fails, then there is nothing to undo unless the 'oversized' flag + * was set and pfree() must be called. + * * Return NULL if there is no space in the decode buffer and allow_oversized * is false, or if memory allocation fails for an oversized buffer. */ @@ -470,21 +475,23 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi state->decode_buffer_tail = state->decode_buffer; state->free_decode_buffer = true; } - if (state->decode_buffer_head >= state->decode_buffer_tail) + + /* Try to allocate space in the circular decode buffer. */ + if (state->decode_buffer_tail >= state->decode_buffer_head) { - /* Empty, or head is to the right of tail. */ - if (state->decode_buffer_head + required_space <= + /* Empty, or tail is to the right of head. */ + if (state->decode_buffer_tail + required_space <= state->decode_buffer + state->decode_buffer_size) { - /* There is space between head and end. */ - decoded = (DecodedXLogRecord *) state->decode_buffer_head; + /* There is space between tail and end. 
*/ + decoded = (DecodedXLogRecord *) state->decode_buffer_tail; decoded->oversized = false; return decoded; } else if (state->decode_buffer + required_space < - state->decode_buffer_tail) + state->decode_buffer_head) { - /* There is space between start and tail. */ + /* There is space between start and head. */ decoded = (DecodedXLogRecord *) state->decode_buffer; decoded->oversized = false; return decoded; @@ -492,12 +499,12 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi } else { - /* Head is to the left of tail. */ - if (state->decode_buffer_head + required_space < - state->decode_buffer_tail) + /* Tail is to the left of head. */ + if (state->decode_buffer_tail + required_space < + state->decode_buffer_head) { - /* There is space between head and tail. */ - decoded = (DecodedXLogRecord *) state->decode_buffer_head; + /* There is space between tail and head. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_tail; decoded->oversized = false; return decoded; } @@ -513,7 +520,7 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi return decoded; } - return decoded; + return NULL; } static XLogPageReadResult @@ -748,7 +755,6 @@ restart: if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) { state->overwrittenRecPtr = RecPtr; - //ResetDecoder(state); RecPtr = targetPagePtr; goto restart; } @@ -865,18 +871,18 @@ restart: /* The new decode buffer head must be MAXALIGNed. */ Assert(decoded->size == MAXALIGN(decoded->size)); if ((char *) decoded == state->decode_buffer) - state->decode_buffer_head = state->decode_buffer + decoded->size; + state->decode_buffer_tail = state->decode_buffer + decoded->size; else - state->decode_buffer_head += decoded->size; + state->decode_buffer_tail += decoded->size; } /* Insert it into the queue of decoded records. 
*/ - Assert(state->decode_queue_head != decoded); - if (state->decode_queue_head) - state->decode_queue_head->next = decoded; - state->decode_queue_head = decoded; - if (!state->decode_queue_tail) - state->decode_queue_tail = decoded; + Assert(state->decode_queue_tail != decoded); + if (state->decode_queue_tail) + state->decode_queue_tail->next = decoded; + state->decode_queue_tail = decoded; + if (!state->decode_queue_head) + state->decode_queue_head = decoded; return XLREAD_SUCCESS; } else @@ -935,8 +941,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking) result = XLogDecodeNextRecord(state, nonblocking); if (result == XLREAD_SUCCESS) { - Assert(state->decode_queue_head != NULL); - return state->decode_queue_head; + Assert(state->decode_queue_tail != NULL); + return state->decode_queue_tail; } return NULL; @@ -946,8 +952,14 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking) * Read a single xlog page including at least [pageptr, reqLen] of valid data * via the page_read() callback. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns XLREAD_FAIL if the required page cannot be read for some + * reason; errormsg_buf is set in that case (unless the error occurs in the + * page_read callback). + * + * Returns XLREAD_WOULDBLOCK if the requested data can't be read without + * waiting. This can be returned only if the installed page_read callback + * respects the state->nonblocking flag, and cannot read the requested data + * immediately. * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -1334,6 +1346,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) Assert(!XLogRecPtrIsInvalid(RecPtr)); + /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. 
*/ + state->nonblocking = false; + /* * skip over potential continuation data, keeping in mind that it may span * multiple pages @@ -1544,19 +1559,19 @@ ResetDecoder(XLogReaderState *state) DecodedXLogRecord *r; /* Reset the decoded record queue, freeing any oversized records. */ - while ((r = state->decode_queue_tail)) + while ((r = state->decode_queue_head) != NULL) { - state->decode_queue_tail = r->next; + state->decode_queue_head = r->next; if (r->oversized) pfree(r); } - state->decode_queue_head = NULL; state->decode_queue_tail = NULL; + state->decode_queue_head = NULL; state->record = NULL; /* Reset the decode buffer to empty. */ - state->decode_buffer_head = state->decode_buffer; state->decode_buffer_tail = state->decode_buffer; + state->decode_buffer_head = state->decode_buffer; /* Clear error state. */ state->errormsg_buf[0] = '\0'; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 44d9313422..ea22577b41 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -373,7 +373,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, * going to initialize it. And vice versa. */ zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); - willinit = (record->record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0; + willinit = (XLogRecGetBlock(record, block_id)->flags & BKPBLOCK_WILL_INIT) != 0; if (willinit && !zeromode) elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine"); if (!willinit && zeromode) diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index c129df44ac..a33ad034c0 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -403,14 +403,13 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len) * Calculate the amount of FPI data in the record. 
* * XXX: We peek into xlogreader's private decoded backup blocks for the - * bimg_len indicating the length of FPI data. It doesn't seem worth it to - * add an accessor macro for this. + * bimg_len indicating the length of FPI data. */ *fpi_len = 0; for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (XLogRecHasBlockImage(record, block_id)) - *fpi_len += record->record->blocks[block_id].bimg_len; + *fpi_len += XLogRecGetBlock(record, block_id)->bimg_len; } /* @@ -552,7 +551,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) blk); if (XLogRecHasBlockImage(record, block_id)) { - uint8 bimg_info = record->record->blocks[block_id].bimg_info; + uint8 bimg_info = XLogRecGetBlock(record, block_id)->bimg_info; if (BKPIMAGE_COMPRESSED(bimg_info)) { @@ -569,11 +568,11 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) "compression saved: %u, method: %s", XLogRecBlockImageApply(record, block_id) ? "" : " for WAL verification", - record->record->blocks[block_id].hole_offset, - record->record->blocks[block_id].hole_length, + XLogRecGetBlock(record, block_id)->hole_offset, + XLogRecGetBlock(record, block_id)->hole_length, BLCKSZ - - record->record->blocks[block_id].hole_length - - record->record->blocks[block_id].bimg_len, + XLogRecGetBlock(record, block_id)->hole_length - + XLogRecGetBlock(record, block_id)->bimg_len, method); } else @@ -581,8 +580,8 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) printf(" (FPW%s); hole: offset: %u, length: %u", XLogRecBlockImageApply(record, block_id) ? 
"" : " for WAL verification", - record->record->blocks[block_id].hole_offset, - record->record->blocks[block_id].hole_length); + XLogRecGetBlock(record, block_id)->hole_offset, + XLogRecGetBlock(record, block_id)->hole_length); } } putchar('\n'); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 86a26a9231..8446050225 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -249,16 +249,16 @@ struct XLogReaderState char *decode_buffer; size_t decode_buffer_size; bool free_decode_buffer; /* need to free? */ - char *decode_buffer_head; /* write head */ - char *decode_buffer_tail; /* read head */ + char *decode_buffer_head; /* data is read from the head */ + char *decode_buffer_tail; /* new data is written at the tail */ /* * Queue of records that have been decoded. This is a linked list that * usually consists of consecutive records in decode_buffer, but may also * contain oversized records allocated with palloc(). */ - DecodedXLogRecord *decode_queue_head; /* newest decoded record */ - DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */ + DecodedXLogRecord *decode_queue_head; /* oldest decoded record */ + DecodedXLogRecord *decode_queue_tail; /* newest decoded record */ /* * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least @@ -350,7 +350,7 @@ extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ /* Return values from XLogPageReadCB. */ -typedef enum XLogPageReadResultResult +typedef enum XLogPageReadResult { XLREAD_SUCCESS = 0, /* record is successfully read */ XLREAD_FAIL = -1, /* failed during reading a record */