@@ -733,6 +733,18 @@ typedef struct XLogCtlData
733733XLogRecPtr lastFpwDisableRecPtr ;
734734
735735slock_t info_lck ;/* locks shared variables shown above */
736+
737+ /*
738+ * Variables used to track segment-boundary-crossing WAL records. See
739+ * RegisterSegmentBoundary. Protected by segtrack_lck.
740+ */
741+ XLogSegNo lastNotifiedSeg ;
742+ XLogSegNo earliestSegBoundary ;
743+ XLogRecPtr earliestSegBoundaryEndPtr ;
744+ XLogSegNo latestSegBoundary ;
745+ XLogRecPtr latestSegBoundaryEndPtr ;
746+
747+ slock_t segtrack_lck ;/* locks shared variables shown above */
736748}XLogCtlData ;
737749
738750static XLogCtlData * XLogCtl = NULL ;
@@ -931,6 +943,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
931943XLogSegNo * endlogSegNo );
932944static void UpdateLastRemovedPtr (char * filename );
933945static void ValidateXLOGDirectoryStructure (void );
946+ static void RegisterSegmentBoundary (XLogSegNo seg ,XLogRecPtr pos );
934947static void CleanupBackupHistory (void );
935948static void UpdateMinRecoveryPoint (XLogRecPtr lsn ,bool force );
936949static XLogRecord * ReadRecord (XLogReaderState * xlogreader ,
@@ -1165,23 +1178,56 @@ XLogInsertRecord(XLogRecData *rdata,
11651178END_CRIT_SECTION ();
11661179
11671180/*
1168- * Update shared LogwrtRqst.Write, if we crossed page boundary.
1181+ * If we crossed a page boundary, update LogwrtRqst.Write; if we crossed a
1182+ * segment boundary, register it and, if already flushed, wake up walwriter.
11691183 */
11701184if (StartPos /XLOG_BLCKSZ != EndPos /XLOG_BLCKSZ )
11711185{
1186+ XLogSegNo StartSeg ;
1187+ XLogSegNo EndSeg ;
1188+
1189+ XLByteToSeg (StartPos ,StartSeg ,wal_segment_size );
1190+ XLByteToSeg (EndPos ,EndSeg ,wal_segment_size );
1191+
1192+ /*
1193+ * Register our crossing the segment boundary if that occurred.
1194+ *
1195+ * Note that we did not use XLByteToPrevSeg() for determining the
1196+ * ending segment. This is so that a record that fits perfectly into
1197+ * the end of the segment causes the latter to get marked ready for
1198+ * archival immediately.
1199+ */
1200+ if (StartSeg != EndSeg && XLogArchivingActive ())
1201+ RegisterSegmentBoundary (EndSeg ,EndPos );
1202+
1203+ /*
1204+ * Advance LogwrtRqst.Write so that it includes new block(s).
1205+ *
1206+ * We do this after registering the segment boundary so that the
1207+ * comparison with the flushed pointer below can use the latest value
1208+ * known globally.
1209+ */
11721210SpinLockAcquire (& XLogCtl -> info_lck );
1173- /* advance global request to include new block(s) */
11741211if (XLogCtl -> LogwrtRqst .Write < EndPos )
11751212XLogCtl -> LogwrtRqst .Write = EndPos ;
11761213/* update local result copy while I have the chance */
11771214LogwrtResult = XLogCtl -> LogwrtResult ;
11781215SpinLockRelease (& XLogCtl -> info_lck );
1216+
1217+ /*
1218+ * There's a chance that the record was already flushed to disk and we
1219+ * missed marking segments as ready for archive. If this happens, we
1220+ * nudge the WALWriter, which will take care of notifying segments as
1221+ * needed.
1222+ */
1223+ if (StartSeg != EndSeg && XLogArchivingActive ()&&
1224+ LogwrtResult .Flush >=EndPos && ProcGlobal -> walwriterLatch )
1225+ SetLatch (ProcGlobal -> walwriterLatch );
11791226}
11801227
11811228/*
11821229 * If this was an XLOG_SWITCH record, flush the record and the empty
1183- * padding space that fills the rest of the segment, and perform
1184- * end-of-segment actions (eg, notifying archiver).
1230+ * padding space that fills the rest of the segment.
11851231 */
11861232if (isLogSwitch )
11871233{
@@ -2433,6 +2479,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
24332479
24342480/* We should always be inside a critical section here */
24352481Assert (CritSectionCount > 0 );
2482+ Assert (LWLockHeldByMe (WALWriteLock ));
24362483
24372484/*
24382485 * Update local LogwrtResult (caller probably did this already, but...)
@@ -2599,11 +2646,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
25992646 * later. Doing it here ensures that one and only one backend will
26002647 * perform this fsync.
26012648 *
2602- * This is also the right place to notify the Archiver that the
2603- * segment is ready to copy to archival storage, and to update the
2604- * timer for archive_timeout, and to signal for a checkpoint if
2605- * too many logfile segments have been used since the last
2606- * checkpoint.
2649+ * If WAL archiving is active, we attempt to notify the archiver
2650+ * of any segments that are now ready for archival.
2651+ *
2652+ * This is also the right place to update the timer for
2653+ * archive_timeout and to signal for a checkpoint if too many
2654+ * logfile segments have been used since the last checkpoint.
26072655 */
26082656if (finishing_seg )
26092657{
@@ -2615,7 +2663,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
26152663LogwrtResult .Flush = LogwrtResult .Write ;/* end of page */
26162664
26172665if (XLogArchivingActive ())
2618- XLogArchiveNotifySeg ( openLogSegNo );
2666+ NotifySegmentsReadyForArchive ( LogwrtResult . Flush );
26192667
26202668XLogCtl -> lastSegSwitchTime = (pg_time_t )time (NULL );
26212669XLogCtl -> lastSegSwitchLSN = LogwrtResult .Flush ;
@@ -2703,6 +2751,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
27032751XLogCtl -> LogwrtRqst .Flush = LogwrtResult .Flush ;
27042752SpinLockRelease (& XLogCtl -> info_lck );
27052753}
2754+
2755+ if (XLogArchivingActive ())
2756+ NotifySegmentsReadyForArchive (LogwrtResult .Flush );
27062757}
27072758
27082759/*
@@ -4324,6 +4375,129 @@ ValidateXLOGDirectoryStructure(void)
43244375}
43254376}
43264377
4378+ /*
4379+ * RegisterSegmentBoundary
4380+ *
4381+ * WAL records that are split across a segment boundary require special
4382+ * treatment for archiving: the initial segment must not be archived until
4383+ * the end segment has been flushed, in case we crash before we have
4384+ * the chance to flush the end segment (because after recovery we would
4385+ * overwrite that WAL record with a different one, and so the file we
4386+ * archived would no longer represent the truth). This also applies to
4387+ * streaming physical replication.
4388+ *
4389+ * To handle this, we keep track of the LSN of WAL records that cross
4390+ * segment boundaries. Two such are sufficient: the ones with the
4391+ * earliest and the latest end pointers we know about, since the flush
4392+ * position advances monotonically. WAL record writers register
4393+ * boundary-crossing records here, which is used by .ready file creation
4394+ * to delay until the end segment is known flushed.
+ *
+ * Parameters:
+ *   seg    - number of the segment that contains 'endpos'; this is checked
+ *            against XLByteToSeg(endpos) in assert-enabled builds only.
+ *   endpos - end pointer of the boundary-crossing WAL record.
+ *
+ * The boundary is stored in XLogCtl while holding segtrack_lck. A call
+ * whose 'seg' is not greater than the boundaries already recorded leaves
+ * the shared state unchanged. There is no return value.
4395+ */
4396+ static void
4397+ RegisterSegmentBoundary (XLogSegNo seg ,XLogRecPtr endpos )
4398+ {
4399+ XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY ;
4400+
4401+ /* verify that the caller's 'seg' is the segment containing 'endpos' */
4402+ AssertArg ((XLByteToSeg (endpos ,segno ,wal_segment_size ),segno == seg ));
4403+
4404+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4405+
4406+ /*
4407+ * If no segment boundaries are registered, store the new segment boundary
4408+ * in earliestSegBoundary. Otherwise, store the greater segment
4409+ * boundaries in latestSegBoundary.
+ *
+ * MaxXLogSegNo in either slot means "nothing registered there". A
+ * boundary that is not greater than earliestSegBoundary, or not greater
+ * than an already-set latestSegBoundary, is not recorded at all.
4410+ */
4411+ if (XLogCtl -> earliestSegBoundary == MaxXLogSegNo )
4412+ {
4413+ XLogCtl -> earliestSegBoundary = seg ;
4414+ XLogCtl -> earliestSegBoundaryEndPtr = endpos ;
4415+ }
4416+ else if (seg > XLogCtl -> earliestSegBoundary &&
4417+ (XLogCtl -> latestSegBoundary == MaxXLogSegNo ||
4418+ seg > XLogCtl -> latestSegBoundary ))
4419+ {
4420+ XLogCtl -> latestSegBoundary = seg ;
4421+ XLogCtl -> latestSegBoundaryEndPtr = endpos ;
4422+ }
4423+
4424+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4425+ }
4426+
4427+ /*
4428+ * NotifySegmentsReadyForArchive
4429+ *
4430+ * Mark segments as ready for archival, given that it is safe to do so.
4431+ * This function is idempotent.
+ *
+ * 'flushRecPtr' is the WAL flush position to test against (the callers in
+ * XLogWrite pass LogwrtResult.Flush).
+ *
+ * A registered segment boundary counts as flushed when its segment number
+ * is <= the segment containing flushRecPtr and its record end pointer is
+ * <= flushRecPtr. The latest boundary is tried first, then the earliest.
+ * If neither qualifies, the function returns without changing anything.
+ * Otherwise every segment after lastNotifiedSeg and strictly before the
+ * chosen boundary's segment is passed to XLogArchiveNotifySeg(); the
+ * boundary's own segment is not notified by this call.
+ *
+ * Shared tracking state is read and updated under segtrack_lck; the
+ * XLogArchiveNotifySeg() calls happen after the spinlock is released.
+ *
+ * NOTE(review): an unset boundary slot holds MaxXLogSegNo, so presumably it
+ * can never satisfy the "<= flushed_seg" test -- confirm that a real
+ * segment number cannot reach MaxXLogSegNo.
4432+ */
4433+ void
4434+ NotifySegmentsReadyForArchive (XLogRecPtr flushRecPtr )
4435+ {
4436+ XLogSegNo latest_boundary_seg ;/* segment of the flushed boundary chosen below */
4437+ XLogSegNo last_notified ;/* lastNotifiedSeg as read under the lock, before updating */
4438+ XLogSegNo flushed_seg ;/* segment containing flushRecPtr */
4439+ XLogSegNo seg ;
4440+ bool keep_latest ;/* true: only the earliest boundary qualified, retain the latest */
4441+
4442+ XLByteToSeg (flushRecPtr ,flushed_seg ,wal_segment_size );
4443+
4444+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
4445+
4446+ if (XLogCtl -> latestSegBoundary <=flushed_seg &&
4447+ XLogCtl -> latestSegBoundaryEndPtr <=flushRecPtr )
4448+ {
4449+ latest_boundary_seg = XLogCtl -> latestSegBoundary ;
4450+ keep_latest = false;
4451+ }
4452+ else if (XLogCtl -> earliestSegBoundary <=flushed_seg &&
4453+ XLogCtl -> earliestSegBoundaryEndPtr <=flushRecPtr )
4454+ {
4455+ latest_boundary_seg = XLogCtl -> earliestSegBoundary ;
4456+ keep_latest = true;
4457+ }
4458+ else
4459+ {
4460+ SpinLockRelease (& XLogCtl -> segtrack_lck );/* no registered boundary is flushed yet */
4461+ return ;
4462+ }
4463+
4464+ last_notified = XLogCtl -> lastNotifiedSeg ;
4465+
4466+ /*
4467+ * Update shared memory and discard segment boundaries that are no longer
4468+ * needed.
4469+ *
4470+ * It is safe to update shared memory before we attempt to create the
4471+ * .ready files. If our calls to XLogArchiveNotifySeg() fail,
4472+ * RemoveOldXlogFiles() will retry it as needed.
+ *
+ * lastNotifiedSeg only ever moves forward here: it is set to the segment
+ * just before the chosen boundary's segment, and only if that is greater
+ * than its current value.
4473+ */
4474+ if (last_notified < latest_boundary_seg - 1 )
4475+ XLogCtl -> lastNotifiedSeg = latest_boundary_seg - 1 ;
4476+
4477+ if (keep_latest )
4478+ {
4479+ XLogCtl -> earliestSegBoundary = XLogCtl -> latestSegBoundary ;/* promote the latest boundary to earliest */
4480+ XLogCtl -> earliestSegBoundaryEndPtr = XLogCtl -> latestSegBoundaryEndPtr ;
4481+ }
4482+ else
4483+ {
4484+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;/* mark the earliest slot as unset */
4485+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4486+ }
4487+
4488+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;/* the latest slot is cleared in both cases */
4489+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
4490+
4491+ SpinLockRelease (& XLogCtl -> segtrack_lck );
4492+
4493+ /*
4494+ * Notify archiver about segments that are ready for archival (by creating
4495+ * the corresponding .ready files).
+ *
+ * The range starts from the lastNotifiedSeg value read before the update
+ * above and stops short of the boundary's own segment.
4496+ */
4497+ for (seg = last_notified + 1 ;seg < latest_boundary_seg ;seg ++ )
4498+ XLogArchiveNotifySeg (seg );
4499+ }
4500+
43274501/*
43284502 * Remove previous backup history files. This also retries creation of
43294503 * .ready files for any backup history files for which XLogArchiveNotify
@@ -5225,9 +5399,17 @@ XLOGShmemInit(void)
52255399
52265400SpinLockInit (& XLogCtl -> Insert .insertpos_lck );
52275401SpinLockInit (& XLogCtl -> info_lck );
5402+ SpinLockInit (& XLogCtl -> segtrack_lck );
52285403SpinLockInit (& XLogCtl -> ulsn_lck );
52295404InitSharedLatch (& XLogCtl -> recoveryWakeupLatch );
52305405ConditionVariableInit (& XLogCtl -> recoveryNotPausedCV );
5406+
5407+ /* Initialize stuff for marking segments as ready for archival. */
5408+ XLogCtl -> lastNotifiedSeg = MaxXLogSegNo ;
5409+ XLogCtl -> earliestSegBoundary = MaxXLogSegNo ;
5410+ XLogCtl -> earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
5411+ XLogCtl -> latestSegBoundary = MaxXLogSegNo ;
5412+ XLogCtl -> latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
52315413}
52325414
52335415/*
@@ -7858,6 +8040,20 @@ StartupXLOG(void)
78588040XLogCtl -> LogwrtRqst .Write = EndOfLog ;
78598041XLogCtl -> LogwrtRqst .Flush = EndOfLog ;
78608042
8043+ /*
8044+ * Initialize XLogCtl->lastNotifiedSeg to the segment preceding EndOfLog's.
8045+ */
8046+ if (XLogArchivingActive ())
8047+ {
8048+ XLogSegNo EndOfLogSeg ;
8049+
8050+ XLByteToSeg (EndOfLog ,EndOfLogSeg ,wal_segment_size );
8051+
8052+ SpinLockAcquire (& XLogCtl -> segtrack_lck );
8053+ XLogCtl -> lastNotifiedSeg = EndOfLogSeg - 1 ;
8054+ SpinLockRelease (& XLogCtl -> segtrack_lck );
8055+ }
8056+
78618057/*
78628058 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
78638059 * record before resource manager writes cleanup WAL records or checkpoint