Commit 2bef06d
Preserve required !catalog tuples while computing initial decoding snapshot.
The logical decoding machinery already preserved all the required
catalog tuples, which is sufficient in the course of normal logical
decoding, but did not guarantee that non-catalog tuples were preserved
during computation of the initial snapshot when creating a slot over
the replication protocol.

This could cause a corrupted initial snapshot to be exported. The
time window for issues is usually not terribly large, but on a busy
server it's perfectly possible to hit it. Ongoing decoding is not
affected by this bug.

To avoid increased overhead for the SQL API, only retain additional
tuples when a logical slot is being created over the replication
protocol. To do so this commit changes the signature of
CreateInitDecodingContext(), but it seems unlikely that it's being
used in an extension, so that's probably ok.

In a drive-by fix, fix handling of
ReplicationSlotsComputeRequiredXmin's already_locked argument, which
should only apply to ProcArrayLock, not ReplicationSlotControlLock.

Reported-By: Erik Rijkers
Analyzed-By: Petr Jelinek
Author: Petr Jelinek, heavily editorialized by Andres Freund
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6a95@2ndquadrant.com
Backport: 9.4, where logical decoding was introduced.

1 parent fa31b6f, commit 2bef06d

8 files changed: +66 -18 lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 17 additions & 8 deletions
@@ -210,6 +210,7 @@ StartupDecodingContext(List *output_plugin_options,
 LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
                           List *output_plugin_options,
+                          bool need_full_snapshot,
                           XLogPageReadCB read_page,
                           LogicalOutputPluginWriterPrepareWrite prepare_write,
                           LogicalOutputPluginWriterWrite do_write)
@@ -267,23 +268,31 @@ CreateInitDecodingContext(char *plugin,
      * the slot machinery about the new limit. Once that's done the
      * ProcArrayLock can be released as the slot machinery now is
      * protecting against vacuum.
+     *
+     * Note that, temporarily, the data, not just the catalog, xmin has to be
+     * reserved if a data snapshot is to be exported. Otherwise the initial
+     * data snapshot created here is not guaranteed to be valid. After that
+     * the data xmin doesn't need to be managed anymore and the global xmin
+     * should be recomputed. As we are fine with losing the pegged data xmin
+     * after crash - no chance a snapshot would get exported anymore - we can
+     * get away with just setting the slot's
+     * effective_xmin. ReplicationSlotRelease will reset it again.
+     *
      * ----
      */
     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

-    slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
-    slot->data.catalog_xmin = slot->effective_catalog_xmin;
+    xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot);
+
+    slot->effective_catalog_xmin = xmin_horizon;
+    slot->data.catalog_xmin = xmin_horizon;
+    if (need_full_snapshot)
+        slot->effective_xmin = xmin_horizon;

     ReplicationSlotsComputeRequiredXmin(true);

     LWLockRelease(ProcArrayLock);

-    /*
-     * tell the snapshot builder to only assemble snapshot once reaching the
-     * running_xact's record with the respective xmin.
-     */
-    xmin_horizon = slot->data.catalog_xmin;
-
     ReplicationSlotMarkDirty();
     ReplicationSlotSave();
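
The pegging step in the hunk above can be illustrated with a small standalone sketch. It is not PostgreSQL code: `ToyInitSlot`, `Xid`, `INVALID_XID` and `toy_peg_initial_horizon` are hypothetical stand-ins, and locking, persistence and the real slot structures are left out. It only shows that the catalog horizon is always pegged, while the data horizon is pegged in memory only when a full snapshot is requested.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;              /* stand-in for TransactionId */
#define INVALID_XID ((Xid) 0)      /* stand-in for InvalidTransactionId */

/* Hypothetical, reduced view of the slot fields touched by the hunk. */
typedef struct ToyInitSlot
{
    Xid data_catalog_xmin;       /* persisted, like slot->data.catalog_xmin */
    Xid effective_catalog_xmin;  /* in memory, like slot->effective_catalog_xmin */
    Xid effective_xmin;          /* in memory only, like slot->effective_xmin */
} ToyInitSlot;

/*
 * Peg the horizons for a freshly created logical slot. The catalog horizon
 * is always set; the data horizon is set only when a full (data) snapshot
 * is going to be exported.
 */
static void
toy_peg_initial_horizon(ToyInitSlot *slot, Xid xmin_horizon,
                        bool need_full_snapshot)
{
    assert(xmin_horizon != INVALID_XID);

    slot->effective_catalog_xmin = xmin_horizon;
    slot->data_catalog_xmin = xmin_horizon;
    if (need_full_snapshot)
        slot->effective_xmin = xmin_horizon;
}
```

In this model, a slot created through the SQL function would pass `false` and keep `effective_xmin` invalid, while a slot created over the replication protocol would pass `true`.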

‎src/backend/replication/logical/snapbuild.c

Lines changed: 12 additions & 0 deletions
@@ -533,6 +533,18 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
      * mechanism. Due to that we can do this without locks, we're only
      * changing our own value.
      */
+#ifdef USE_ASSERT_CHECKING
+    {
+        TransactionId safeXid;
+
+        LWLockAcquire(ProcArrayLock, LW_SHARED);
+        safeXid = GetOldestSafeDecodingTransactionId(true);
+        LWLockRelease(ProcArrayLock);
+
+        Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
+    }
+#endif
+
     MyPgXact->xmin = snap->xmin;

     /* allocate in transaction context */

‎src/backend/replication/slot.c

Lines changed: 21 additions & 4 deletions
@@ -398,6 +398,22 @@ ReplicationSlotRelease(void)
         SpinLockRelease(&slot->mutex);
     }

+
+    /*
+     * If slot needed to temporarily restrain both data and catalog xmin to
+     * create the catalog snapshot, remove that temporary constraint.
+     * Snapshots can only be exported while the initial snapshot is still
+     * acquired.
+     */
+    if (!TransactionIdIsValid(slot->data.xmin) &&
+        TransactionIdIsValid(slot->effective_xmin))
+    {
+        SpinLockAcquire(&slot->mutex);
+        slot->effective_xmin = InvalidTransactionId;
+        SpinLockRelease(&slot->mutex);
+        ReplicationSlotsComputeRequiredXmin(false);
+    }
+
     MyReplicationSlot = NULL;

     /* might not have been set when we've been a plain slot */
@@ -612,6 +628,9 @@ ReplicationSlotPersist(void)

 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
+ *
+ * If already_locked is true, ProcArrayLock has already been acquired
+ * exclusively.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -622,8 +641,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)

     Assert(ReplicationSlotCtl != NULL);

-    if (!already_locked)
-        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

     for (i = 0; i < max_replication_slots; i++)
     {
@@ -652,8 +670,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
             agg_catalog_xmin = effective_catalog_xmin;
     }

-    if (!already_locked)
-        LWLockRelease(ReplicationSlotControlLock);
+    LWLockRelease(ReplicationSlotControlLock);

     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
 }
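
To make the interplay between releasing a slot and recomputing the aggregate horizon concrete, here is a simplified, lock-free sketch. All names (`ToySlot`, `toy_compute_required_xmin`, `toy_release`, `xid_is_older`) are hypothetical, and the model assumes a plain array of slots instead of shared memory. It shows that dropping a temporary `effective_xmin` only affects the aggregate data horizon, not the catalog one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;              /* stand-in for TransactionId */
#define INVALID_XID ((Xid) 0)      /* stand-in for InvalidTransactionId */

/* Hypothetical, reduced slot: only the fields the hunks above touch. */
typedef struct ToySlot
{
    bool in_use;
    Xid  data_xmin;               /* persisted, like slot->data.xmin */
    Xid  effective_xmin;          /* in memory, like slot->effective_xmin */
    Xid  effective_catalog_xmin;  /* like slot->effective_catalog_xmin */
} ToySlot;

/* Wraparound-aware "a is older than b" for normal xids. */
static bool
xid_is_older(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/* Aggregate the oldest data and catalog horizons across all used slots. */
static void
toy_compute_required_xmin(const ToySlot *slots, size_t nslots,
                          Xid *agg_xmin, Xid *agg_catalog_xmin)
{
    *agg_xmin = INVALID_XID;
    *agg_catalog_xmin = INVALID_XID;

    for (size_t i = 0; i < nslots; i++)
    {
        const ToySlot *s = &slots[i];

        if (!s->in_use)
            continue;

        if (s->effective_xmin != INVALID_XID &&
            (*agg_xmin == INVALID_XID ||
             xid_is_older(s->effective_xmin, *agg_xmin)))
            *agg_xmin = s->effective_xmin;

        if (s->effective_catalog_xmin != INVALID_XID &&
            (*agg_catalog_xmin == INVALID_XID ||
             xid_is_older(s->effective_catalog_xmin, *agg_catalog_xmin)))
            *agg_catalog_xmin = s->effective_catalog_xmin;
    }
}

/*
 * Drop a temporary data-xmin peg: one that exists only in memory because
 * the persisted data_xmin was never set. Returns true if the caller should
 * recompute the aggregate horizons.
 */
static bool
toy_release(ToySlot *slot)
{
    if (slot->data_xmin == INVALID_XID &&
        slot->effective_xmin != INVALID_XID)
    {
        slot->effective_xmin = INVALID_XID;
        return true;
    }
    return false;
}
```

A slot whose persisted `data_xmin` is valid keeps its peg on release, which mirrors the `!TransactionIdIsValid(slot->data.xmin)` condition in the hunk.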

‎src/backend/replication/slotfuncs.c

Lines changed: 2 additions & 2 deletions
@@ -131,8 +131,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     /*
      * Create logical decoding context, to build the initial snapshot.
      */
-    ctx = CreateInitDecodingContext(
-                                    NameStr(*plugin), NIL,
+    ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+                                    false, /* do not build snapshot */
                                     logical_read_local_xlog_page, NULL, NULL);

     /* build initial snapshot, might take a while */

‎src/backend/replication/walsender.c

Lines changed: 1 addition & 0 deletions
@@ -909,6 +909,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
         }

         ctx = CreateInitDecodingContext(cmd->plugin, NIL,
+                                        true, /* build snapshot */
                                         logical_read_xlog_page,
                                         WalSndPrepareWrite, WalSndWriteData);

‎src/backend/storage/ipc/procarray.c

Lines changed: 11 additions & 3 deletions
@@ -2151,7 +2151,7 @@ GetOldestActiveTransactionId(void)
  * that the caller will immediately use the xid to peg the xmin horizon.
  */
 TransactionId
-GetOldestSafeDecodingTransactionId(void)
+GetOldestSafeDecodingTransactionId(bool catalogOnly)
 {
     ProcArrayStruct *arrayP = procArray;
     TransactionId oldestSafeXid;
@@ -2174,9 +2174,17 @@ GetOldestSafeDecodingTransactionId(void)
     /*
      * If there's already a slot pegging the xmin horizon, we can start with
      * that value, it's guaranteed to be safe since it's computed by this
-     * routine initially and has been enforced since.
+     * routine initially and has been enforced since. We can always use the
+     * slot's general xmin horizon, but the catalog horizon is only usable
+     * when we only catalog data is going to be looked at.
      */
-    if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
+    if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
+        TransactionIdPrecedes(procArray->replication_slot_xmin,
+                              oldestSafeXid))
+        oldestSafeXid = procArray->replication_slot_xmin;
+
+    if (catalogOnly &&
+        TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
         TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
                               oldestSafeXid))
         oldestSafeXid = procArray->replication_slot_catalog_xmin;
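
The horizon selection in the second hunk can be modelled in isolation as follows. This is a sketch under stated assumptions, not the real routine: `ToyProcArray`, `toy_oldest_safe_xid` and `xid_precedes` are hypothetical names, `start` stands in for the candidate xid the real function holds before consulting the slot horizons, and the scan over running transactions and all locking are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;              /* stand-in for TransactionId */
#define INVALID_XID ((Xid) 0)      /* stand-in for InvalidTransactionId */

/* Wraparound-aware "a is older than b" for normal xids. */
static bool
xid_precedes(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/* Hypothetical view of the two slot horizons kept in the proc array. */
typedef struct ToyProcArray
{
    Xid replication_slot_xmin;          /* protects data and catalog tuples */
    Xid replication_slot_catalog_xmin;  /* protects catalog tuples only */
} ToyProcArray;

/*
 * Pick the oldest xid that is still safe to use as a horizon. The general
 * slot xmin may always lower the result; the catalog xmin may lower it only
 * when the caller will look at catalog data exclusively.
 */
static Xid
toy_oldest_safe_xid(const ToyProcArray *pa, Xid start, bool catalog_only)
{
    Xid oldest = start;

    assert(start != INVALID_XID);

    if (pa->replication_slot_xmin != INVALID_XID &&
        xid_precedes(pa->replication_slot_xmin, oldest))
        oldest = pa->replication_slot_xmin;

    if (catalog_only &&
        pa->replication_slot_catalog_xmin != INVALID_XID &&
        xid_precedes(pa->replication_slot_catalog_xmin, oldest))
        oldest = pa->replication_slot_catalog_xmin;

    return oldest;
}
```

The point of the split is that a catalog-only horizon says nothing about whether ordinary table rows older than the data horizon still exist, so it cannot be used to justify a snapshot that reads user data.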

‎src/include/replication/logical.h

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ extern void CheckLogicalDecodingRequirements(void);

 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                           List *output_plugin_options,
+                          bool need_full_snapshot,
                           XLogPageReadCB read_page,
                           LogicalOutputPluginWriterPrepareWrite prepare_write,
                           LogicalOutputPluginWriterWrite do_write);

‎src/include/storage/procarray.h

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
 extern TransactionId GetOldestXmin(Relation rel, int flags);
 extern TransactionId GetOldestActiveTransactionId(void);
-extern TransactionId GetOldestSafeDecodingTransactionId(void);
+extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);

 extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
 extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
