77 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
88 * Portions Copyright (c) 1994, Regents of the University of California
99 *
10- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $
10+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $
1111 *
1212 *-------------------------------------------------------------------------
1313 */
7070 * default method. We assume that fsync() is always available, and that
7171 * configure determined whether fdatasync() is.
7272 */
73- #ifdef O_SYNC
74- #define CMP_OPEN_SYNC_FLAG O_SYNC
73+ #if defined( O_SYNC )
74+ #define BARE_OPEN_SYNC_FLAG O_SYNC
7575#elif defined(O_FSYNC )
76- #define CMP_OPEN_SYNC_FLAG O_FSYNC
76+ #define BARE_OPEN_SYNC_FLAG O_FSYNC
7777#endif
78- #ifdef CMP_OPEN_SYNC_FLAG
79- #define OPEN_SYNC_FLAG (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
78+ #ifdef BARE_OPEN_SYNC_FLAG
79+ #define OPEN_SYNC_FLAG (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
8080#endif
8181
82- #ifdef O_DSYNC
83- #ifdef OPEN_SYNC_FLAG
82+ #if defined( O_DSYNC )
83+ #if defined( OPEN_SYNC_FLAG )
8484/* O_DSYNC is distinct? */
85- #if O_DSYNC != CMP_OPEN_SYNC_FLAG
85+ #if O_DSYNC != BARE_OPEN_SYNC_FLAG
8686#define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT)
8787#endif
8888#else /* !defined(OPEN_SYNC_FLAG) */
9191#endif
9292#endif
9393
94- #ifdef OPEN_DATASYNC_FLAG
94+ #if defined( OPEN_DATASYNC_FLAG )
9595#define DEFAULT_SYNC_METHOD_STR "open_datasync"
9696#define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN
9797#define DEFAULT_SYNC_FLAGBIT OPEN_DATASYNC_FLAG
@@ -469,7 +469,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
469469static bool XLogCheckBuffer (XLogRecData * rdata ,
470470XLogRecPtr * lsn ,BkpBlock * bkpb );
471471static bool AdvanceXLInsertBuffer (void );
472- static void XLogWrite (XLogwrtRqst WriteRqst );
472+ static void XLogWrite (XLogwrtRqst WriteRqst , bool flexible );
473473static int XLogFileInit (uint32 log ,uint32 seg ,
474474bool * use_existent ,bool use_lock );
475475static bool InstallXLogFileSegment (uint32 * log ,uint32 * seg ,char * tmppath ,
@@ -497,18 +497,6 @@ static void ReadControlFile(void);
497497static char * str_time (time_t tnow );
498498static void issue_xlog_fsync (void );
499499
500- /* XLog gather-write stuff */
501- typedef struct XLogPages
502- {
503- char * head ;/* Start of first page to write */
504- Size size ;/* Total bytes to write == count(pages) * BLCKSZ */
505- uint32 offset ;/* Starting offset in xlog segment file */
506- }XLogPages ;
507-
508- static void XLogPageReset (XLogPages * pages );
509- static void XLogPageWrite (XLogPages * pages ,int index );
510- static void XLogPageFlush (XLogPages * pages ,int index );
511-
512500#ifdef WAL_DEBUG
513501static void xlog_outrec (char * buf ,XLogRecord * record );
514502#endif
@@ -726,9 +714,17 @@ begin:;
726714{
727715if (LWLockConditionalAcquire (WALWriteLock ,LW_EXCLUSIVE ))
728716{
717+ /*
718+ * Since the amount of data we write here is completely optional
719+ * anyway, tell XLogWrite it can be "flexible" and stop at a
720+ * convenient boundary. This allows writes triggered by this
721+ * mechanism to synchronize with the cache boundaries, so that
722+ * in a long transaction we'll basically dump alternating halves
723+ * of the buffer array.
724+ */
729725LogwrtResult = XLogCtl -> Write .LogwrtResult ;
730726if (XLByteLT (LogwrtResult .Write ,LogwrtRqst .Write ))
731- XLogWrite (LogwrtRqst );
727+ XLogWrite (LogwrtRqst , true );
732728LWLockRelease (WALWriteLock );
733729}
734730}
@@ -1219,7 +1215,7 @@ AdvanceXLInsertBuffer(void)
12191215WriteRqst .Write = OldPageRqstPtr ;
12201216WriteRqst .Flush .xlogid = 0 ;
12211217WriteRqst .Flush .xrecoff = 0 ;
1222- XLogWrite (WriteRqst );
1218+ XLogWrite (WriteRqst , false );
12231219LWLockRelease (WALWriteLock );
12241220Insert -> LogwrtResult = LogwrtResult ;
12251221}
@@ -1279,16 +1275,24 @@ AdvanceXLInsertBuffer(void)
12791275/*
12801276 * Write and/or fsync the log at least as far as WriteRqst indicates.
12811277 *
1278+ * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1279+ * may stop at any convenient boundary (such as a cache or logfile boundary).
1280+ * This option allows us to avoid uselessly issuing multiple writes when a
1281+ * single one would do.
1282+ *
12821283 * Must be called with WALWriteLock held.
12831284 */
12841285static void
1285- XLogWrite (XLogwrtRqst WriteRqst )
1286+ XLogWrite (XLogwrtRqst WriteRqst , bool flexible )
12861287{
12871288XLogCtlWrite * Write = & XLogCtl -> Write ;
12881289bool ispartialpage ;
1290+ bool finishing_seg ;
12891291bool use_existent ;
1290- int currentIndex = Write -> curridx ;
1291- XLogPages pages ;
1292+ int curridx ;
1293+ int npages ;
1294+ int startidx ;
1295+ uint32 startoffset ;
12921296
12931297/* We should always be inside a critical section here */
12941298Assert (CritSectionCount > 0 );
@@ -1299,7 +1303,27 @@ XLogWrite(XLogwrtRqst WriteRqst)
12991303 */
13001304LogwrtResult = Write -> LogwrtResult ;
13011305
1302- XLogPageReset (& pages );
1306+ /*
1307+ * Since successive pages in the xlog cache are consecutively allocated,
1308+ * we can usually gather multiple pages together and issue just one
1309+ * write() call. npages is the number of pages we have determined can
1310+ * be written together; startidx is the cache block index of the first
1311+ * one, and startoffset is the file offset at which it should go.
1312+ * The latter two variables are only valid when npages > 0, but we must
1313+ * initialize all of them to keep the compiler quiet.
1314+ */
1315+ npages = 0 ;
1316+ startidx = 0 ;
1317+ startoffset = 0 ;
1318+
1319+ /*
1320+ * Within the loop, curridx is the cache block index of the page to
1321+ * consider writing. We advance Write->curridx only after successfully
1322+ * writing pages. (Right now, this refinement is useless since we are
1323+ * going to PANIC if any error occurs anyway; but someday it may come
1324+ * in useful.)
1325+ */
1326+ curridx = Write -> curridx ;
13031327
13041328while (XLByteLT (LogwrtResult .Write ,WriteRqst .Write ))
13051329{
@@ -1309,22 +1333,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
13091333 * end of the last page that's been initialized by
13101334 * AdvanceXLInsertBuffer.
13111335 */
1312- if (!XLByteLT (LogwrtResult .Write ,XLogCtl -> xlblocks [currentIndex ]))
1336+ if (!XLByteLT (LogwrtResult .Write ,XLogCtl -> xlblocks [curridx ]))
13131337elog (PANIC ,"xlog write request %X/%X is past end of log %X/%X" ,
13141338LogwrtResult .Write .xlogid ,LogwrtResult .Write .xrecoff ,
1315- XLogCtl -> xlblocks [currentIndex ].xlogid ,
1316- XLogCtl -> xlblocks [currentIndex ].xrecoff );
1339+ XLogCtl -> xlblocks [curridx ].xlogid ,
1340+ XLogCtl -> xlblocks [curridx ].xrecoff );
13171341
13181342/* Advance LogwrtResult.Write to end of current buffer page */
1319- LogwrtResult .Write = XLogCtl -> xlblocks [currentIndex ];
1343+ LogwrtResult .Write = XLogCtl -> xlblocks [curridx ];
13201344ispartialpage = XLByteLT (WriteRqst .Write ,LogwrtResult .Write );
13211345
13221346if (!XLByteInPrevSeg (LogwrtResult .Write ,openLogId ,openLogSeg ))
13231347{
13241348/*
1325- * Switch to new logfile segment.
1349+ * Switch to new logfile segment. We cannot have any pending
1350+ * pages here (since we dump what we have at segment end).
13261351 */
1327- XLogPageFlush ( & pages , currentIndex );
1352+ Assert ( npages == 0 );
13281353if (openLogFile >=0 )
13291354{
13301355if (close (openLogFile ))
@@ -1391,34 +1416,91 @@ XLogWrite(XLogwrtRqst WriteRqst)
13911416LWLockRelease (ControlFileLock );
13921417}
13931418
1419+ /* Make sure we have the current logfile open */
13941420if (openLogFile < 0 )
13951421{
13961422XLByteToPrevSeg (LogwrtResult .Write ,openLogId ,openLogSeg );
13971423openLogFile = XLogFileOpen (openLogId ,openLogSeg );
13981424openLogOff = 0 ;
13991425}
14001426
1401- /* Add a page to buffer */
1402- XLogPageWrite (& pages ,currentIndex );
1427+ /* Add current page to the set of pending pages-to-dump */
1428+ if (npages == 0 )
1429+ {
1430+ /* first of group */
1431+ startidx = curridx ;
1432+ startoffset = (LogwrtResult .Write .xrecoff - BLCKSZ ) %XLogSegSize ;
1433+ }
1434+ npages ++ ;
14031435
14041436/*
1405- * If we just wrote the whole last page of a logfile segment,
1406- * fsync the segment immediately. This avoids having to go back
1407- * and re-open prior segments when an fsync request comes along
1408- * later. Doing it here ensures that one and only one backend will
1409- * perform this fsync.
1410- *
1411- * This is also the right place to notify the Archiver that the
1412- * segment is ready to copy to archival storage.
1437+ * Dump the set if this will be the last loop iteration, or if
1438+ * we are at the last page of the cache area (since the next page
1439+ * won't be contiguous in memory), or if we are at the end of the
1440+ * logfile segment.
14131441 */
1414- if (openLogOff + pages .size >=XLogSegSize && !ispartialpage )
1442+ finishing_seg = !ispartialpage &&
1443+ (startoffset + npages * BLCKSZ ) >=XLogSegSize ;
1444+
1445+ if (!XLByteLT (LogwrtResult .Write ,WriteRqst .Write )||
1446+ curridx == XLogCtl -> XLogCacheBlck ||
1447+ finishing_seg )
14151448{
1416- XLogPageFlush (& pages ,currentIndex );
1417- issue_xlog_fsync ();
1418- LogwrtResult .Flush = LogwrtResult .Write ;/* end of current page */
1449+ char * from ;
1450+ Size nbytes ;
14191451
1420- if (XLogArchivingActive ())
1421- XLogArchiveNotifySeg (openLogId ,openLogSeg );
1452+ /* Need to seek in the file? */
1453+ if (openLogOff != startoffset )
1454+ {
1455+ if (lseek (openLogFile , (off_t )startoffset ,SEEK_SET )< 0 )
1456+ ereport (PANIC ,
1457+ (errcode_for_file_access (),
1458+ errmsg ("could not seek in log file %u, "
1459+ "segment %u to offset %u: %m" ,
1460+ openLogId ,openLogSeg ,startoffset )));
1461+ openLogOff = startoffset ;
1462+ }
1463+
1464+ /* OK to write the page(s) */
1465+ from = XLogCtl -> pages + startidx * (Size )BLCKSZ ;
1466+ nbytes = npages * (Size )BLCKSZ ;
1467+ errno = 0 ;
1468+ if (write (openLogFile ,from ,nbytes )!= nbytes )
1469+ {
1470+ /* if write didn't set errno, assume no disk space */
1471+ if (errno == 0 )
1472+ errno = ENOSPC ;
1473+ ereport (PANIC ,
1474+ (errcode_for_file_access (),
1475+ errmsg ("could not write to log file %u, segment %u "
1476+ "at offset %u length %lu: %m" ,
1477+ openLogId ,openLogSeg ,
1478+ openLogOff , (unsigned long )nbytes )));
1479+ }
1480+
1481+ /* Update state for write */
1482+ openLogOff += nbytes ;
1483+ Write -> curridx = ispartialpage ?curridx :NextBufIdx (curridx );
1484+ npages = 0 ;
1485+
1486+ /*
1487+ * If we just wrote the whole last page of a logfile segment,
1488+ * fsync the segment immediately. This avoids having to go back
1489+ * and re-open prior segments when an fsync request comes along
1490+ * later. Doing it here ensures that one and only one backend will
1491+ * perform this fsync.
1492+ *
1493+ * This is also the right place to notify the Archiver that the
1494+ * segment is ready to copy to archival storage.
1495+ */
1496+ if (finishing_seg )
1497+ {
1498+ issue_xlog_fsync ();
1499+ LogwrtResult .Flush = LogwrtResult .Write ;/* end of page */
1500+
1501+ if (XLogArchivingActive ())
1502+ XLogArchiveNotifySeg (openLogId ,openLogSeg );
1503+ }
14221504}
14231505
14241506if (ispartialpage )
@@ -1427,9 +1509,15 @@ XLogWrite(XLogwrtRqst WriteRqst)
14271509LogwrtResult .Write = WriteRqst .Write ;
14281510break ;
14291511}
1430- currentIndex = NextBufIdx (currentIndex );
1512+ curridx = NextBufIdx (curridx );
1513+
1514+ /* If flexible, break out of loop as soon as we wrote something */
1515+ if (flexible && npages == 0 )
1516+ break ;
14311517}
1432- XLogPageFlush (& pages ,currentIndex );
1518+
1519+ Assert (npages == 0 );
1520+ Assert (curridx == Write -> curridx );
14331521
14341522/*
14351523 * If asked to flush, do so
@@ -1572,7 +1660,7 @@ XLogFlush(XLogRecPtr record)
15721660WriteRqst .Write = WriteRqstPtr ;
15731661WriteRqst .Flush = record ;
15741662}
1575- XLogWrite (WriteRqst );
1663+ XLogWrite (WriteRqst , false );
15761664}
15771665LWLockRelease (WALWriteLock );
15781666}
@@ -5898,72 +5986,3 @@ remove_backup_label(void)
58985986errmsg ("could not remove file \"%s\": %m" ,
58995987BACKUP_LABEL_FILE )));
59005988}
5901-
5902-
5903- /* XLog gather-write stuff */
5904-
5905- static void
5906- XLogPageReset (XLogPages * pages )
5907- {
5908- memset (pages ,0 ,sizeof (* pages ));
5909- }
5910-
5911- static void
5912- XLogPageWrite (XLogPages * pages ,int index )
5913- {
5914- char * page = XLogCtl -> pages + index * (Size )BLCKSZ ;
5915- Size size = BLCKSZ ;
5916- uint32 offset = (LogwrtResult .Write .xrecoff - BLCKSZ ) %XLogSegSize ;
5917-
5918- if (pages -> head + pages -> size == page &&
5919- pages -> offset + pages -> size == offset )
5920- {/* Pages are continuous. Append new page. */
5921- pages -> size += size ;
5922- }
5923- else
5924- {/* Pages are not continuous. Flush and clear. */
5925- XLogPageFlush (pages ,PrevBufIdx (index ));
5926- pages -> head = page ;
5927- pages -> size = size ;
5928- pages -> offset = offset ;
5929- }
5930- }
5931-
5932- static void
5933- XLogPageFlush (XLogPages * pages ,int index )
5934- {
5935- if (!pages -> head )
5936- {/* Nothing to write */
5937- XLogCtl -> Write .curridx = index ;
5938- return ;
5939- }
5940-
5941- /* Need to seek in the file? */
5942- if (openLogOff != pages -> offset )
5943- {
5944- openLogOff = pages -> offset ;
5945- if (lseek (openLogFile , (off_t )openLogOff ,SEEK_SET )< 0 )
5946- ereport (PANIC ,
5947- (errcode_for_file_access (),
5948- errmsg ("could not seek in log file %u, segment %u to offset %u: %m" ,
5949- openLogId ,openLogSeg ,openLogOff )));
5950- }
5951-
5952- /* OK to write the page */
5953- errno = 0 ;
5954- if (write (openLogFile ,pages -> head ,pages -> size )!= pages -> size )
5955- {
5956- /* if write didn't set errno, assume problem is no disk space */
5957- if (errno == 0 )
5958- errno = ENOSPC ;
5959- ereport (PANIC ,
5960- (errcode_for_file_access (),
5961- errmsg ("could not write to log file %u, segment %u length %u at offset %u: %m" ,
5962- openLogId ,openLogSeg ,
5963- (unsigned int )pages -> size ,openLogOff )));
5964- }
5965-
5966- openLogOff += pages -> size ;
5967- XLogCtl -> Write .curridx = index ;
5968- XLogPageReset (pages );
5969- }