Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc3430a6

Browse files
committed
[PBCKP-270] fetch current log position directly from streamer.
We have streamer in our hand, so we may ask it directly, what are itstreaming now.But in case we looks into previous segment, we fallback to log readingsince it is already flushed.
1 parent3e75df6 commitc3430a6

File tree

5 files changed

+74
-33
lines changed

5 files changed

+74
-33
lines changed

‎src/backup.c

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,50 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
13041304
elog(LOG,"Looking for LSN %X/%X in segment: %s",
13051305
(uint32) (target_lsn >>32), (uint32)target_lsn,wal_segment);
13061306

1307+
if (in_stream_dir&& !in_prev_segment)
1308+
{
1309+
/* separate simple loop for streaming */
1310+
for (;;)
1311+
{
1312+
TimeLineIDcurtli;
1313+
XLogRecPtrcurptr;
1314+
XLogRecPtrprevptr;
1315+
1316+
getCurrentStreamPosition(&curtli,&curptr,&prevptr);
1317+
if (curtli>tli|| (curtli==tli&&curptr>target_lsn))
1318+
returntarget_lsn;
1319+
1320+
sleep(1);
1321+
if (interrupted||thread_interrupted)
1322+
elog(ERROR,"Interrupted during waiting for WAL streaming");
1323+
try_count++;
1324+
1325+
/* Inform user if WAL segment is absent in first attempt */
1326+
if (try_count==1)
1327+
{
1328+
if (segment_only)
1329+
elog(INFO,"Wait for WAL segment %s to be %s",
1330+
wal_segment_path,wal_delivery_str);
1331+
else
1332+
elog(INFO,"Wait for LSN %X/%X in %s WAL segment %s",
1333+
(uint32) (target_lsn >>32), (uint32)target_lsn,
1334+
wal_delivery_str,wal_segment_path);
1335+
}
1336+
1337+
if (current.from_replica&&
1338+
(XRecOffIsNull(target_lsn)||try_count>timeout /2))
1339+
{
1340+
if (!XLogRecPtrIsInvalid(prevptr))
1341+
{
1342+
/* LSN of the prior record was found */
1343+
elog(LOG,"Abuse prior LSN from stream: %X/%X",
1344+
(uint32) (prevptr >>32), (uint32)prevptr);
1345+
returnprevptr;
1346+
}
1347+
}
1348+
}
1349+
}
1350+
13071351
#ifdefHAVE_LIBZ
13081352
snprintf(gz_wal_segment_path,sizeof(gz_wal_segment_path),"%s.gz",
13091353
wal_segment_path);
@@ -1327,16 +1371,6 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
13271371
}
13281372
else
13291373
elog(LOG,"Found WAL segment: %s",wal_segment_path);
1330-
1331-
1332-
/* Check current file for stream. It may be not exist in S3 */
1333-
if (!file_exists&&segment_only&&is_start_lsn&&in_stream_dir&&try_count>1)
1334-
{
1335-
if(isStreamProccessed(wal_segment))
1336-
returnInvalidXLogRecPtr;
1337-
1338-
}
1339-
13401374
}
13411375

13421376
if (file_exists)

‎src/compatibility/receivelog.c

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
495495
* responsibility that that's sane.
496496
*/
497497
lastFlushPosition=stream->startpos;
498+
stream->currentpos=0;
499+
stream->prevpos=0;
498500

499501
while (1)
500502
{
@@ -779,7 +781,10 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
779781
}
780782
elseif (copybuf[0]=='w')
781783
{
782-
if (!ProcessXLogDataMsg(conn,stream,copybuf,r,&blockpos))
784+
boolok=ProcessXLogDataMsg(conn,stream,copybuf,r,&blockpos);
785+
stream->prevpos=stream->currentpos;
786+
stream->currentpos=blockpos;
787+
if (!ok)
783788
gotoerror;
784789

785790
/*
@@ -1221,10 +1226,4 @@ CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
12211226
sleeptime=-1;
12221227

12231228
returnsleeptime;
1224-
}
1225-
1226-
1227-
boolisStreamProccessed(char*seg_filename)
1228-
{
1229-
returnstill_sending&& !strcmp(current_walfile_name,seg_filename);
12301229
}

‎src/compatibility/receivelog.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, boo
2929
typedefstructStreamCtl
3030
{
3131
XLogRecPtrstartpos;/* Start position for streaming */
32-
TimeLineIDtimeline;/* Timeline to stream data from */
32+
volatileXLogRecPtrcurrentpos;/* current position */
33+
volatileXLogRecPtrprevpos;/* current position */
34+
volatileTimeLineIDtimeline;/* Timeline to stream data from */
3335
char*sysidentifier;/* Validate this system identifier and
3436
* timeline */
3537
intstandby_message_timeout;/* Send status messages this often */

‎src/pg_probackup.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1221,7 +1221,7 @@ extern XLogRecPtr wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr lsn, bool
12211221
externvoidwait_wal_and_calculate_stop_lsn(constchar*xlog_path,XLogRecPtrstop_lsn,pgBackup*backup);
12221222
externint64calculate_datasize_of_filelist(parray*filelist);
12231223

1224-
externboolisStreamProccessed(char*seg_filename);/* Checks, that this file is stream processing. File name without path. */
1224+
externvoidgetCurrentStreamPosition(TimeLineID*timeline,XLogRecPtr*ptr,XLogRecPtr*prev);
12251225

12261226
/*
12271227
* Slices and arrays for C strings

‎src/stream.c

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ static uint32 stream_stop_timeout = 0;
3737
/* Time in which we started to wait for streaming end */
3838
statictime_tstream_stop_begin=0;
3939

40+
staticStreamCtlstream_ctl= {0};
41+
4042
/*
4143
* We need to wait end of WAL streaming before execute pg_stop_backup().
4244
*/
@@ -234,32 +236,28 @@ StreamLog(void *arg)
234236
stream_arg->starttli);
235237

236238
{
237-
StreamCtlctl;
238-
239-
MemSet(&ctl,0,sizeof(ctl));
239+
stream_ctl.startpos=stream_arg->startpos;
240+
stream_ctl.timeline=stream_arg->starttli;
241+
stream_ctl.sysidentifier=NULL;
242+
stream_ctl.stream_stop=stop_streaming;
243+
stream_ctl.standby_message_timeout=standby_message_timeout;
240244

241-
ctl.startpos=stream_arg->startpos;
242-
ctl.timeline=stream_arg->starttli;
243-
ctl.sysidentifier=NULL;
244-
ctl.stream_stop=stop_streaming;
245-
ctl.standby_message_timeout=standby_message_timeout;
246-
247-
ctl.walmethod=CreateWalDirectoryMethod(
245+
stream_ctl.walmethod=CreateWalDirectoryMethod(
248246
stream_arg->basedir,
249247
0,
250248
false,
251249
pioDriveForLocation(FIO_BACKUP_HOST));
252250

253-
ctl.replication_slot=replication_slot;
254-
ctl.stop_socket=PGINVALID_SOCKET;
251+
stream_ctl.replication_slot=replication_slot;
252+
stream_ctl.stop_socket=PGINVALID_SOCKET;
255253

256-
if (ReceiveXlogStream(stream_arg->conn,&ctl)== false)
254+
if (ReceiveXlogStream(stream_arg->conn,&stream_ctl)== false)
257255
{
258256
interrupted= true;
259257
elog(ERROR,"Problem in receivexlog");
260258
}
261259

262-
if (!ctl.walmethod->finish())
260+
if (!stream_ctl.walmethod->finish())
263261
{
264262
interrupted= true;
265263
elog(ERROR,"Could not finish writing WAL files: %s",
@@ -704,3 +702,11 @@ add_history_file_to_filelist(parray *filelist, uint32 timeline, char *basedir)
704702
file=pgFileNew(fullpath,relpath, false,do_crc,drive);
705703
parray_append(filelist,file);
706704
}
705+
706+
void
707+
getCurrentStreamPosition(TimeLineID*timeline,XLogRecPtr*ptr,XLogRecPtr*prev)
708+
{
709+
*ptr=stream_ctl.currentpos;
710+
*prev=stream_ctl.prevpos;
711+
*timeline=stream_ctl.timeline;
712+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp