Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit74cbe96

Browse files
committed
Refactor pg_receivexlog main loop code, for readability.
Previously the source codes for receiving the data and forpolling the socket were included in pg_receivexlog main loop.This commit splits out them as separate functions. This isuseful for improving the readability of main loop code andmaking the future pg_receivexlog-related patch simpler.
1 parent644d853 commit74cbe96

File tree

1 file changed

+142
-67
lines changed

1 file changed

+142
-67
lines changed

‎src/bin/pg_basebackup/receivelog.c

Lines changed: 142 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
3535
uint32timeline,char*basedir,
3636
stream_stop_callbackstream_stop,intstandby_message_timeout,
3737
char*partial_suffix,XLogRecPtr*stoppos);
38+
staticintCopyStreamPoll(PGconn*conn,longtimeout_ms);
39+
staticintCopyStreamReceive(PGconn*conn,longtimeout,char**buffer);
3840

3941
staticboolReadEndOfStreamingResult(PGresult*res,XLogRecPtr*startpos,
4042
uint32*timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
744746
intbytes_written;
745747
int64now;
746748
inthdr_len;
747-
748-
if (copybuf!=NULL)
749-
{
750-
PQfreemem(copybuf);
751-
copybuf=NULL;
752-
}
749+
longsleeptime;
753750

754751
/*
755752
* Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
784781
last_status=now;
785782
}
786783

787-
r=PQgetCopyData(conn,&copybuf,1);
788-
if (r==0)
784+
/*
785+
* Compute how long send/receive loops should sleep
786+
*/
787+
if (standby_message_timeout&&still_sending)
789788
{
790-
/*
791-
* No data available. Wait for some to appear, but not longer than
792-
* the specified timeout, so that we can ping the server.
793-
*/
794-
fd_setinput_mask;
795-
structtimevaltimeout;
796-
structtimeval*timeoutptr;
797-
798-
FD_ZERO(&input_mask);
799-
FD_SET(PQsocket(conn),&input_mask);
800-
if (standby_message_timeout&&still_sending)
789+
int64targettime;
790+
longsecs;
791+
intusecs;
792+
793+
targettime=last_status+ (standby_message_timeout-1)* ((int64)1000);
794+
feTimestampDifference(now,
795+
targettime,
796+
&secs,
797+
&usecs);
798+
/* Always sleep at least 1 sec */
799+
if (secs <=0)
801800
{
802-
int64targettime;
803-
longsecs;
804-
intusecs;
805-
806-
targettime=last_status+ (standby_message_timeout-1)* ((int64)1000);
807-
feTimestampDifference(now,
808-
targettime,
809-
&secs,
810-
&usecs);
811-
if (secs <=0)
812-
timeout.tv_sec=1;/* Always sleep at least 1 sec */
813-
else
814-
timeout.tv_sec=secs;
815-
timeout.tv_usec=usecs;
816-
timeoutptr=&timeout;
801+
secs=1;
802+
usecs=0;
817803
}
818-
else
819-
timeoutptr=NULL;
820804

821-
r=select(PQsocket(conn)+1,&input_mask,NULL,NULL,timeoutptr);
822-
if (r==0|| (r<0&&errno==EINTR))
823-
{
824-
/*
825-
* Got a timeout or signal. Continue the loop and either
826-
* deliver a status packet to the server or just go back into
827-
* blocking.
828-
*/
829-
continue;
830-
}
831-
elseif (r<0)
832-
{
833-
fprintf(stderr,_("%s: select() failed: %s\n"),
834-
progname,strerror(errno));
835-
gotoerror;
836-
}
837-
/* Else there is actually data on the socket */
838-
if (PQconsumeInput(conn)==0)
839-
{
840-
fprintf(stderr,
841-
_("%s: could not receive data from WAL stream: %s"),
842-
progname,PQerrorMessage(conn));
843-
gotoerror;
844-
}
845-
continue;
805+
sleeptime=secs*1000+usecs /1000;
846806
}
807+
else
808+
sleeptime=-1;
809+
810+
r=CopyStreamReceive(conn,sleeptime,&copybuf);
811+
if (r==0)
812+
continue;
847813
if (r==-1)
814+
gotoerror;
815+
if (r==-2)
848816
{
849817
PGresult*res=PQgetResult(conn);
850818

@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
877845
}
878846
if (copybuf!=NULL)
879847
PQfreemem(copybuf);
848+
copybuf=NULL;
880849
*stoppos=blockpos;
881850
returnres;
882851
}
883-
if (r==-2)
884-
{
885-
fprintf(stderr,_("%s: could not read COPY data: %s"),
886-
progname,PQerrorMessage(conn));
887-
gotoerror;
888-
}
889852

890853
/* Check the message type. */
891854
if (copybuf[0]=='k')
@@ -1056,3 +1019,115 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
10561019
PQfreemem(copybuf);
10571020
returnNULL;
10581021
}
1022+
1023+
/*
1024+
* Wait until we can read CopyData message, or timeout.
1025+
*
1026+
* Returns 1 if data has become available for reading, 0 if timed out
1027+
* or interrupted by signal, and -1 on an error.
1028+
*/
1029+
staticint
1030+
CopyStreamPoll(PGconn*conn,longtimeout_ms)
1031+
{
1032+
intret;
1033+
fd_setinput_mask;
1034+
structtimevaltimeout;
1035+
structtimeval*timeoutptr;
1036+
1037+
if (PQsocket(conn)<0)
1038+
{
1039+
fprintf(stderr,_("%s: socket not open"),progname);
1040+
return-1;
1041+
}
1042+
1043+
FD_ZERO(&input_mask);
1044+
FD_SET(PQsocket(conn),&input_mask);
1045+
1046+
if (timeout_ms<0)
1047+
timeoutptr=NULL;
1048+
else
1049+
{
1050+
timeout.tv_sec=timeout_ms /1000L;
1051+
timeout.tv_usec= (timeout_ms %1000L)*1000L;
1052+
timeoutptr=&timeout;
1053+
}
1054+
1055+
ret=select(PQsocket(conn)+1,&input_mask,NULL,NULL,timeoutptr);
1056+
if (ret==0|| (ret<0&&errno==EINTR))
1057+
return0;/* Got a timeout or signal */
1058+
elseif (ret<0)
1059+
{
1060+
fprintf(stderr,_("%s: select() failed: %s\n"),
1061+
progname,strerror(errno));
1062+
return-1;
1063+
}
1064+
1065+
return1;
1066+
}
1067+
1068+
/*
1069+
* Receive CopyData message available from XLOG stream, blocking for
1070+
* maximum of 'timeout' ms.
1071+
*
1072+
* If data was received, returns the length of the data. *buffer is set to
1073+
* point to a buffer holding the received message. The buffer is only valid
1074+
* until the next CopyStreamReceive call.
1075+
*
1076+
* 0 if no data was available within timeout, or wait was interrupted
1077+
* by signal. -1 on error. -2 if the server ended the COPY.
1078+
*/
1079+
staticint
1080+
CopyStreamReceive(PGconn*conn,longtimeout,char**buffer)
1081+
{
1082+
staticchar*copybuf=NULL;
1083+
intrawlen;
1084+
1085+
if (copybuf!=NULL)
1086+
PQfreemem(copybuf);
1087+
copybuf=NULL;
1088+
*buffer=NULL;
1089+
1090+
/* Try to receive a CopyData message */
1091+
rawlen=PQgetCopyData(conn,&copybuf,1);
1092+
if (rawlen==0)
1093+
{
1094+
/*
1095+
* No data available. Wait for some to appear, but not longer than
1096+
* the specified timeout, so that we can ping the server.
1097+
*/
1098+
if (timeout>0)
1099+
{
1100+
intret;
1101+
1102+
ret=CopyStreamPoll(conn,timeout);
1103+
if (ret <=0)
1104+
returnret;
1105+
}
1106+
1107+
/* Else there is actually data on the socket */
1108+
if (PQconsumeInput(conn)==0)
1109+
{
1110+
fprintf(stderr,
1111+
_("%s: could not receive data from WAL stream: %s"),
1112+
progname,PQerrorMessage(conn));
1113+
return-1;
1114+
}
1115+
1116+
/* Now that we've consumed some input, try again */
1117+
rawlen=PQgetCopyData(conn,&copybuf,1);
1118+
if (rawlen==0)
1119+
return0;
1120+
}
1121+
if (rawlen==-1)/* end-of-streaming or error */
1122+
return-2;
1123+
if (rawlen==-2)
1124+
{
1125+
fprintf(stderr,_("%s: could not read COPY data: %s"),
1126+
progname,PQerrorMessage(conn));
1127+
return-1;
1128+
}
1129+
1130+
/* Return received messages to caller */
1131+
*buffer=copybuf;
1132+
returnrawlen;
1133+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp