Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5737c12

Browse files
Report catalog_xmin separately in hot_standby_feedback
If the upstream walsender is using a physical replication slot, store thecatalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use aslot and has only a PGPROC entry behaviour doesn't change, as we store thecombined xmin and catalog_xmin in the PGPROC entry.Author: Craig Ringer
1 parent4dd3abe commit5737c12

File tree

7 files changed

+199
-51
lines changed

7 files changed

+199
-51
lines changed

‎doc/src/sgml/protocol.sgml

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,10 +1916,11 @@ The commands accepted in walsender mode are:
19161916
</term>
19171917
<listitem>
19181918
<para>
1919-
The standby's current xmin. This may be 0, if the standby is
1920-
sending notification that Hot Standby feedback will no longer
1921-
be sent on this connection. Later non-zero messages may
1922-
reinitiate the feedback mechanism.
1919+
The standby's current global xmin, excluding the catalog_xmin from any
1920+
replication slots. If both this value and the following
1921+
catalog_xmin are 0 this is treated as a notification that Hot Standby
1922+
feedback will no longer be sent on this connection. Later non-zero
1923+
messages may reinitiate the feedback mechanism.
19231924
</para>
19241925
</listitem>
19251926
</varlistentry>
@@ -1929,7 +1930,29 @@ The commands accepted in walsender mode are:
19291930
</term>
19301931
<listitem>
19311932
<para>
1932-
The standby's current epoch.
1933+
The epoch of the global xmin xid on the standby.
1934+
</para>
1935+
</listitem>
1936+
</varlistentry>
1937+
<varlistentry>
1938+
<term>
1939+
Int32
1940+
</term>
1941+
<listitem>
1942+
<para>
1943+
The lowest catalog_xmin of any replication slots on the standby. Set to 0
1944+
if no catalog_xmin exists on the standby or if hot standby feedback is being
1945+
disabled.
1946+
</para>
1947+
</listitem>
1948+
</varlistentry>
1949+
<varlistentry>
1950+
<term>
1951+
Int32
1952+
</term>
1953+
<listitem>
1954+
<para>
1955+
The epoch of the catalog_xmin xid on the standby.
19331956
</para>
19341957
</listitem>
19351958
</varlistentry>

‎src/backend/replication/walreceiver.c

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed)
11751175
{
11761176
TimestampTznow;
11771177
TransactionIdnextXid;
1178-
uint32nextEpoch;
1179-
TransactionIdxmin;
1178+
uint32xmin_epoch,catalog_xmin_epoch;
1179+
TransactionIdxmin,catalog_xmin;
11801180
staticTimestampTzsendTime=0;
11811181
/* initially true so we always send at least one feedback message */
11821182
staticboolmaster_has_standby_xmin= true;
@@ -1221,29 +1221,54 @@ XLogWalRcvSendHSFeedback(bool immed)
12211221
* everything else has been checked.
12221222
*/
12231223
if (hot_standby_feedback)
1224-
xmin=GetOldestXmin(NULL,PROCARRAY_FLAGS_DEFAULT);
1224+
{
1225+
TransactionIdslot_xmin;
1226+
1227+
/*
1228+
* Usually GetOldestXmin() would include both global replication slot
1229+
* xmin and catalog_xmin in its calculations, but we want to derive
1230+
* separate values for each of those. So we ask for an xmin that
1231+
* excludes the catalog_xmin.
1232+
*/
1233+
xmin=GetOldestXmin(NULL,
1234+
PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
1235+
1236+
ProcArrayGetReplicationSlotXmin(&slot_xmin,&catalog_xmin);
1237+
1238+
if (TransactionIdIsValid(slot_xmin)&&
1239+
TransactionIdPrecedes(slot_xmin,xmin))
1240+
xmin=slot_xmin;
1241+
}
12251242
else
1243+
{
12261244
xmin=InvalidTransactionId;
1245+
catalog_xmin=InvalidTransactionId;
1246+
}
12271247

12281248
/*
12291249
* Get epoch and adjust if nextXid and oldestXmin are different sides of
12301250
* the epoch boundary.
12311251
*/
1232-
GetNextXidAndEpoch(&nextXid,&nextEpoch);
1252+
GetNextXidAndEpoch(&nextXid,&xmin_epoch);
1253+
catalog_xmin_epoch=xmin_epoch;
12331254
if (nextXid<xmin)
1234-
nextEpoch--;
1255+
xmin_epoch--;
1256+
if (nextXid<catalog_xmin)
1257+
catalog_xmin_epoch--;
12351258

1236-
elog(DEBUG2,"sending hot standby feedback xmin %u epoch %u",
1237-
xmin,nextEpoch);
1259+
elog(DEBUG2,"sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1260+
xmin,xmin_epoch,catalog_xmin,catalog_xmin_epoch);
12381261

12391262
/* Construct the message and send it. */
12401263
resetStringInfo(&reply_message);
12411264
pq_sendbyte(&reply_message,'h');
12421265
pq_sendint64(&reply_message,GetCurrentTimestamp());
12431266
pq_sendint(&reply_message,xmin,4);
1244-
pq_sendint(&reply_message,nextEpoch,4);
1267+
pq_sendint(&reply_message,xmin_epoch,4);
1268+
pq_sendint(&reply_message,catalog_xmin,4);
1269+
pq_sendint(&reply_message,catalog_xmin_epoch,4);
12451270
walrcv_send(wrconn,reply_message.data,reply_message.len);
1246-
if (TransactionIdIsValid(xmin))
1271+
if (TransactionIdIsValid(xmin)||TransactionIdIsValid(catalog_xmin))
12471272
master_has_standby_xmin= true;
12481273
else
12491274
master_has_standby_xmin= false;

‎src/backend/replication/walsender.c

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
242242
staticvoidWalSndWriteData(LogicalDecodingContext*ctx,XLogRecPtrlsn,TransactionIdxid,boollast_write);
243243
staticXLogRecPtrWalSndWaitForWal(XLogRecPtrloc);
244244
staticTimeOffsetLagTrackerRead(inthead,XLogRecPtrlsn,TimestampTznow);
245+
staticboolTransactionIdInRecentPast(TransactionIdxid,uint32epoch);
245246

246247
staticvoidXLogRead(char*buf,XLogRecPtrstartptr,Sizecount);
247248

@@ -1756,7 +1757,7 @@ ProcessStandbyReplyMessage(void)
17561757

17571758
/* compute new replication slot xmin horizon if needed */
17581759
staticvoid
1759-
PhysicalReplicationSlotNewXmin(TransactionIdfeedbackXmin)
1760+
PhysicalReplicationSlotNewXmin(TransactionIdfeedbackXmin,TransactionIdfeedbackCatalogXmin)
17601761
{
17611762
boolchanged= false;
17621763
ReplicationSlot*slot=MyReplicationSlot;
@@ -1777,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
17771778
slot->data.xmin=feedbackXmin;
17781779
slot->effective_xmin=feedbackXmin;
17791780
}
1781+
if (!TransactionIdIsNormal(slot->data.catalog_xmin)||
1782+
!TransactionIdIsNormal(feedbackCatalogXmin)||
1783+
TransactionIdPrecedes(slot->data.catalog_xmin,feedbackCatalogXmin))
1784+
{
1785+
changed= true;
1786+
slot->data.catalog_xmin=feedbackCatalogXmin;
1787+
slot->effective_catalog_xmin=feedbackCatalogXmin;
1788+
}
17801789
SpinLockRelease(&slot->mutex);
17811790

17821791
if (changed)
@@ -1786,60 +1795,93 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
17861795
}
17871796
}
17881797

1798+
/*
1799+
* Check that the provided xmin/epoch are sane, that is, not in the future
1800+
* and not so far back as to be already wrapped around.
1801+
*
1802+
* Epoch of nextXid should be same as standby, or if the counter has
1803+
* wrapped, then one greater than standby.
1804+
*
1805+
* This check doesn't care about whether clog exists for these xids
1806+
* at all.
1807+
*/
1808+
staticbool
1809+
TransactionIdInRecentPast(TransactionIdxid,uint32epoch)
1810+
{
1811+
TransactionIdnextXid;
1812+
uint32nextEpoch;
1813+
1814+
GetNextXidAndEpoch(&nextXid,&nextEpoch);
1815+
1816+
if (xid <=nextXid)
1817+
{
1818+
if (epoch!=nextEpoch)
1819+
return false;
1820+
}
1821+
else
1822+
{
1823+
if (epoch+1!=nextEpoch)
1824+
return false;
1825+
}
1826+
1827+
if (!TransactionIdPrecedesOrEquals(xid,nextXid))
1828+
return false;/* epoch OK, but it's wrapped around */
1829+
1830+
return true;
1831+
}
1832+
17891833
/*
17901834
* Hot Standby feedback
17911835
*/
17921836
staticvoid
17931837
ProcessStandbyHSFeedbackMessage(void)
17941838
{
1795-
TransactionIdnextXid;
1796-
uint32nextEpoch;
17971839
TransactionIdfeedbackXmin;
17981840
uint32feedbackEpoch;
1841+
TransactionIdfeedbackCatalogXmin;
1842+
uint32feedbackCatalogEpoch;
17991843

18001844
/*
18011845
* Decipher the reply message. The caller already consumed the msgtype
1802-
* byte.
1846+
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1847+
* of this message.
18031848
*/
18041849
(void)pq_getmsgint64(&reply_message);/* sendTime; not used ATM */
18051850
feedbackXmin=pq_getmsgint(&reply_message,4);
18061851
feedbackEpoch=pq_getmsgint(&reply_message,4);
1852+
feedbackCatalogXmin=pq_getmsgint(&reply_message,4);
1853+
feedbackCatalogEpoch=pq_getmsgint(&reply_message,4);
18071854

1808-
elog(DEBUG2,"hot standby feedback xmin %u epoch %u",
1855+
elog(DEBUG2,"hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
18091856
feedbackXmin,
1810-
feedbackEpoch);
1857+
feedbackEpoch,
1858+
feedbackCatalogXmin,
1859+
feedbackCatalogEpoch);
18111860

1812-
/* Unset WalSender's xmin if the feedback message value is invalid */
1813-
if (!TransactionIdIsNormal(feedbackXmin))
1861+
/*
1862+
* Unset WalSender's xmins if the feedback message values are invalid.
1863+
* This happens when the downstream turned hot_standby_feedback off.
1864+
*/
1865+
if (!TransactionIdIsNormal(feedbackXmin)
1866+
&& !TransactionIdIsNormal(feedbackCatalogXmin))
18141867
{
18151868
MyPgXact->xmin=InvalidTransactionId;
18161869
if (MyReplicationSlot!=NULL)
1817-
PhysicalReplicationSlotNewXmin(feedbackXmin);
1870+
PhysicalReplicationSlotNewXmin(feedbackXmin,feedbackCatalogXmin);
18181871
return;
18191872
}
18201873

18211874
/*
18221875
* Check that the provided xmin/epoch are sane, that is, not in the future
18231876
* and not so far back as to be already wrapped around. Ignore if not.
1824-
*
1825-
* Epoch of nextXid should be same as standby, or if the counter has
1826-
* wrapped, then one greater than standby.
18271877
*/
1828-
GetNextXidAndEpoch(&nextXid,&nextEpoch);
1829-
1830-
if (feedbackXmin <=nextXid)
1831-
{
1832-
if (feedbackEpoch!=nextEpoch)
1833-
return;
1834-
}
1835-
else
1836-
{
1837-
if (feedbackEpoch+1!=nextEpoch)
1838-
return;
1839-
}
1878+
if (TransactionIdIsNormal(feedbackXmin)&&
1879+
!TransactionIdInRecentPast(feedbackXmin,feedbackEpoch))
1880+
return;
18401881

1841-
if (!TransactionIdPrecedesOrEquals(feedbackXmin,nextXid))
1842-
return;/* epoch OK, but it's wrapped around */
1882+
if (TransactionIdIsNormal(feedbackCatalogXmin)&&
1883+
!TransactionIdInRecentPast(feedbackCatalogXmin,feedbackCatalogEpoch))
1884+
return;
18431885

18441886
/*
18451887
* Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1864,15 +1906,23 @@ ProcessStandbyHSFeedbackMessage(void)
18641906
* already since a VACUUM could have just finished calling GetOldestXmin.)
18651907
*
18661908
* If we're using a replication slot we reserve the xmin via that,
1867-
* otherwise via the walsender's PGXACT entry.
1909+
* otherwise via the walsender's PGXACT entry. We can only track the
1910+
* catalog xmin separately when using a slot, so we store the least
1911+
* of the two provided when not using a slot.
18681912
*
18691913
* XXX: It might make sense to generalize the ephemeral slot concept and
18701914
* always use the slot mechanism to handle the feedback xmin.
18711915
*/
18721916
if (MyReplicationSlot!=NULL)/* XXX: persistency configurable? */
1873-
PhysicalReplicationSlotNewXmin(feedbackXmin);
1917+
PhysicalReplicationSlotNewXmin(feedbackXmin,feedbackCatalogXmin);
18741918
else
1875-
MyPgXact->xmin=feedbackXmin;
1919+
{
1920+
if (TransactionIdIsNormal(feedbackCatalogXmin)
1921+
&&TransactionIdPrecedes(feedbackCatalogXmin,feedbackXmin))
1922+
MyPgXact->xmin=feedbackCatalogXmin;
1923+
else
1924+
MyPgXact->xmin=feedbackXmin;
1925+
}
18761926
}
18771927

18781928
/*

‎src/backend/storage/ipc/procarray.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,6 +1264,10 @@ TransactionIdIsActive(TransactionId xid)
12641264
* corresponding flags is set. Typically, if you want to ignore ones with
12651265
* PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
12661266
*
1267+
* PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
1268+
* catalog_xmin of any replication slots that exist in the system when
1269+
* calculating the oldest xmin.
1270+
*
12671271
* This is used by VACUUM to decide which deleted tuples must be preserved in
12681272
* the passed in table. For shared relations backends in all databases must be
12691273
* considered, but for non-shared relations that's not required, since only
@@ -1342,7 +1346,7 @@ GetOldestXmin(Relation rel, int flags)
13421346
volatilePGPROC*proc=&allProcs[pgprocno];
13431347
volatilePGXACT*pgxact=&allPgXact[pgprocno];
13441348

1345-
if (pgxact->vacuumFlags&flags)
1349+
if (pgxact->vacuumFlags&(flags&PROCARRAY_PROC_FLAGS_MASK))
13461350
continue;
13471351

13481352
if (allDbs||
@@ -1418,7 +1422,8 @@ GetOldestXmin(Relation rel, int flags)
14181422
/*
14191423
* Check whether there are replication slots requiring an older xmin.
14201424
*/
1421-
if (TransactionIdIsValid(replication_slot_xmin)&&
1425+
if (!(flags&PROCARRAY_SLOTS_XMIN)&&
1426+
TransactionIdIsValid(replication_slot_xmin)&&
14221427
NormalTransactionIdPrecedes(replication_slot_xmin,result))
14231428
result=replication_slot_xmin;
14241429

@@ -1428,7 +1433,8 @@ GetOldestXmin(Relation rel, int flags)
14281433
* possible. We need to do so if we're computing the global limit (rel =
14291434
* NULL) or if the passed relation is a catalog relation of some kind.
14301435
*/
1431-
if ((rel==NULL||
1436+
if (!(flags&PROCARRAY_SLOTS_XMIN)&&
1437+
(rel==NULL||
14321438
RelationIsAccessibleInLogicalDecoding(rel))&&
14331439
TransactionIdIsValid(replication_slot_catalog_xmin)&&
14341440
NormalTransactionIdPrecedes(replication_slot_catalog_xmin,result))

‎src/include/storage/proc.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,18 @@ struct XidCache
4444
*
4545
* Note: If you modify these flags, you need to modify PROCARRAY_XXX flags
4646
* in src/include/storage/procarray.h.
47+
*
48+
* PROC_RESERVED may later be assigned for use in vacuumFlags, but its value is
49+
* used for PROCARRAY_SLOTS_XMIN in procarray.h, so GetOldestXmin won't be able
50+
* to match and ignore processes with this flag set.
4751
*/
4852
#definePROC_IS_AUTOVACUUM0x01/* is it an autovac worker? */
4953
#definePROC_IN_VACUUM0x02/* currently running lazy vacuum */
5054
#definePROC_IN_ANALYZE0x04/* currently running analyze */
5155
#definePROC_VACUUM_FOR_WRAPAROUND0x08/* set by autovac only */
5256
#definePROC_IN_LOGICAL_DECODING0x10/* currently doing logical
5357
* decoding outside xact */
58+
#definePROC_RESERVED0x20/* reserved for procarray */
5459

5560
/* flags reset at EOXact */
5661
#definePROC_VACUUM_STATE_MASK \

‎src/include/storage/procarray.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@
3232
#definePROCARRAY_LOGICAL_DECODING_FLAG0x10/* currently doing logical
3333
* decoding outside xact */
3434

35+
#definePROCARRAY_SLOTS_XMIN0x20/* replication slot xmin,
36+
* catalog_xmin */
37+
/*
38+
* Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
39+
* PGXACT->vacuumFlags. Other flags are used for different purposes and
40+
* have no corresponding PROC flag equivalent.
41+
*/
42+
#definePROCARRAY_PROC_FLAGS_MASK(PROCARRAY_VACUUM_FLAG | \
43+
PROCARRAY_ANALYZE_FLAG | \
44+
PROCARRAY_LOGICAL_DECODING_FLAG)
45+
3546
/* Use the following flags as an input "flags" to GetOldestXmin function */
3647
/* Consider all backends except for logical decoding ones which manage xmin separately */
3748
#definePROCARRAY_FLAGS_DEFAULTPROCARRAY_LOGICAL_DECODING_FLAG

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp