Index: src/backend/access/transam/xlog.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v retrieving revision 1.210 diff -c -c -r1.210 xlog.c *** src/backend/access/transam/xlog.c 23 Jul 2005 15:31:16 -0000 1.210 --- src/backend/access/transam/xlog.c 23 Jul 2005 16:09:12 -0000 *************** *** 48,77 **** /* * This chunk of hackery attempts to determine which file sync methods * are available on the current platform, and to choose an appropriate * default method. We assume that fsync() is always available, and that * configure determined whether fdatasync() is. */ #if defined(O_SYNC) ! #define OPEN_SYNC_FLAG O_SYNC #else #if defined(O_FSYNC) ! #define OPEN_SYNC_FLAG O_FSYNC #endif #endif #if defined(O_DSYNC) #if defined(OPEN_SYNC_FLAG) ! #if O_DSYNC != OPEN_SYNC_FLAG ! #define OPEN_DATASYNC_FLAG O_DSYNC #endif #else /* !defined(OPEN_SYNC_FLAG) */ /* Win32 only has O_DSYNC */ ! #define OPEN_DATASYNC_FLAG O_DSYNC #endif #endif #if defined(OPEN_DATASYNC_FLAG) #define DEFAULT_SYNC_METHOD_STR "open_datasync" #define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN --- 48,114 ---- /* + * Because O_DIRECT bypasses the kernel buffers, and because we never + * read those buffers except during crash recovery, it seems like + * a win to use it in all cases where we sync on each write(). + */ + #ifdef O_DIRECT + #define PG_O_DIRECT O_DIRECT + #else + #define PG_O_DIRECT 0 + #endif + + /* * This chunk of hackery attempts to determine which file sync methods * are available on the current platform, and to choose an appropriate * default method. We assume that fsync() is always available, and that * configure determined whether fdatasync() is. */ #if defined(O_SYNC) ! #define CMP_OPEN_SYNC_FLAG O_SYNC #else #if defined(O_FSYNC) ! #define CMP_OPEN_SYNC_FLAG O_FSYNC #endif #endif + #define OPEN_SYNC_FLAG (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT) #if defined(O_DSYNC) #if defined(OPEN_SYNC_FLAG) ! 
#if O_DSYNC != CMP_OPEN_SYNC_FLAG ! #define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT) #endif #else /* !defined(OPEN_SYNC_FLAG) */ /* Win32 only has O_DSYNC */ ! #define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT) #endif #endif + /* + * Buffer-alignment requirements for direct I/O depend on the OS and filesystem, + * but BLCKSZ is assumed to be sufficient. + */ + #ifdef O_DIRECT + #define ALIGNOF_XLOG_BUFFER BLCKSZ + #else + #define ALIGNOF_XLOG_BUFFER MAXIMUM_ALIGNOF + #endif + + /* + * Switch the alignment routine because ShmemAlloc() returns a max-aligned + * buffer and ALIGNOF_XLOG_BUFFER may be greater than MAXIMUM_ALIGNOF. + */ + #if ALIGNOF_XLOG_BUFFER <= MAXIMUM_ALIGNOF + #define XLOG_BUFFER_ALIGN(LEN) MAXALIGN((LEN)) + #else + #define XLOG_BUFFER_ALIGN(LEN) ((LEN) + (ALIGNOF_XLOG_BUFFER)) + #endif + /* assume sizeof(ptrdiff_t) == sizeof(void*) */ + #define POINTERALIGN(ALIGNVAL,PTR) \ + ((char *)(((ptrdiff_t) (PTR) + (ALIGNVAL-1)) & ~((ptrdiff_t) (ALIGNVAL-1)))) + #define XLOG_BUFFER_POINTERALIGN(PTR) \ + POINTERALIGN((ALIGNOF_XLOG_BUFFER), (PTR)) + #if defined(OPEN_DATASYNC_FLAG) #define DEFAULT_SYNC_METHOD_STR "open_datasync" #define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN *************** *** 469,474 **** --- 506,522 ---- static char *str_time(time_t tnow); static void issue_xlog_fsync(void); + /* XLog gather-write stuff */ + typedef struct XLogPages + { + char *head; /* Head of first page */ + int size; /* Total bytes of pages == count(pages) * BLCKSZ */ + int offset; /* Offset in xlog segment file */ + } XLogPages; + static void XLogPageReset(XLogPages *pages); + static void XLogPageWrite(XLogPages *pages, int index); + static void XLogPageFlush(XLogPages *pages, int index); + #ifdef WAL_DEBUG static void xlog_outrec(char *buf, XLogRecord *record); #endif *************** *** 1245,1253 **** XLogWrite(XLogwrtRqst WriteRqst) { XLogCtlWrite *Write = &XLogCtl->Write; - char *from; bool ispartialpage; bool use_existent; /* We should always be inside a critical 
section here */ Assert(CritSectionCount > 0); --- 1293,1302 ---- XLogWrite(XLogwrtRqst WriteRqst) { XLogCtlWrite *Write = &XLogCtl->Write; bool ispartialpage; bool use_existent; + int currentIndex = Write->curridx; + XLogPages pages; /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); *************** *** 1258,1263 **** --- 1307,1314 ---- */ LogwrtResult = Write->LogwrtResult; + XLogPageReset(&pages); + while (XLByteLT(LogwrtResult.Write, WriteRqst.Write)) { /* *************** *** 1266,1279 **** * end of the last page that's been initialized by * AdvanceXLInsertBuffer. */ ! if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, ! XLogCtl->xlblocks[Write->curridx].xlogid, ! XLogCtl->xlblocks[Write->curridx].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ ! LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) --- 1317,1330 ---- * end of the last page that's been initialized by * AdvanceXLInsertBuffer. */ ! if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex])) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff, ! XLogCtl->xlblocks[currentIndex].xlogid, ! XLogCtl->xlblocks[currentIndex].xrecoff); /* Advance LogwrtResult.Write to end of current buffer page */ ! LogwrtResult.Write = XLogCtl->xlblocks[currentIndex]; ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write); if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg)) *************** *** 1281,1286 **** --- 1332,1338 ---- /* * Switch to new logfile segment. */ + XLogPageFlush(&pages, currentIndex); if (openLogFile >= 0) { if (close(openLogFile)) *************** *** 1354,1384 **** openLogOff = 0; } ! 
/* Need to seek in the file? */ ! if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize) ! { ! openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; ! if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not seek in log file %u, segment %u to offset %u: %m", ! openLogId, openLogSeg, openLogOff))); ! } ! ! /* OK to write the page */ ! from = XLogCtl->pages + Write->curridx * BLCKSZ; ! errno = 0; ! if (write(openLogFile, from, BLCKSZ) != BLCKSZ) ! { ! /* if write didn't set errno, assume problem is no disk space */ ! if (errno == 0) ! errno = ENOSPC; ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not write to log file %u, segment %u at offset %u: %m", ! openLogId, openLogSeg, openLogOff))); ! } ! openLogOff += BLCKSZ; /* * If we just wrote the whole last page of a logfile segment, --- 1406,1413 ---- openLogOff = 0; } ! /* Add a page to buffer */ ! XLogPageWrite(&pages, currentIndex); /* * If we just wrote the whole last page of a logfile segment, *************** *** 1390,1397 **** * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage. */ ! if (openLogOff >= XLogSegSize && !ispartialpage) { issue_xlog_fsync(); LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ --- 1419,1427 ---- * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage. */ ! if (openLogOff + pages.size >= XLogSegSize && !ispartialpage) { + XLogPageFlush(&pages, currentIndex); issue_xlog_fsync(); LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */ *************** *** 1405,1412 **** LogwrtResult.Write = WriteRqst.Write; break; } ! Write->curridx = NextBufIdx(Write->curridx); } /* * If asked to flush, do so --- 1435,1443 ---- LogwrtResult.Write = WriteRqst.Write; break; } ! 
currentIndex = NextBufIdx(currentIndex); } + XLogPageFlush(&pages, currentIndex); /* * If asked to flush, do so *************** *** 3584,3590 **** if (XLOGbuffers < MinXLOGbuffers) XLOGbuffers = MinXLOGbuffers; ! return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) + BLCKSZ * XLOGbuffers + MAXALIGN(sizeof(ControlFileData)); } --- 3615,3621 ---- if (XLOGbuffers < MinXLOGbuffers) XLOGbuffers = MinXLOGbuffers; ! return XLOG_BUFFER_ALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) + BLCKSZ * XLOGbuffers + MAXALIGN(sizeof(ControlFileData)); } *************** *** 3601,3607 **** XLogCtl = (XLogCtlData *) ShmemInitStruct("XLOG Ctl", ! MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) + BLCKSZ * XLOGbuffers, &foundXLog); --- 3632,3638 ---- XLogCtl = (XLogCtlData *) ShmemInitStruct("XLOG Ctl", ! XLOG_BUFFER_ALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers) + BLCKSZ * XLOGbuffers, &foundXLog); *************** *** 3630,3638 **** * Here, on the other hand, we must MAXALIGN to ensure the page * buffers have worst-case alignment. */ ! XLogCtl->pages = ! ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) + ! sizeof(XLogRecPtr) * XLOGbuffers); memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers); /* --- 3661,3669 ---- * Here, on the other hand, we must MAXALIGN to ensure the page * buffers have worst-case alignment. */ ! XLogCtl->pages = XLOG_BUFFER_POINTERALIGN( ! ((char *) XLogCtl) ! + sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers); memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers); /* *************** *** 3662,3667 **** --- 3693,3699 ---- { CheckPoint checkPoint; char *buffer; + char buffer0[BLCKSZ + ALIGNOF_XLOG_BUFFER]; XLogPageHeader page; XLogLongPageHeader longpage; XLogRecord *record; *************** *** 3690,3697 **** /* First timeline ID is always 1 */ ThisTimeLineID = 1; ! /* Use malloc() to ensure buffer is MAXALIGNED */ ! 
buffer = (char *) malloc(BLCKSZ); page = (XLogPageHeader) buffer; memset(buffer, 0, BLCKSZ); --- 3722,3728 ---- /* First timeline ID is always 1 */ ThisTimeLineID = 1; ! buffer = XLOG_BUFFER_POINTERALIGN(buffer0); page = (XLogPageHeader) buffer; memset(buffer, 0, BLCKSZ); *************** *** 5837,5839 **** --- 5868,5938 ---- errmsg("could not remove file \"%s\": %m", BACKUP_LABEL_FILE))); } + + + /* XLog gather-write stuff */ + + static void + XLogPageReset(XLogPages *pages) + { + memset(pages, 0, sizeof(*pages)); + } + + static void + XLogPageWrite(XLogPages *pages, int index) + { + char *page = XLogCtl->pages + index * BLCKSZ; + int size = BLCKSZ; + int offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize; + + if (pages->head + pages->size == page + && pages->offset + pages->size == offset) + { /* Pages are contiguous. Append new page. */ + pages->size += size; + } + else + { /* Pages are not contiguous. Flush and clear. */ + XLogPageFlush(pages, PrevBufIdx(index)); + pages->head = page; + pages->size = size; + pages->offset = offset; + } + } + + static void + XLogPageFlush(XLogPages *pages, int index) + { + if (!pages->head) + { /* No pages need to be written. */ + XLogCtl->Write.curridx = index; + return; + } + + /* Need to seek in the file? 
*/ + if (openLogOff != pages->offset) + { + openLogOff = pages->offset; + if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, segment %u to offset %u: %m", + openLogId, openLogSeg, openLogOff))); + } + + /* OK to write the page */ + errno = 0; + if (write(openLogFile, pages->head, pages->size) != pages->size) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log file %u, segment %u at offset %u: %m", + openLogId, openLogSeg, openLogOff))); + } + + openLogOff += pages->size; + XLogCtl->Write.curridx = index; + XLogPageReset(pages); + }