7
7
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
8
8
* Portions Copyright (c) 1994, Regents of the University of California
9
9
*
10
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $
10
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $
11
11
*
12
12
*-------------------------------------------------------------------------
13
13
*/
70
70
* default method.  We assume that fsync() is always available, and that
71
71
* configure determined whether fdatasync() is.
72
72
*/
73
- #ifdef O_SYNC
74
- #define CMP_OPEN_SYNC_FLAG O_SYNC
73
+ #if defined( O_SYNC )
74
+ #define BARE_OPEN_SYNC_FLAG O_SYNC
75
75
#elif defined(O_FSYNC )
76
- #define CMP_OPEN_SYNC_FLAG O_FSYNC
76
+ #define BARE_OPEN_SYNC_FLAG O_FSYNC
77
77
#endif
78
- #ifdef CMP_OPEN_SYNC_FLAG
79
- #define OPEN_SYNC_FLAG (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
78
+ #ifdef BARE_OPEN_SYNC_FLAG
79
+ #define OPEN_SYNC_FLAG (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
80
80
#endif
81
81
82
- #ifdef O_DSYNC
83
- #ifdef OPEN_SYNC_FLAG
82
+ #if defined( O_DSYNC )
83
+ #if defined( OPEN_SYNC_FLAG )
84
84
/* O_DSYNC is distinct? */
85
- #if O_DSYNC != CMP_OPEN_SYNC_FLAG
85
+ #if O_DSYNC != BARE_OPEN_SYNC_FLAG
86
86
#define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT)
87
87
#endif
88
88
#else /* !defined(OPEN_SYNC_FLAG) */
91
91
#endif
92
92
#endif
93
93
94
- #ifdef OPEN_DATASYNC_FLAG
94
+ #if defined( OPEN_DATASYNC_FLAG )
95
95
#define DEFAULT_SYNC_METHOD_STR "open_datasync"
96
96
#define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN
97
97
#define DEFAULT_SYNC_FLAGBIT OPEN_DATASYNC_FLAG
@@ -469,7 +469,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
469
469
static bool XLogCheckBuffer (XLogRecData * rdata ,
470
470
XLogRecPtr * lsn ,BkpBlock * bkpb );
471
471
static bool AdvanceXLInsertBuffer (void );
472
- static void XLogWrite (XLogwrtRqst WriteRqst );
472
+ static void XLogWrite (XLogwrtRqst WriteRqst , bool flexible );
473
473
static int XLogFileInit (uint32 log ,uint32 seg ,
474
474
bool * use_existent ,bool use_lock );
475
475
static bool InstallXLogFileSegment (uint32 * log ,uint32 * seg ,char * tmppath ,
@@ -497,18 +497,6 @@ static void ReadControlFile(void);
497
497
static char * str_time (time_t tnow );
498
498
static void issue_xlog_fsync (void );
499
499
500
- /* XLog gather-write stuff */
501
- typedef struct XLogPages
502
- {
503
- char * head ;/* Start of first page to write */
504
- Size size ;/* Total bytes to write == count(pages) * BLCKSZ */
505
- uint32 offset ;/* Starting offset in xlog segment file */
506
- }XLogPages ;
507
-
508
- static void XLogPageReset (XLogPages * pages );
509
- static void XLogPageWrite (XLogPages * pages ,int index );
510
- static void XLogPageFlush (XLogPages * pages ,int index );
511
-
512
500
#ifdef WAL_DEBUG
513
501
static void xlog_outrec (char * buf ,XLogRecord * record );
514
502
#endif
@@ -726,9 +714,17 @@ begin:;
726
714
{
727
715
if (LWLockConditionalAcquire (WALWriteLock ,LW_EXCLUSIVE ))
728
716
{
717
+ /*
718
+ * Since the amount of data we write here is completely optional
719
+ * anyway, tell XLogWrite it can be "flexible" and stop at a
720
+ * convenient boundary. This allows writes triggered by this
721
+ * mechanism to synchronize with the cache boundaries, so that
722
+ * in a long transaction we'll basically dump alternating halves
723
+ * of the buffer array.
724
+ */
729
725
LogwrtResult = XLogCtl -> Write .LogwrtResult ;
730
726
if (XLByteLT (LogwrtResult .Write ,LogwrtRqst .Write ))
731
- XLogWrite (LogwrtRqst );
727
+ XLogWrite (LogwrtRqst , true );
732
728
LWLockRelease (WALWriteLock );
733
729
}
734
730
}
@@ -1219,7 +1215,7 @@ AdvanceXLInsertBuffer(void)
1219
1215
WriteRqst .Write = OldPageRqstPtr ;
1220
1216
WriteRqst .Flush .xlogid = 0 ;
1221
1217
WriteRqst .Flush .xrecoff = 0 ;
1222
- XLogWrite (WriteRqst );
1218
+ XLogWrite (WriteRqst , false );
1223
1219
LWLockRelease (WALWriteLock );
1224
1220
Insert -> LogwrtResult = LogwrtResult ;
1225
1221
}
@@ -1279,16 +1275,24 @@ AdvanceXLInsertBuffer(void)
1279
1275
/*
1280
1276
* Write and/or fsync the log at least as far as WriteRqst indicates.
1281
1277
*
1278
+ * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1279
+ * may stop at any convenient boundary (such as a cache or logfile boundary).
1280
+ * This option allows us to avoid uselessly issuing multiple writes when a
1281
+ * single one would do.
1282
+ *
1282
1283
* Must be called with WALWriteLock held.
1283
1284
*/
1284
1285
static void
1285
- XLogWrite (XLogwrtRqst WriteRqst )
1286
+ XLogWrite (XLogwrtRqst WriteRqst , bool flexible )
1286
1287
{
1287
1288
XLogCtlWrite * Write = & XLogCtl -> Write ;
1288
1289
bool ispartialpage ;
1290
+ bool finishing_seg ;
1289
1291
bool use_existent ;
1290
- int currentIndex = Write -> curridx ;
1291
- XLogPages pages ;
1292
+ int curridx ;
1293
+ int npages ;
1294
+ int startidx ;
1295
+ uint32 startoffset ;
1292
1296
1293
1297
/* We should always be inside a critical section here */
1294
1298
Assert (CritSectionCount > 0 );
@@ -1299,7 +1303,27 @@ XLogWrite(XLogwrtRqst WriteRqst)
1299
1303
*/
1300
1304
LogwrtResult = Write -> LogwrtResult ;
1301
1305
1302
- XLogPageReset (& pages );
1306
+ /*
1307
+ * Since successive pages in the xlog cache are consecutively allocated,
1308
+ * we can usually gather multiple pages together and issue just one
1309
+ * write() call. npages is the number of pages we have determined can
1310
+ * be written together; startidx is the cache block index of the first
1311
+ * one, and startoffset is the file offset at which it should go.
1312
+ * The latter two variables are only valid when npages > 0, but we must
1313
+ * initialize all of them to keep the compiler quiet.
1314
+ */
1315
+ npages = 0 ;
1316
+ startidx = 0 ;
1317
+ startoffset = 0 ;
1318
+
1319
+ /*
1320
+ * Within the loop, curridx is the cache block index of the page to
1321
+ * consider writing. We advance Write->curridx only after successfully
1322
+ * writing pages. (Right now, this refinement is useless since we are
1323
+ * going to PANIC if any error occurs anyway; but someday it may come
1324
+ * in useful.)
1325
+ */
1326
+ curridx = Write -> curridx ;
1303
1327
1304
1328
while (XLByteLT (LogwrtResult .Write ,WriteRqst .Write ))
1305
1329
{
@@ -1309,22 +1333,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
1309
1333
* end of the last page that's been initialized by
1310
1334
* AdvanceXLInsertBuffer.
1311
1335
*/
1312
- if (!XLByteLT (LogwrtResult .Write ,XLogCtl -> xlblocks [currentIndex ]))
1336
+ if (!XLByteLT (LogwrtResult .Write ,XLogCtl -> xlblocks [curridx ]))
1313
1337
elog (PANIC ,"xlog write request %X/%X is past end of log %X/%X" ,
1314
1338
LogwrtResult .Write .xlogid ,LogwrtResult .Write .xrecoff ,
1315
- XLogCtl -> xlblocks [currentIndex ].xlogid ,
1316
- XLogCtl -> xlblocks [currentIndex ].xrecoff );
1339
+ XLogCtl -> xlblocks [curridx ].xlogid ,
1340
+ XLogCtl -> xlblocks [curridx ].xrecoff );
1317
1341
1318
1342
/* Advance LogwrtResult.Write to end of current buffer page */
1319
- LogwrtResult .Write = XLogCtl -> xlblocks [currentIndex ];
1343
+ LogwrtResult .Write = XLogCtl -> xlblocks [curridx ];
1320
1344
ispartialpage = XLByteLT (WriteRqst .Write ,LogwrtResult .Write );
1321
1345
1322
1346
if (!XLByteInPrevSeg (LogwrtResult .Write ,openLogId ,openLogSeg ))
1323
1347
{
1324
1348
/*
1325
- * Switch to new logfile segment.
1349
+ * Switch to new logfile segment. We cannot have any pending
1350
+ * pages here (since we dump what we have at segment end).
1326
1351
*/
1327
- XLogPageFlush ( & pages , currentIndex );
1352
+ Assert ( npages == 0 );
1328
1353
if (openLogFile >=0 )
1329
1354
{
1330
1355
if (close (openLogFile ))
@@ -1391,34 +1416,91 @@ XLogWrite(XLogwrtRqst WriteRqst)
1391
1416
LWLockRelease (ControlFileLock );
1392
1417
}
1393
1418
1419
+ /* Make sure we have the current logfile open */
1394
1420
if (openLogFile < 0 )
1395
1421
{
1396
1422
XLByteToPrevSeg (LogwrtResult .Write ,openLogId ,openLogSeg );
1397
1423
openLogFile = XLogFileOpen (openLogId ,openLogSeg );
1398
1424
openLogOff = 0 ;
1399
1425
}
1400
1426
1401
- /* Add a page to buffer */
1402
- XLogPageWrite (& pages ,currentIndex );
1427
+ /* Add current page to the set of pending pages-to-dump */
1428
+ if (npages == 0 )
1429
+ {
1430
+ /* first of group */
1431
+ startidx = curridx ;
1432
+ startoffset = (LogwrtResult .Write .xrecoff - BLCKSZ ) %XLogSegSize ;
1433
+ }
1434
+ npages ++ ;
1403
1435
1404
1436
/*
1405
- * If we just wrote the whole last page of a logfile segment,
1406
- * fsync the segment immediately. This avoids having to go back
1407
- * and re-open prior segments when an fsync request comes along
1408
- * later. Doing it here ensures that one and only one backend will
1409
- * perform this fsync.
1410
- *
1411
- * This is also the right place to notify the Archiver that the
1412
- * segment is ready to copy to archival storage.
1437
+ * Dump the set if this will be the last loop iteration, or if
1438
+ * we are at the last page of the cache area (since the next page
1439
+ * won't be contiguous in memory), or if we are at the end of the
1440
+ * logfile segment.
1413
1441
*/
1414
- if (openLogOff + pages .size >=XLogSegSize && !ispartialpage )
1442
+ finishing_seg = !ispartialpage &&
1443
+ (startoffset + npages * BLCKSZ ) >=XLogSegSize ;
1444
+
1445
+ if (!XLByteLT (LogwrtResult .Write ,WriteRqst .Write )||
1446
+ curridx == XLogCtl -> XLogCacheBlck ||
1447
+ finishing_seg )
1415
1448
{
1416
- XLogPageFlush (& pages ,currentIndex );
1417
- issue_xlog_fsync ();
1418
- LogwrtResult .Flush = LogwrtResult .Write ;/* end of current page */
1449
+ char * from ;
1450
+ Size nbytes ;
1419
1451
1420
- if (XLogArchivingActive ())
1421
- XLogArchiveNotifySeg (openLogId ,openLogSeg );
1452
+ /* Need to seek in the file? */
1453
+ if (openLogOff != startoffset )
1454
+ {
1455
+ if (lseek (openLogFile , (off_t )startoffset ,SEEK_SET )< 0 )
1456
+ ereport (PANIC ,
1457
+ (errcode_for_file_access (),
1458
+ errmsg ("could not seek in log file %u, "
1459
+ "segment %u to offset %u: %m" ,
1460
+ openLogId ,openLogSeg ,startoffset )));
1461
+ openLogOff = startoffset ;
1462
+ }
1463
+
1464
+ /* OK to write the page(s) */
1465
+ from = XLogCtl -> pages + startidx * (Size )BLCKSZ ;
1466
+ nbytes = npages * (Size )BLCKSZ ;
1467
+ errno = 0 ;
1468
+ if (write (openLogFile ,from ,nbytes )!= nbytes )
1469
+ {
1470
+ /* if write didn't set errno, assume no disk space */
1471
+ if (errno == 0 )
1472
+ errno = ENOSPC ;
1473
+ ereport (PANIC ,
1474
+ (errcode_for_file_access (),
1475
+ errmsg ("could not write to log file %u, segment %u "
1476
+ "at offset %u length %lu: %m" ,
1477
+ openLogId ,openLogSeg ,
1478
+ openLogOff , (unsigned long )nbytes )));
1479
+ }
1480
+
1481
+ /* Update state for write */
1482
+ openLogOff += nbytes ;
1483
+ Write -> curridx = ispartialpage ?curridx :NextBufIdx (curridx );
1484
+ npages = 0 ;
1485
+
1486
+ /*
1487
+ * If we just wrote the whole last page of a logfile segment,
1488
+ * fsync the segment immediately. This avoids having to go back
1489
+ * and re-open prior segments when an fsync request comes along
1490
+ * later. Doing it here ensures that one and only one backend will
1491
+ * perform this fsync.
1492
+ *
1493
+ * This is also the right place to notify the Archiver that the
1494
+ * segment is ready to copy to archival storage.
1495
+ */
1496
+ if (finishing_seg )
1497
+ {
1498
+ issue_xlog_fsync ();
1499
+ LogwrtResult .Flush = LogwrtResult .Write ;/* end of page */
1500
+
1501
+ if (XLogArchivingActive ())
1502
+ XLogArchiveNotifySeg (openLogId ,openLogSeg );
1503
+ }
1422
1504
}
1423
1505
1424
1506
if (ispartialpage )
@@ -1427,9 +1509,15 @@ XLogWrite(XLogwrtRqst WriteRqst)
1427
1509
LogwrtResult .Write = WriteRqst .Write ;
1428
1510
break ;
1429
1511
}
1430
- currentIndex = NextBufIdx (currentIndex );
1512
+ curridx = NextBufIdx (curridx );
1513
+
1514
+ /* If flexible, break out of loop as soon as we wrote something */
1515
+ if (flexible && npages == 0 )
1516
+ break ;
1431
1517
}
1432
- XLogPageFlush (& pages ,currentIndex );
1518
+
1519
+ Assert (npages == 0 );
1520
+ Assert (curridx == Write -> curridx );
1433
1521
1434
1522
/*
1435
1523
* If asked to flush, do so
@@ -1572,7 +1660,7 @@ XLogFlush(XLogRecPtr record)
1572
1660
WriteRqst .Write = WriteRqstPtr ;
1573
1661
WriteRqst .Flush = record ;
1574
1662
}
1575
- XLogWrite (WriteRqst );
1663
+ XLogWrite (WriteRqst , false );
1576
1664
}
1577
1665
LWLockRelease (WALWriteLock );
1578
1666
}
@@ -5898,72 +5986,3 @@ remove_backup_label(void)
5898
5986
errmsg ("could not remove file \"%s\": %m" ,
5899
5987
BACKUP_LABEL_FILE )));
5900
5988
}
5901
-
5902
-
5903
- /* XLog gather-write stuff */
5904
-
5905
- static void
5906
- XLogPageReset (XLogPages * pages )
5907
- {
5908
- memset (pages ,0 ,sizeof (* pages ));
5909
- }
5910
-
5911
- static void
5912
- XLogPageWrite (XLogPages * pages ,int index )
5913
- {
5914
- char * page = XLogCtl -> pages + index * (Size )BLCKSZ ;
5915
- Size size = BLCKSZ ;
5916
- uint32 offset = (LogwrtResult .Write .xrecoff - BLCKSZ ) %XLogSegSize ;
5917
-
5918
- if (pages -> head + pages -> size == page &&
5919
- pages -> offset + pages -> size == offset )
5920
- {/* Pages are continuous. Append new page. */
5921
- pages -> size += size ;
5922
- }
5923
- else
5924
- {/* Pages are not continuous. Flush and clear. */
5925
- XLogPageFlush (pages ,PrevBufIdx (index ));
5926
- pages -> head = page ;
5927
- pages -> size = size ;
5928
- pages -> offset = offset ;
5929
- }
5930
- }
5931
-
5932
- static void
5933
- XLogPageFlush (XLogPages * pages ,int index )
5934
- {
5935
- if (!pages -> head )
5936
- {/* Nothing to write */
5937
- XLogCtl -> Write .curridx = index ;
5938
- return ;
5939
- }
5940
-
5941
- /* Need to seek in the file? */
5942
- if (openLogOff != pages -> offset )
5943
- {
5944
- openLogOff = pages -> offset ;
5945
- if (lseek (openLogFile , (off_t )openLogOff ,SEEK_SET )< 0 )
5946
- ereport (PANIC ,
5947
- (errcode_for_file_access (),
5948
- errmsg ("could not seek in log file %u, segment %u to offset %u: %m" ,
5949
- openLogId ,openLogSeg ,openLogOff )));
5950
- }
5951
-
5952
- /* OK to write the page */
5953
- errno = 0 ;
5954
- if (write (openLogFile ,pages -> head ,pages -> size )!= pages -> size )
5955
- {
5956
- /* if write didn't set errno, assume problem is no disk space */
5957
- if (errno == 0 )
5958
- errno = ENOSPC ;
5959
- ereport (PANIC ,
5960
- (errcode_for_file_access (),
5961
- errmsg ("could not write to log file %u, segment %u length %u at offset %u: %m" ,
5962
- openLogId ,openLogSeg ,
5963
- (unsignedint )pages -> size ,openLogOff )));
5964
- }
5965
-
5966
- openLogOff += pages -> size ;
5967
- XLogCtl -> Write .curridx = index ;
5968
- XLogPageReset (pages );
5969
- }