@@ -648,6 +648,8 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
648648
649649static bool XLogCheckBuffer (XLogRecData * rdata ,bool doPageWrites ,
650650XLogRecPtr * lsn ,BkpBlock * bkpb );
651+ static Buffer RestoreBackupBlockContents (XLogRecPtr lsn ,BkpBlock bkpb ,
652+ char * blk ,bool get_cleanup_lock ,bool keep_buffer );
651653static bool AdvanceXLInsertBuffer (bool new_segment );
652654static bool XLogCheckpointNeeded (XLogSegNo new_segno );
653655static void XLogWrite (XLogwrtRqst WriteRqst ,bool flexible ,bool xlog_switch );
@@ -731,7 +733,6 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
731733bool updrqst ;
732734bool doPageWrites ;
733735bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH );
734- bool isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT );
735736uint8 info_orig = info ;
736737static XLogRecord * rechdr ;
737738
@@ -1001,18 +1002,6 @@ begin:;
10011002gotobegin ;
10021003}
10031004
1004- /*
1005- * If this is a hint record and we don't need a backup block then
1006- * we have no more work to do and can exit quickly without inserting
1007- * a WAL record at all. In that case return InvalidXLogRecPtr.
1008- */
1009- if (isHint && !(info & XLR_BKP_BLOCK_MASK ))
1010- {
1011- LWLockRelease (WALInsertLock );
1012- END_CRIT_SECTION ();
1013- return InvalidXLogRecPtr ;
1014- }
1015-
10161005/*
10171006 * If the current page is completely full, the record goes to the next
10181007 * page, right after the page header.
@@ -3158,8 +3147,6 @@ Buffer
31583147RestoreBackupBlock (XLogRecPtr lsn ,XLogRecord * record ,int block_index ,
31593148bool get_cleanup_lock ,bool keep_buffer )
31603149{
3161- Buffer buffer ;
3162- Page page ;
31633150BkpBlock bkpb ;
31643151char * blk ;
31653152int i ;
@@ -3177,42 +3164,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
31773164if (i == block_index )
31783165{
31793166/* Found it, apply the update */
3180- buffer = XLogReadBufferExtended (bkpb .node ,bkpb .fork ,bkpb .block ,
3181- RBM_ZERO );
3182- Assert (BufferIsValid (buffer ));
3183- if (get_cleanup_lock )
3184- LockBufferForCleanup (buffer );
3185- else
3186- LockBuffer (buffer ,BUFFER_LOCK_EXCLUSIVE );
3187-
3188- page = (Page )BufferGetPage (buffer );
3189-
3190- if (bkpb .hole_length == 0 )
3191- {
3192- memcpy ((char * )page ,blk ,BLCKSZ );
3193- }
3194- else
3195- {
3196- memcpy ((char * )page ,blk ,bkpb .hole_offset );
3197- /* must zero-fill the hole */
3198- MemSet ((char * )page + bkpb .hole_offset ,0 ,bkpb .hole_length );
3199- memcpy ((char * )page + (bkpb .hole_offset + bkpb .hole_length ),
3200- blk + bkpb .hole_offset ,
3201- BLCKSZ - (bkpb .hole_offset + bkpb .hole_length ));
3202- }
3203-
3204- /*
3205- * Any checksum set on this page will be invalid. We don't need
3206- * to reset it here since it will be set before being written.
3207- */
3208-
3209- PageSetLSN (page ,lsn );
3210- MarkBufferDirty (buffer );
3211-
3212- if (!keep_buffer )
3213- UnlockReleaseBuffer (buffer );
3214-
3215- return buffer ;
3167+ return RestoreBackupBlockContents (lsn ,bkpb ,blk ,get_cleanup_lock ,
3168+ keep_buffer );
32163169}
32173170
32183171blk += BLCKSZ - bkpb .hole_length ;
@@ -3223,6 +3176,56 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
32233176return InvalidBuffer ;/* keep compiler quiet */
32243177}
32253178
3179+ /*
3180+ * Workhorse for RestoreBackupBlock usable without an xlog record
3181+ *
3182+ * Restores a full-page image from BkpBlock and a data pointer.
3183+ */
3184+ static Buffer
3185+ RestoreBackupBlockContents (XLogRecPtr lsn ,BkpBlock bkpb ,char * blk ,
3186+ bool get_cleanup_lock ,bool keep_buffer )
3187+ {
3188+ Buffer buffer ;
3189+ Page page ;
3190+
3191+ buffer = XLogReadBufferExtended (bkpb .node ,bkpb .fork ,bkpb .block ,
3192+ RBM_ZERO );
3193+ Assert (BufferIsValid (buffer ));
3194+ if (get_cleanup_lock )
3195+ LockBufferForCleanup (buffer );
3196+ else
3197+ LockBuffer (buffer ,BUFFER_LOCK_EXCLUSIVE );
3198+
3199+ page = (Page )BufferGetPage (buffer );
3200+
3201+ if (bkpb .hole_length == 0 )
3202+ {
3203+ memcpy ((char * )page ,blk ,BLCKSZ );
3204+ }
3205+ else
3206+ {
3207+ memcpy ((char * )page ,blk ,bkpb .hole_offset );
3208+ /* must zero-fill the hole */
3209+ MemSet ((char * )page + bkpb .hole_offset ,0 ,bkpb .hole_length );
3210+ memcpy ((char * )page + (bkpb .hole_offset + bkpb .hole_length ),
3211+ blk + bkpb .hole_offset ,
3212+ BLCKSZ - (bkpb .hole_offset + bkpb .hole_length ));
3213+ }
3214+
3215+ /*
3216+ * The checksum value on this page is currently invalid. We don't
3217+ * need to reset it here since it will be set before being written.
3218+ */
3219+
3220+ PageSetLSN (page ,lsn );
3221+ MarkBufferDirty (buffer );
3222+
3223+ if (!keep_buffer )
3224+ UnlockReleaseBuffer (buffer );
3225+
3226+ return buffer ;
3227+ }
3228+
32263229/*
32273230 * Attempt to read an XLOG record.
32283231 *
@@ -7635,45 +7638,86 @@ XLogRestorePoint(const char *rpName)
76357638 * Write a backup block if needed when we are setting a hint. Note that
76367639 * this may be called for a variety of page types, not just heaps.
76377640 *
7638- * Deciding the "if needed" part is delicate and requires us to either
7639- * grab WALInsertLock or check the info_lck spinlock. If we check the
7640- * spinlock and it says Yes then we will need to get WALInsertLock as well,
7641- * so the design choice here is to just go straight for the WALInsertLock
7642- * and trust that calls to this function are minimised elsewhere.
7643- *
76447641 * Callable while holding just share lock on the buffer content.
76457642 *
7646- * Possible that multiple concurrent backends could attempt to write
7647- * WAL records. In that case, more than one backup block may be recorded
7648- * though that isn't important to the outcome and the backup blocks are
7649- * likely to be identical anyway.
7643+ * We can't use the plain backup block mechanism since that relies on the
7644+ * Buffer being exclusively locked. Since some modifications (setting LSN, hint
7645+ * bits) are allowed in a sharelocked buffer that can lead to wal checksum
7646+ * failures. So instead we copy the page and insert the copied data as normal
7647+ * record data.
7648+ *
7649+ * We only need to do something if page has not yet been full page written in
7650+ * this checkpoint round. The LSN of the inserted wal record is returned if we
7651+ * had to write, InvalidXLogRecPtr otherwise.
7652+ *
7653+ * It is possible that multiple concurrent backends could attempt to write WAL
7654+ * records. In that case, multiple copies of the same block would be recorded
7655+ * in separate WAL records by different backends, though that is still OK from
7656+ * a correctness perspective.
7657+ *
7658+ * Note that this only works for buffers that fit the standard page model,
7659+ * i.e. those for which buffer_std == true
76507660 */
7651- #define XLOG_HINT_WATERMARK 13579
76527661XLogRecPtr
76537662XLogSaveBufferForHint (Buffer buffer )
76547663{
7664+ XLogRecPtr recptr = InvalidXLogRecPtr ;
7665+ XLogRecPtr lsn ;
7666+ XLogRecData rdata [2 ];
7667+ BkpBlock bkpb ;
7668+
76557669/*
7656- *Make an XLOG entry reporting the hint
7670+ *Ensure no checkpoint can change our view of RedoRecPtr.
76577671 */
7658- XLogRecData rdata [2 ];
7659- int watermark = XLOG_HINT_WATERMARK ;
7672+ Assert (MyPgXact -> delayChkpt );
7673+
7674+ /*
7675+ * Update RedoRecPtr so XLogCheckBuffer can make the right decision
7676+ */
7677+ GetRedoRecPtr ();
76607678
76617679/*
7662- * Not allowed to have zero-length records, so use a small watermark
7680+ * Setup phony rdata element for use within XLogCheckBuffer only.
7681+ * We reuse and reset rdata for any actual WAL record insert.
76637682 */
7664- rdata [0 ].data = (char * ) (& watermark );
7665- rdata [0 ].len = sizeof (int );
7666- rdata [0 ].buffer = InvalidBuffer ;
7667- rdata [0 ].buffer_std = false;
7668- rdata [0 ].next = & (rdata [1 ]);
7683+ rdata [0 ].buffer = buffer ;
7684+ rdata [0 ].buffer_std = true;
7685+
7686+ if (XLogCheckBuffer (rdata , true,& lsn ,& bkpb ))
7687+ {
7688+ char copied_buffer [BLCKSZ ];
7689+ char * origdata = (char * )BufferGetBlock (buffer );
7690+
7691+ /*
7692+ * Copy buffer so we don't have to worry about concurrent hint bit or
7693+ * lsn updates. We assume pd_lower/upper cannot be changed without an
7694+ * exclusive lock, so the contents bkp are not racy.
7695+ */
7696+ memcpy (copied_buffer ,origdata ,bkpb .hole_offset );
7697+ memcpy (copied_buffer + bkpb .hole_offset ,
7698+ origdata + bkpb .hole_offset + bkpb .hole_length ,
7699+ BLCKSZ - bkpb .hole_offset - bkpb .hole_length );
7700+
7701+ /*
7702+ * Header for backup block.
7703+ */
7704+ rdata [0 ].data = (char * )& bkpb ;
7705+ rdata [0 ].len = sizeof (BkpBlock );
7706+ rdata [0 ].buffer = InvalidBuffer ;
7707+ rdata [0 ].next = & (rdata [1 ]);
76697708
7670- rdata [1 ].data = NULL ;
7671- rdata [1 ].len = 0 ;
7672- rdata [1 ].buffer = buffer ;
7673- rdata [1 ].buffer_std = true;
7674- rdata [1 ].next = NULL ;
7709+ /*
7710+ * Save copy of the buffer.
7711+ */
7712+ rdata [1 ].data = copied_buffer ;
7713+ rdata [1 ].len = BLCKSZ - bkpb .hole_length ;
7714+ rdata [1 ].buffer = InvalidBuffer ;
7715+ rdata [1 ].next = NULL ;
76757716
7676- return XLogInsert (RM_XLOG_ID ,XLOG_HINT ,rdata );
7717+ recptr = XLogInsert (RM_XLOG_ID ,XLOG_HINT ,rdata );
7718+ }
7719+
7720+ return recptr ;
76777721}
76787722
76797723/*
@@ -7842,8 +7886,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
78427886{
78437887uint8 info = record -> xl_info & ~XLR_INFO_MASK ;
78447888
7845- /* Backup blocks are not usedin most xlog records */
7846- Assert (info == XLOG_HINT || !(record -> xl_info & XLR_BKP_BLOCK_MASK ));
7889+ /* Backup blocks are not usedby XLOG rmgr */
7890+ Assert (!(record -> xl_info & XLR_BKP_BLOCK_MASK ));
78477891
78487892if (info == XLOG_NEXTOID )
78497893{
@@ -8038,31 +8082,27 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
80388082}
80398083else if (info == XLOG_HINT )
80408084{
8041- #ifdef USE_ASSERT_CHECKING
8042- int * watermark = (int * )XLogRecGetData (record );
8043- #endif
8044-
8045- /* Check the watermark is correct for the hint record */
8046- Assert (* watermark == XLOG_HINT_WATERMARK );
8047-
8048- /* Backup blocks must be present for smgr hint records */
8049- Assert (record -> xl_info & XLR_BKP_BLOCK_MASK );
8085+ char * data ;
8086+ BkpBlock bkpb ;
80508087
80518088/*
8052- * Hint records have no information that needs to be replayed.
8053- * The sole purpose of them is to ensure that a hint bit does
8054- * not cause a checksum invalidation if a hint bit write should
8055- * cause a torn page. So the body of the record is empty but
8056- * there must be one backup block.
8089+ * Hint bit records contain a backup block stored "inline" in the normal
8090+ * data since the locking when writing hint records isn't sufficient to
8091+ * use the normal backup block mechanism, which assumes exclusive lock
8092+ * on the buffer supplied.
80578093 *
8058- * Since the only change inthe backup blockis a hintbit,
8059- *there is noconfict with Hot Standby .
8094+ * Since the only change inthese backup blockare hintbits, there are
8095+ * norecovery conflicts generated .
80608096 *
80618097 * This also means there is no corresponding API call for this,
80628098 * so an smgr implementation has no need to implement anything.
80638099 * Which means nothing is needed in md.c etc
80648100 */
8065- RestoreBackupBlock (lsn ,record ,0 , false, false);
8101+ data = XLogRecGetData (record );
8102+ memcpy (& bkpb ,data ,sizeof (BkpBlock ));
8103+ data += sizeof (BkpBlock );
8104+
8105+ RestoreBackupBlockContents (lsn ,bkpb ,data , false, false);
80668106}
80678107else if (info == XLOG_BACKUP_END )
80688108{