
Commit 314cbfc

Add new replication mode synchronous_commit = 'remote_apply'.
In this mode, the master waits for the transaction to be applied on the remote side, not just written to disk. That means that you can count on a transaction started on the standby to see all commits previously acknowledged by the master.

To make this work, the standby sends a reply after replaying each commit record generated with synchronous_commit >= 'remote_apply'. This introduces a small inefficiency: the extra replies will be sent even by standbys that aren't the current synchronous standby. But previously-existing synchronous_commit levels make no attempt at all to optimize which replies are sent based on what the primary cares about, so this is no worse, and at least avoids any extra replies for people not using the feature at all.

Thomas Munro, reviewed by Michael Paquier and by me. Some additional tweaks by me.

1 parent a898b40 commit 314cbfc

File tree

16 files changed

+209
-67
lines changed

‎doc/src/sgml/config.sgml

Lines changed: 9 additions & 5 deletions
@@ -2143,8 +2143,8 @@ include_dir 'conf.d'
 Specifies whether transaction commit will wait for WAL records
 to be written to disk before the command returns a <quote>success</>
 indication to the client. Valid values are <literal>on</>,
-<literal>remote_write</>, <literal>local</>, and <literal>off</>.
-The default, and safe, setting
+<literal>remote_write</>, <literal>remote_apply</>, <literal>local</>,
+and <literal>off</>. The default, and safe, setting
 is <literal>on</>. When <literal>off</>, there can be a delay between
 when success is reported to the client and when the transaction is
 really guaranteed to be safe against a server crash. (The maximum
@@ -2169,6 +2169,10 @@ include_dir 'conf.d'
 the commit record of the transaction and flushed it to disk. This
 ensures the transaction will not be lost unless both primary and
 standby suffer corruption of their database storage.
+When set to <literal>remote_apply</>, commits will wait until a reply
+from the current synchronous standby indicates it has received the
+commit record of the transaction and applied it, so that it has become
+visible to queries.
 When set to <literal>remote_write</>, commits will wait
 until a reply from the current synchronous standby indicates it has
 received the commit record of the transaction and written it out to
@@ -2186,9 +2190,9 @@ include_dir 'conf.d'
 setting <literal>local</> is available for transactions that
 wish to wait for local flush to disk, but not synchronous replication.
 If <varname>synchronous_standby_names</> is not set, the settings
-<literal>on</>, <literal>remote_write</> and <literal>local</> all
-provide the same synchronization level: transaction commits only wait
-for local flush to disk.
+<literal>on</>, <literal>remote_apply</>, <literal>remote_write</>
+and <literal>local</> all provide the same synchronization level:
+transaction commits only wait for local flush to disk.
 </para>
 <para>
 This parameter can be changed at any time; the behavior for any

‎doc/src/sgml/high-availability.sgml

Lines changed: 15 additions & 3 deletions
@@ -1081,6 +1081,9 @@ primary_slot_name = 'node_a_slot'
 WAL record is then sent to the standby. The standby sends reply
 messages each time a new batch of WAL data is written to disk, unless
 <varname>wal_receiver_status_interval</> is set to zero on the standby.
+In the case that <varname>synchronous_commit</> is set to
+<literal>remote_apply</>, the standby sends reply messages when the commit
+record is replayed, making the transaction visible.
 If the standby is the first matching standby, as specified in
 <varname>synchronous_standby_names</> on the primary, the reply
 messages from that standby will be used to wake users waiting for
@@ -1106,6 +1109,14 @@ primary_slot_name = 'node_a_slot'
 the database of the primary gets corrupted at the same time.
 </para>

+<para>
+Setting <varname>synchronous_commit</> to <literal>remote_apply</> will
+cause each commit to wait until the current synchronous standby reports
+that it has replayed the transaction, making it visible to user queries.
+In simple cases, this allows for load balancing with causal consistency
+on a single hot standby.
+</para>
+
 <para>
 Users will stop waiting if a fast shutdown is requested. However, as
 when using asynchronous replication, the server will not fully
@@ -1160,9 +1171,10 @@ primary_slot_name = 'node_a_slot'
 <title>Planning for High Availability</title>

 <para>
-Commits made when <varname>synchronous_commit</> is set to <literal>on</>
-or <literal>remote_write</> will wait until the synchronous standby responds. The response
-may never occur if the last, or only, standby should crash.
+Commits made when <varname>synchronous_commit</> is set to <literal>on</>,
+<literal>remote_apply</> or <literal>remote_write</> will wait until the
+synchronous standby responds. The response may never occur if the last, or
+only, standby should crash.
 </para>

 <para>

‎src/backend/access/transam/twophase.c

Lines changed: 3 additions & 3 deletions
@@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact)
      * Note that at this stage we have marked the prepare, but still show as
      * running in the procarray (twice!) and continue to hold locks.
      */
-    SyncRepWaitForLSN(gxact->prepare_end_lsn);
+    SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

     records.tail = records.head = NULL;
     records.num_chunks = 0;
@@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
      * Note that at this stage we have marked clog, but still show as running
      * in the procarray and continue to hold locks.
      */
-    SyncRepWaitForLSN(recptr);
+    SyncRepWaitForLSN(recptr, true);
 }

 /*
@@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid,
      * Note that at this stage we have marked clog, but still show as running
      * in the procarray and continue to hold locks.
      */
-    SyncRepWaitForLSN(recptr);
+    SyncRepWaitForLSN(recptr, false);
 }

‎src/backend/access/transam/xact.c

Lines changed: 15 additions & 1 deletion
@@ -1324,7 +1324,7 @@ RecordTransactionCommit(void)
      * in the procarray and continue to hold locks.
      */
     if (wrote_xlog && markXidCommitted)
-        SyncRepWaitForLSN(XactLastRecEnd);
+        SyncRepWaitForLSN(XactLastRecEnd, true);

     /* remember end of last commit record */
     XactLastCommitEnd = XactLastRecEnd;
@@ -5122,6 +5122,13 @@ XactLogCommitRecord(TimestampTz commit_time,
     if (forceSyncCommit)
         xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;

+    /*
+     * Check if the caller would like to ask standbys for immediate feedback
+     * once this commit is applied.
+     */
+    if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
+        xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
+
     /*
      * Relcache invalidations requires information about the current database
      * and so does logical decoding.
@@ -5459,6 +5466,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
     if (XactCompletionForceSyncCommit(parsed->xinfo))
         XLogFlush(lsn);

+    /*
+     * If asked by the primary (because someone is waiting for a synchronous
+     * commit = remote_apply), we will need to ask walreceiver to send a
+     * reply immediately.
+     */
+    if (XactCompletionApplyFeedback(parsed->xinfo))
+        XLogRequestWalReceiverReply();
 }

 /*

‎src/backend/access/transam/xlog.c

Lines changed: 23 additions & 0 deletions
@@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr;
  */
 static bool doPageWrites;

+/* Has the recovery code requested a walreceiver wakeup? */
+static bool doRequestWalReceiverReply;
+
 /*
  * RedoStartLSN points to the checkpoint's REDO location which is specified
  * in a backup label file, backup history file or control file. In standby
@@ -6879,6 +6882,17 @@ StartupXLOG(void)
                 XLogCtl->lastReplayedTLI = ThisTimeLineID;
                 SpinLockRelease(&XLogCtl->info_lck);

+                /*
+                 * If rm_redo called XLogRequestWalReceiverReply, then we
+                 * wake up the receiver so that it notices the updated
+                 * lastReplayedEndRecPtr and sends a reply to the master.
+                 */
+                if (doRequestWalReceiverReply)
+                {
+                    doRequestWalReceiverReply = false;
+                    WalRcvForceReply();
+                }
+
                 /* Remember this record as the last-applied one */
                 LastRec = ReadRecPtr;

@@ -11594,3 +11608,12 @@ SetWalWriterSleeping(bool sleeping)
     XLogCtl->WalWriterSleeping = sleeping;
     SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * Schedule a walreceiver wakeup in the main recovery loop.
+ */
+void
+XLogRequestWalReceiverReply(void)
+{
+    doRequestWalReceiverReply = true;
+}
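
The xact.c and xlog.c changes above cooperate: the primary tags a commit record, the standby's redo routine only records that a reply is wanted, and the recovery loop sends the reply after it has published the new replay position. The following is a minimal standalone model of that ordering, not PostgreSQL code. The bit value, the enum ordering, and every name in it (`APPLY_FEEDBACK_BIT`, `CommitLevel`, `replay_one_record`, and so on) are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed bit position; the real flag lives in PostgreSQL's own headers. */
#define APPLY_FEEDBACK_BIT (1U << 29)

/* Assumed ordering of synchronous_commit levels, weakest first. */
typedef enum
{
    COMMIT_OFF,
    COMMIT_LOCAL,
    COMMIT_REMOTE_WRITE,
    COMMIT_REMOTE_FLUSH,
    COMMIT_REMOTE_APPLY
} CommitLevel;

static bool     reply_requested = false; /* models doRequestWalReceiverReply */
static uint64_t last_replayed = 0;       /* models lastReplayedEndRecPtr */
static uint64_t last_reported = 0;       /* replay position in the last forced reply */
static int      replies_sent = 0;

/* Primary side: tag the commit record when apply feedback is wanted. */
uint32_t
make_commit_xinfo(CommitLevel level)
{
    uint32_t xinfo = 0;

    if (level >= COMMIT_REMOTE_APPLY)
        xinfo |= APPLY_FEEDBACK_BIT;
    return xinfo;
}

/* Standby side: redo only notes the request; it does not reply itself. */
void
redo_commit(uint32_t xinfo)
{
    if (xinfo & APPLY_FEEDBACK_BIT)
        reply_requested = true;
}

/* One step of the recovery loop: replay, publish the position, then reply. */
void
replay_one_record(uint32_t xinfo, uint64_t end_lsn)
{
    redo_commit(xinfo);
    last_replayed = end_lsn;    /* publish before any reply is built */

    if (reply_requested)
    {
        reply_requested = false;
        last_reported = last_replayed;
        replies_sent++;
    }
}
```

Deferring the reply to the loop, rather than sending it from inside redo, means the reported position already includes the record that asked for feedback.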

‎src/backend/replication/README

Lines changed: 10 additions & 8 deletions
@@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
 Establish connection to the primary, and starts streaming from 'startpoint'.
 Returns true on success.

-bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
-
-Retrieve any message available through the connection, blocking for
-maximum of 'timeout' ms. If a message was successfully read, returns true,
-otherwise false. On success, a pointer to the message payload is stored in
-*buffer, length in *len, and the type of message received in *type. The
-returned buffer is valid until the next call to walrcv_* functions, the
-caller should not attempt freeing it.
+int walrcv_receive(char **buffer, int *wait_fd)
+
+Retrieve any message available without blocking through the
+connection. If a message was successfully read, returns its
+length. If the connection is closed, returns -1. Otherwise returns 0
+to indicate that no data is available, and sets *wait_fd to a file
+descriptor which can be waited on before trying again. On success, a
+pointer to the message payload is stored in *buffer. The returned
+buffer is valid until the next call to walrcv_* functions, and the
+caller should not attempt to free it.

 void walrcv_send(const char *buffer, int nbytes)

‎src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 10 additions & 16 deletions
@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
 static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
                         char *slotname);
 static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int  libpqrcv_receive(int timeout, char **buffer);
+static int  libpqrcv_receive(char **buffer, int *wait_fd);
 static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);

@@ -463,24 +463,23 @@ libpqrcv_disconnect(void)
 }

 /*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
  *
  * Returns:
  *
  * If data was received, returns the length of the data. *buffer is set to
  * point to a buffer holding the received message. The buffer is only valid
  * until the next libpqrcv_* call.
  *
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * file descriptor which can be waited on before trying again.
  *
  * -1 if the server ended the COPY.
  *
  * ereports on error.
  */
 static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, int *wait_fd)
 {
     int         rawlen;

@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
     rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
     if (rawlen == 0)
     {
-        /*
-         * No data available yet. If the caller requested to block, wait for
-         * more data to arrive.
-         */
-        if (timeout > 0)
-        {
-            if (!libpq_select(timeout))
-                return 0;
-        }
-
+        /* Try consuming some data. */
         if (PQconsumeInput(streamConn) == 0)
             ereport(ERROR,
                     (errmsg("could not receive data from WAL stream: %s",
@@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
         /* Now that we've consumed some input, try again */
         rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
         if (rawlen == 0)
+        {
+            /* Tell caller to try again when our socket is ready. */
+            *wait_fd = PQsocket(streamConn);
             return 0;
+        }
     }
     if (rawlen == -1)        /* end-of-streaming or error */
     {
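
The new receive contract (length on success, 0 plus a descriptor to wait on, -1 when the stream ends) moves the waiting out of the receive function and into its caller. Below is a rough standalone sketch of that contract using plain POSIX `read()` and `poll()` on a non-blocking descriptor. It does not use libpq, and `try_receive` and `receive_with_timeout` are hypothetical names; how the real caller waits is not shown in this part of the diff, so the `poll()` loop is only an assumption about one way to do it.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for the non-blocking receive contract:
 *   > 0  number of bytes stored in buf
 *   0    nothing available right now; *wait_fd names the descriptor to wait on
 *   -1   the peer closed the stream
 * The descriptor is expected to be in O_NONBLOCK mode.
 */
int
try_receive(int fd, char *buf, size_t cap, int *wait_fd)
{
    ssize_t n = read(fd, buf, cap);

    if (n > 0)
        return (int) n;
    if (n == 0)
        return -1;              /* end of stream */
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
    {
        *wait_fd = fd;          /* caller decides how and how long to wait */
        return 0;
    }
    return -1;                  /* simplification: the real code reports an error */
}

/* Caller side: wait on the returned descriptor with a timeout, then retry once. */
int
receive_with_timeout(int fd, char *buf, size_t cap, int timeout_ms)
{
    int           wait_fd = -1;
    int           len = try_receive(fd, buf, cap, &wait_fd);
    struct pollfd pfd;

    if (len != 0)
        return len;

    pfd.fd = wait_fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    if (poll(&pfd, 1, timeout_ms) <= 0)
        return 0;               /* timed out or interrupted */
    return try_receive(fd, buf, cap, &wait_fd);
}
```

Because the caller owns the wait, it can watch other wakeup sources alongside the socket, which a receive call that blocks internally cannot offer.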

‎src/backend/replication/syncrep.c

Lines changed: 28 additions & 7 deletions
@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
  * This backend then resets its state to SYNC_REP_NOT_WAITING.
+ *
+ * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
+ * represents a commit record. If it doesn't, then we wait only for the WAL
+ * to be flushed if synchronous_commit is set to the higher level of
+ * remote_apply, because only commit records provide apply feedback.
  */
 void
-SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 {
     char       *new_status = NULL;
     const char *old_status;
-    int         mode = SyncRepWaitMode;
+    int         mode;
+
+    /* Cap the level for anything other than commit to remote flush only. */
+    if (commit)
+        mode = SyncRepWaitMode;
+    else
+        mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);

     /*
      * Fast exit if user has not requested sync replication, or there are no
@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
      * to be a low cost check.
      */
     if (!WalSndCtl->sync_standbys_defined ||
-        XactCommitLSN <= WalSndCtl->lsn[mode])
+        lsn <= WalSndCtl->lsn[mode])
     {
         LWLockRelease(SyncRepLock);
         return;
@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
      * Set our waitLSN so WALSender will know when to wake us, and add
      * ourselves to the queue.
      */
-    MyProc->waitLSN = XactCommitLSN;
+    MyProc->waitLSN = lsn;
     MyProc->syncRepState = SYNC_REP_WAITING;
     SyncRepQueueInsert(mode);
     Assert(SyncRepQueueIsOrderedByLSN(mode));
@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
         new_status = (char *) palloc(len + 32 + 1);
         memcpy(new_status, old_status, len);
         sprintf(new_status + len, " waiting for %X/%X",
-                (uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN);
+                (uint32) (lsn >> 32), (uint32) lsn);
         set_ps_display(new_status, false);
         new_status[len] = '\0'; /* truncate off " waiting ..." */
     }
@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
     WalSnd     *syncWalSnd;
     int         numwrite = 0;
     int         numflush = 0;
+    int         numapply = 0;

     /*
      * If this WALSender is serving a standby that is not on the list of
@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     }
+    if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
+    {
+        walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
+        numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
+    }

     LWLockRelease(SyncRepLock);

-    elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
+    elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
          numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-         numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+         numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
+         numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);

     /*
      * If we are managing the highest priority standby, though we weren't
@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
             break;
+        case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+            SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
+            break;
         default:
             SyncRepWaitMode = SYNC_REP_NO_WAIT;
             break;
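
The capping rule added to SyncRepWaitForLSN can be exercised in isolation. The sketch below is a simplified model, not the function itself: it assumes the wait modes are numbered so that a larger value means a stronger guarantee (the concrete values here are assumptions, since the header that defines them is not part of this excerpt), and `effective_wait_mode` and `must_wait` are hypothetical helper names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed numbering: larger means a stronger guarantee, -1 means no wait. */
enum
{
    WAIT_NONE = -1,
    WAIT_WRITE = 0,
    WAIT_FLUSH = 1,
    WAIT_APPLY = 2
};

/*
 * Only commit records trigger apply feedback from the standby, so any
 * other record (for example a PREPARE) waits for remote flush at most.
 */
int
effective_wait_mode(int configured_mode, bool is_commit)
{
    if (is_commit)
        return configured_mode;
    return configured_mode < WAIT_FLUSH ? configured_mode : WAIT_FLUSH;
}

/*
 * Models the fast-exit test: wait only if the standby has not yet
 * confirmed this LSN for the chosen mode. confirmed[] is indexed by mode.
 */
bool
must_wait(uint64_t lsn, const uint64_t confirmed[3], int mode)
{
    if (mode == WAIT_NONE)
        return false;
    return lsn > confirmed[mode];
}
```

Without the cap, a backend waiting on a non-commit record under remote_apply could sit in the apply queue for a reply that the standby has no specific reason to send promptly.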
