Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf41d846

Browse files
author
Amit Kapila
committed
Raise an error while trying to acquire an invalid slot.
Once a replication slot is invalidated, it cannot be altered or used tofetch changes. However, a process could still acquire an invalid slot andfail later.For example, if a process acquires a logical slot that was invalidated dueto wal_removed, it will eventually fail in CreateDecodingContext() whenattempting to access the removed WAL. Similarly, for physical replicationslots, even if the slot is invalidated and invalidation_reason is set towal_removed, the walsender does not currently check for invalidation whenstarting physical replication. Instead, replication starts, and an erroris only reported later while trying to access WAL. Similarly, we prohibitmodifying slot properties for invalid slots but give the error for thesame after acquiring the slot.This patch improves error handling by detecting invalid slots earlier atthe time of slot acquisition which is the first step. This also helped inunifying different ERROR messages at different places and gave aconsistent message for invalid slots. This means that the message forinvalid slots will change to a generic message.This will also be helpful for future patches where we are planning toinvalidate slots due to more reasons like idle_timeout because we don'thave to modify multiple places in such cases and avoid the chances ofmissing out on a particular place.Author: Nisha Moond <nisha.moond412@gmail.com>Author: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>Reviewed-by: Vignesh C <vignesh21@gmail.com>Reviewed-by: Peter Smith <smithpb2250@gmail.com>Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>Discussion:https://postgr.es/m/CABdArM6pBL5hPnSQ+5nEVMANcF4FCH7LQmgskXyiLY75TMnKpw@mail.gmail.com
1 parenta632cd3 commitf41d846

File tree

10 files changed

+39
-50
lines changed

10 files changed

+39
-50
lines changed

‎src/backend/replication/logical/logical.c

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -542,28 +542,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
542542
errdetail("This replication slot is being synchronized from the primary server."),
543543
errhint("Specify another replication slot."));
544544

545-
/*
546-
* Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
547-
* "cannot get changes" wording in this errmsg because that'd be
548-
* confusingly ambiguous about no changes being available when called from
549-
* pg_logical_slot_get_changes_guts().
550-
*/
551-
if (MyReplicationSlot->data.invalidated==RS_INVAL_WAL_REMOVED)
552-
ereport(ERROR,
553-
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
554-
errmsg("can no longer get changes from replication slot \"%s\"",
555-
NameStr(MyReplicationSlot->data.name)),
556-
errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
557-
558-
if (MyReplicationSlot->data.invalidated!=RS_INVAL_NONE)
559-
ereport(ERROR,
560-
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
561-
errmsg("can no longer get changes from replication slot \"%s\"",
562-
NameStr(MyReplicationSlot->data.name)),
563-
errdetail("This slot has been invalidated because it was conflicting with recovery.")));
564-
565-
Assert(MyReplicationSlot->data.invalidated==RS_INVAL_NONE);
566-
Assert(MyReplicationSlot->data.restart_lsn!=InvalidXLogRecPtr);
545+
/* slot must be valid to allow decoding */
546+
Assert(slot->data.invalidated==RS_INVAL_NONE);
547+
Assert(slot->data.restart_lsn!=InvalidXLogRecPtr);
567548

568549
if (start_lsn==InvalidXLogRecPtr)
569550
{

‎src/backend/replication/logical/logicalfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
197197
else
198198
end_of_wal=GetXLogReplayRecPtr(NULL);
199199

200-
ReplicationSlotAcquire(NameStr(*name), true);
200+
ReplicationSlotAcquire(NameStr(*name), true, true);
201201

202202
PG_TRY();
203203
{

‎src/backend/replication/logical/slotsync.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
446446

447447
if (synced_slot)
448448
{
449-
ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
449+
ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
450450
ReplicationSlotDropAcquired();
451451
}
452452

@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
665665
* pre-check to ensure that at least one of the slot properties is
666666
* changed before acquiring the slot.
667667
*/
668-
ReplicationSlotAcquire(remote_slot->name, true);
668+
ReplicationSlotAcquire(remote_slot->name, true, false);
669669

670670
Assert(slot==MyReplicationSlot);
671671

‎src/backend/replication/slot.c

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
535535
*
536536
* An error is raised if nowait is true and the slot is currently in use. If
537537
* nowait is false, we sleep until the slot is released by the owning process.
538+
*
539+
* An error is raised if error_if_invalid is true and the slot is found to
540+
* be invalid. It should always be set to true, except when we are temporarily
541+
* acquiring the slot and don't intend to change it.
538542
*/
539543
void
540-
ReplicationSlotAcquire(constchar*name,boolnowait)
544+
ReplicationSlotAcquire(constchar*name,boolnowait,boolerror_if_invalid)
541545
{
542546
ReplicationSlot*s;
543547
intactive_pid;
@@ -561,6 +565,19 @@ ReplicationSlotAcquire(const char *name, bool nowait)
561565
name)));
562566
}
563567

568+
/* Invalid slots can't be modified or used before accessing the WAL. */
569+
if (error_if_invalid&&s->data.invalidated!=RS_INVAL_NONE)
570+
{
571+
LWLockRelease(ReplicationSlotControlLock);
572+
573+
ereport(ERROR,
574+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
575+
errmsg("can no longer access replication slot \"%s\"",
576+
NameStr(s->data.name)),
577+
errdetail("This replication slot has been invalidated due to \"%s\".",
578+
SlotInvalidationCauses[s->data.invalidated]));
579+
}
580+
564581
/*
565582
* This is the slot we want; check if it's active under some other
566583
* process. In single user mode, we don't need this check.
@@ -785,7 +802,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
785802
{
786803
Assert(MyReplicationSlot==NULL);
787804

788-
ReplicationSlotAcquire(name,nowait);
805+
ReplicationSlotAcquire(name,nowait, false);
789806

790807
/*
791808
* Do not allow users to drop the slots which are currently being synced
@@ -812,21 +829,14 @@ ReplicationSlotAlter(const char *name, const bool *failover,
812829
Assert(MyReplicationSlot==NULL);
813830
Assert(failover||two_phase);
814831

815-
ReplicationSlotAcquire(name, false);
832+
ReplicationSlotAcquire(name, false, true);
816833

817834
if (SlotIsPhysical(MyReplicationSlot))
818835
ereport(ERROR,
819836
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
820837
errmsg("cannot use %s with a physical replication slot",
821838
"ALTER_REPLICATION_SLOT"));
822839

823-
if (MyReplicationSlot->data.invalidated!=RS_INVAL_NONE)
824-
ereport(ERROR,
825-
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
826-
errmsg("cannot alter invalid replication slot \"%s\"",name),
827-
errdetail("This replication slot has been invalidated due to \"%s\".",
828-
SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
829-
830840
if (RecoveryInProgress())
831841
{
832842
/*

‎src/backend/replication/slotfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
536536
moveto=Min(moveto,GetXLogReplayRecPtr(NULL));
537537

538538
/* Acquire the slot so we "own" it */
539-
ReplicationSlotAcquire(NameStr(*slotname), true);
539+
ReplicationSlotAcquire(NameStr(*slotname), true, true);
540540

541541
/* A slot whose restart_lsn has never been reserved cannot be advanced */
542542
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))

‎src/backend/replication/walsender.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
816816

817817
if (cmd->slotname)
818818
{
819-
ReplicationSlotAcquire(cmd->slotname, true);
819+
ReplicationSlotAcquire(cmd->slotname, true, true);
820820
if (SlotIsLogical(MyReplicationSlot))
821821
ereport(ERROR,
822822
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
14341434

14351435
Assert(!MyReplicationSlot);
14361436

1437-
ReplicationSlotAcquire(cmd->slotname, true);
1437+
ReplicationSlotAcquire(cmd->slotname, true, true);
14381438

14391439
/*
14401440
* Force a disconnect, so that the decoding code doesn't need to care

‎src/backend/utils/adt/pg_upgrade_support.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
298298
slot_name=PG_GETARG_NAME(0);
299299

300300
/* Acquire the given slot */
301-
ReplicationSlotAcquire(NameStr(*slot_name), true);
301+
ReplicationSlotAcquire(NameStr(*slot_name), true, true);
302302

303303
Assert(SlotIsLogical(MyReplicationSlot));
304304

‎src/include/replication/slot.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
253253
externvoidReplicationSlotAlter(constchar*name,constbool*failover,
254254
constbool*two_phase);
255255

256-
externvoidReplicationSlotAcquire(constchar*name,boolnowait);
256+
externvoidReplicationSlotAcquire(constchar*name,boolnowait,
257+
boolerror_if_invalid);
257258
externvoidReplicationSlotRelease(void);
258259
externvoidReplicationSlotCleanup(boolsynced_only);
259260
externvoidReplicationSlotSave(void);

‎src/test/recovery/t/019_replslot_limit.pl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@
234234
for (my$i = 0;$i < 10 *$PostgreSQL::Test::Utils::timeout_default;$i++)
235235
{
236236
if ($node_standby->log_contains(
237-
"requested WAL segment [0-9A-F]+hasalreadybeenremoved",
237+
"This replication slothas beeninvalidated due to\"wal_removed\".",
238238
$logstart))
239239
{
240240
$failed = 1;

‎src/test/recovery/t/035_standby_logical_decoding.pl

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ sub wait_until_vacuum_can_remove
533533
qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
534534
replication=>'database');
535535
ok($stderr =~
536-
/ERROR:cannot alter invalid replication slot "vacuum_full_inactiveslot"/
536+
/ERROR:can no longer access replication slot "vacuum_full_inactiveslot"/
537537
&&$stderr =~
538538
/DETAIL: This replication slot has been invalidated due to "rows_removed"./,
539539
"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ sub wait_until_vacuum_can_remove
551551

552552
# We are not able to read from the slot as it has been invalidated
553553
check_pg_recvlogical_stderr($handle,
554-
"can no longer get changes from replication slot\"vacuum_full_activeslot\""
555-
);
554+
"can no longer access replication slot\"vacuum_full_activeslot\"");
556555

557556
# Turn hot_standby_feedback back on
558557
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ sub wait_until_vacuum_can_remove
632631

633632
# We are not able to read from the slot as it has been invalidated
634633
check_pg_recvlogical_stderr($handle,
635-
"can no longer get changes from replication slot\"row_removal_activeslot\""
636-
);
634+
"can no longer access replication slot\"row_removal_activeslot\"");
637635

638636
##################################################
639637
# Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ sub wait_until_vacuum_can_remove
668666

669667
# We are not able to read from the slot as it has been invalidated
670668
check_pg_recvlogical_stderr($handle,
671-
"can no longerget changes from replication slot\"shared_row_removal_activeslot\""
669+
"can no longeraccess replication slot\"shared_row_removal_activeslot\""
672670
);
673671

674672
##################################################
@@ -759,7 +757,7 @@ sub wait_until_vacuum_can_remove
759757

760758
# We are not able to read from the slot as it has been invalidated
761759
check_pg_recvlogical_stderr($handle,
762-
"can no longerget changes from replication slot\"pruning_activeslot\"");
760+
"can no longeraccess replication slot\"pruning_activeslot\"");
763761

764762
# Turn hot_standby_feedback back on
765763
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ sub wait_until_vacuum_can_remove
818816
make_slot_active($node_standby,'wal_level_', 0, \$stdout, \$stderr);
819817
# as the slot has been invalidated we should not be able to read
820818
check_pg_recvlogical_stderr($handle,
821-
"can no longer get changes from replication slot\"wal_level_activeslot\""
822-
);
819+
"can no longer access replication slot\"wal_level_activeslot\"");
823820

824821
##################################################
825822
# DROP DATABASE should drop its slots, including active slots.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp