Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6230912

Browse files
committed
Use FeBeWaitSet for walsender.c.
This avoids the need to set up and tear down a fresh WaitEventSet everytime we need need to wait. We have to add an explicit exit onpostmaster exit (FeBeWaitSet isn't set up to do that automatically), somove the code to do that into a new function to avoid repetition.Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> (earlier version)Discussion:https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
1 parenta042ba2 commit6230912

File tree

1 file changed

+25
-18
lines changed

1 file changed

+25
-18
lines changed

‎src/backend/replication/walsender.c

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply);
244244
staticvoidWalSndKeepaliveIfNecessary(void);
245245
staticvoidWalSndCheckTimeOut(void);
246246
staticlongWalSndComputeSleeptime(TimestampTznow);
247+
staticvoidWalSndWait(uint32socket_events,longtimeout,uint32wait_event);
247248
staticvoidWalSndPrepareWrite(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,boollast_write);
248249
staticvoidWalSndWriteData(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,boollast_write);
249250
staticvoidWalSndUpdateProgress(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid);
@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
12871288
/* If we have pending write here, go to slow path */
12881289
for (;;)
12891290
{
1290-
intwakeEvents;
12911291
longsleeptime;
12921292

12931293
/* Check for input from the client */
@@ -1304,13 +1304,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
13041304

13051305
sleeptime=WalSndComputeSleeptime(GetCurrentTimestamp());
13061306

1307-
wakeEvents=WL_LATCH_SET |WL_EXIT_ON_PM_DEATH |
1308-
WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE |WL_TIMEOUT;
1309-
13101307
/* Sleep until something happens or we time out */
1311-
(void)WaitLatchOrSocket(MyLatch,wakeEvents,
1312-
MyProcPort->sock,sleeptime,
1313-
WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1308+
WalSndWait(WL_SOCKET_WRITEABLE |WL_SOCKET_READABLE,sleeptime,
1309+
WAIT_EVENT_WAL_SENDER_WRITE_DATA);
13141310

13151311
/* Clear any already-pending wakeups */
13161312
ResetLatch(MyLatch);
@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc)
14801476
*/
14811477
sleeptime=WalSndComputeSleeptime(GetCurrentTimestamp());
14821478

1483-
wakeEvents=WL_LATCH_SET |WL_EXIT_ON_PM_DEATH |
1484-
WL_SOCKET_READABLE |WL_TIMEOUT;
1479+
wakeEvents=WL_SOCKET_READABLE;
14851480

14861481
if (pq_is_send_pending())
14871482
wakeEvents |=WL_SOCKET_WRITEABLE;
14881483

1489-
(void)WaitLatchOrSocket(MyLatch,wakeEvents,
1490-
MyProcPort->sock,sleeptime,
1491-
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1484+
WalSndWait(wakeEvents,sleeptime,WAIT_EVENT_WAL_SENDER_WAIT_WAL);
14921485
}
14931486

14941487
/* reactivate latch so WalSndLoop knows to continue */
@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
23482341
longsleeptime;
23492342
intwakeEvents;
23502343

2351-
wakeEvents=WL_LATCH_SET |WL_EXIT_ON_PM_DEATH |WL_TIMEOUT;
2352-
23532344
if (!streamingDoneReceiving)
2354-
wakeEvents |=WL_SOCKET_READABLE;
2345+
wakeEvents=WL_SOCKET_READABLE;
2346+
else
2347+
wakeEvents=0;
23552348

23562349
/*
23572350
* Use fresh timestamp, not last_processing, to reduce the chance
@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
23632356
wakeEvents |=WL_SOCKET_WRITEABLE;
23642357

23652358
/* Sleep until something happens or we time out */
2366-
(void)WaitLatchOrSocket(MyLatch,wakeEvents,
2367-
MyProcPort->sock,sleeptime,
2368-
WAIT_EVENT_WAL_SENDER_MAIN);
2359+
WalSndWait(wakeEvents,sleeptime,WAIT_EVENT_WAL_SENDER_MAIN);
23692360
}
23702361
}
23712362
}
@@ -3121,6 +3112,22 @@ WalSndWakeup(void)
31213112
}
31223113
}
31233114

3115+
/*
3116+
* Wait for readiness on the FeBe socket, or a timeout. The mask should be
3117+
* composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3118+
* on postmaster death.
3119+
*/
3120+
staticvoid
3121+
WalSndWait(uint32socket_events,longtimeout,uint32wait_event)
3122+
{
3123+
WaitEventevent;
3124+
3125+
ModifyWaitEvent(FeBeWaitSet,FeBeWaitSetSocketPos,socket_events,NULL);
3126+
if (WaitEventSetWait(FeBeWaitSet,timeout,&event,1,wait_event)==1&&
3127+
(event.events&WL_POSTMASTER_DEATH))
3128+
proc_exit(1);
3129+
}
3130+
31243131
/*
31253132
* Signal all walsenders to move to stopping state.
31263133
*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp