Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ff539da

Browse files
Cleanup slots during drop database
Automatically drop all logical replication slots associated with a database when the database is dropped. Previously we threw an ERROR if a slot existed. Now we throw ERROR only if a slot is active in the database being dropped. — Craig Ringer
1 parent 4d33a7f commit ff539da

File tree

7 files changed

+182
-14
lines changed

7 files changed

+182
-14
lines changed

‎doc/src/sgml/func.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
1887618876
<entry>
1887718877
Drops the physical or logical replication slot
1887818878
named <parameter>slot_name</parameter>. Same as replication protocol
18879-
command <literal>DROP_REPLICATION_SLOT</>.
18879+
command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
18880+
be called when connected to the same database the slot was created on.
1888018881
</entry>
1888118882
</row>
1888218883

‎doc/src/sgml/protocol.sgml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
20342034
<para>
20352035
Drops a replication slot, freeing any reserved server-side resources. If
20362036
the slot is currently in use by an active connection, this command fails.
2037+
If the slot is a logical slot that was created in a database other than
2038+
the database the walsender is connected to, this command fails.
20372039
</para>
20382040
<variablelist>
20392041
<varlistentry>

‎src/backend/commands/dbcommands.c

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
845845
errmsg("cannot drop the currently open database")));
846846

847847
/*
848-
* Check whether there are, possibly unconnected, logical slots that refer
849-
* to the to-be-dropped database. The database lock we are holding
850-
* prevents the creation of new slots using the database.
848+
* Check whether there are active logical slots that refer to the
849+
* to-be-dropped database. The database lock we are holding prevents the
850+
* creation of new slots using the database or existing slots becoming
851+
* active.
851852
*/
852-
if (ReplicationSlotsCountDBSlots(db_id,&nslots,&nslots_active))
853+
(void)ReplicationSlotsCountDBSlots(db_id,&nslots,&nslots_active);
854+
if (nslots_active)
855+
{
853856
ereport(ERROR,
854857
(errcode(ERRCODE_OBJECT_IN_USE),
855-
errmsg("database \"%s\" is used by a logical replication slot",
858+
errmsg("database \"%s\" is used by an active logical replication slot",
856859
dbname),
857-
errdetail_plural("There is %d slot, %d of them active.",
858-
"There are %d slots, %d of them active.",
859-
nslots,
860-
nslots, nslots_active)));
860+
errdetail_plural("There is %d active slot",
861+
"There are %d active slots",
862+
nslots_active, nslots_active)));
863+
}
861864

862865
/*
863866
* Check for other backends in the target database. (Because we hold the
@@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok)
914917
*/
915918
dropDatabaseDependencies(db_id);
916919

920+
/*
921+
* Drop db-specific replication slots.
922+
*/
923+
ReplicationSlotsDropDBSlots(db_id);
924+
917925
/*
918926
* Drop pages for this database that are in the shared buffer cache. This
919927
* is important to ensure that no remaining backend tries to write out a
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
21242132
* InitPostgres() cannot fully re-execute concurrently. This
21252133
* avoids backends re-connecting automatically to same database,
21262134
* which can happen in some cases.
2135+
*
2136+
* This will lock out walsenders trying to connect to db-specific
2137+
* slots for logical decoding too, so it's safe for us to drop slots.
21272138
*/
21282139
LockSharedObjectForSession(DatabaseRelationId,xlrec->db_id,0,AccessExclusiveLock);
21292140
ResolveRecoveryConflictWithDatabase(xlrec->db_id);
21302141
}
21312142

2143+
/* Drop any database-specific replication slots */
2144+
ReplicationSlotsDropDBSlots(xlrec->db_id);
2145+
21322146
/* Drop pages for this database that are in the shared buffer cache */
21332147
DropDatabaseBuffers(xlrec->db_id);
21342148

‎src/backend/replication/slot.c

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
796796
return false;
797797
}
798798

799+
/*
800+
* ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
801+
* passed database oid. The caller should hold an exclusive lock on the
802+
* pg_database oid for the database to prevent creation of new slots on the db
803+
* or replay from existing slots.
804+
*
805+
* This routine isn't as efficient as it could be - but we don't drop databases
806+
* often, especially databases with lots of slots.
807+
*
808+
* Another session that concurrently acquires an existing slot on the target DB
809+
* (most likely to drop it) may cause this function to ERROR. If that happens
810+
* it may have dropped some but not all slots.
811+
*/
812+
void
813+
ReplicationSlotsDropDBSlots(Oiddboid)
814+
{
815+
inti;
816+
817+
if (max_replication_slots <=0)
818+
return;
819+
820+
restart:
821+
LWLockAcquire(ReplicationSlotControlLock,LW_SHARED);
822+
for (i=0;i<max_replication_slots;i++)
823+
{
824+
ReplicationSlot*s;
825+
NameDataslotname;
826+
intactive_pid;
827+
828+
s=&ReplicationSlotCtl->replication_slots[i];
829+
830+
/* cannot change while ReplicationSlotCtlLock is held */
831+
if (!s->in_use)
832+
continue;
833+
834+
/* only logical slots are database specific, skip */
835+
if (!SlotIsLogical(s))
836+
continue;
837+
838+
/* not our database, skip */
839+
if (s->data.database!=dboid)
840+
continue;
841+
842+
/* Claim the slot, as if ReplicationSlotAcquire()ing. */
843+
SpinLockAcquire(&s->mutex);
844+
strncpy(NameStr(slotname),NameStr(s->data.name),NAMEDATALEN);
845+
NameStr(slotname)[NAMEDATALEN-1]='\0';
846+
active_pid=s->active_pid;
847+
if (active_pid==0)
848+
{
849+
MyReplicationSlot=s;
850+
s->active_pid=MyProcPid;
851+
}
852+
SpinLockRelease(&s->mutex);
853+
854+
/*
855+
* We might fail here if the slot was active. Even though we hold an
856+
* exclusive lock on the database object a logical slot for that DB can
857+
* still be active if it's being dropped by a backend connected to
858+
* another DB or is otherwise acquired.
859+
*
860+
* It's an unlikely race that'll only arise from concurrent user action,
861+
* so we'll just bail out.
862+
*/
863+
if (active_pid)
864+
elog(ERROR,"replication slot %s is in use by pid %d",
865+
NameStr(slotname),active_pid);
866+
867+
/*
868+
* To avoid largely duplicating ReplicationSlotDropAcquired() or
869+
* complicating it with already_locked flags for ProcArrayLock,
870+
* ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
871+
* just release our ReplicationSlotControlLock to drop the slot.
872+
*
873+
* For safety we'll restart our scan from the beginning each
874+
* time we release the lock.
875+
*/
876+
LWLockRelease(ReplicationSlotControlLock);
877+
ReplicationSlotDropAcquired();
878+
gotorestart;
879+
}
880+
LWLockRelease(ReplicationSlotControlLock);
881+
882+
/* recompute limits once after all slots are dropped */
883+
ReplicationSlotsComputeRequiredXmin(false);
884+
ReplicationSlotsComputeRequiredLSN();
885+
}
886+
799887

800888
/*
801889
* Check whether the server's configuration supports using replication

‎src/include/replication/slot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
177177
externvoidReplicationSlotsComputeRequiredLSN(void);
178178
externXLogRecPtrReplicationSlotsComputeLogicalRestartLSN(void);
179179
externboolReplicationSlotsCountDBSlots(Oiddboid,int*nslots,int*nactive);
180+
externvoidReplicationSlotsDropDBSlots(Oiddboid);
180181

181182
externvoidStartupReplicationSlots(void);
182183
externvoidCheckPointReplicationSlots(void);

‎src/test/recovery/t/006_logical_decoding.pl

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use warnings;
88
use PostgresNode;
99
use TestLib;
10-
use Test::More tests => 5;
10+
use Test::More tests => 16;
1111

1212
# Initialize master node
1313
my$node_master = get_new_node('master');
@@ -54,7 +54,7 @@
5454
is($stdout_sql,$expected,'got expected output from SQL decoding session');
5555

5656
my$endpos =$node_master->safe_psql('postgres',"SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
57-
diag"waiting to replay$endpos";
57+
print"waiting to replay$endpos\n";
5858

5959
my$stdout_recv =$node_master->pg_recvlogical_upto('postgres','test_slot',$endpos, 10,'include-xids'=>'0','skip-empty-xacts'=>'1');
6060
chomp($stdout_recv);
@@ -64,5 +64,41 @@
6464
chomp($stdout_recv);
6565
is($stdout_recv,'','pg_recvlogical acknowledged changes, nothing pending on slot');
6666

67+
$node_master->safe_psql('postgres','CREATE DATABASE otherdb');
68+
69+
is($node_master->psql('otherdb',"SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
70+
'replaying logical slot from another database fails');
71+
72+
$node_master->safe_psql('otherdb',qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
73+
74+
# make sure you can't drop a slot while active
75+
my$pg_recvlogical = IPC::Run::start(['pg_recvlogical','-d',$node_master->connstr('otherdb'),'-S','otherdb_slot','-f','-','--start']);
76+
$node_master->poll_query_until('otherdb',"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
77+
is($node_master->psql('postgres','DROP DATABASE otherdb'), 3,
78+
'dropping a DB with inactive logical slots fails');
79+
$pg_recvlogical->kill_kill;
80+
is($node_master->slot('otherdb_slot')->{'slot_name'},undef,
81+
'logical slot still exists');
82+
83+
$node_master->poll_query_until('otherdb',"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
84+
is($node_master->psql('postgres','DROP DATABASE otherdb'), 0,
85+
'dropping a DB with inactive logical slots succeeds');
86+
is($node_master->slot('otherdb_slot')->{'slot_name'},undef,
87+
'logical slot was actually dropped with DB');
88+
89+
# Restarting a node with wal_level = logical that has existing
90+
# slots must succeed, but decoding from those slots must fail.
91+
$node_master->safe_psql('postgres','ALTER SYSTEM SET wal_level = replica');
92+
is($node_master->safe_psql('postgres','SHOW wal_level'),'logical','wal_level is still logical before restart');
93+
$node_master->restart;
94+
is($node_master->safe_psql('postgres','SHOW wal_level'),'replica','wal_level is replica');
95+
isnt($node_master->slot('test_slot')->{'catalog_xmin'},'0',
96+
'restored slot catalog_xmin is nonzero');
97+
is($node_master->psql('postgres',qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
98+
'reading from slot with wal_level < logical fails');
99+
is($node_master->psql('postgres',q[SELECT pg_drop_replication_slot('test_slot')]), 0,
100+
'can drop logical slot while wal_level = replica');
101+
is($node_master->slot('test_slot')->{'catalog_xmin'},'','slot was dropped');
102+
67103
# done with the node
68104
$node_master->stop;

‎src/test/recovery/t/010_logical_decoding_timelines.pl

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
# This module uses the first approach to show that timeline following
1616
# on a logical slot works.
1717
#
18+
# (For convenience, it also tests some recovery-related operations
19+
# on logical slots).
20+
#
1821
use strict;
1922
use warnings;
2023

2124
use PostgresNode;
2225
use TestLib;
23-
use Test::More tests => 10;
26+
use Test::More tests => 13;
2427
use RecursiveCopy;
2528
use File::Copy;
2629
use IPC::Run ();
@@ -50,6 +53,16 @@
5053
$node_master->safe_psql('postgres',"CREATE TABLE decoding(blah text);");
5154
$node_master->safe_psql('postgres',
5255
"INSERT INTO decoding(blah) VALUES ('beforebb');");
56+
57+
# We also want to verify that DROP DATABASE on a standby with a logical
58+
# slot works. This isn't strictly related to timeline following, but
59+
# the only way to get a logical slot on a standby right now is to use
60+
# the same physical copy trick, so:
61+
$node_master->safe_psql('postgres','CREATE DATABASE dropme;');
62+
$node_master->safe_psql('dropme',
63+
"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
64+
);
65+
5366
$node_master->safe_psql('postgres','CHECKPOINT;');
5467

5568
my$backup_name ='b1';
@@ -68,6 +81,17 @@
6881

6982
$node_replica->start;
7083

84+
# If we drop 'dropme' on the master, the standby should drop the
85+
# db and associated slot.
86+
is($node_master->psql('postgres','DROP DATABASE dropme'), 0,
87+
'dropped DB with logical slot OK on master');
88+
$node_master->wait_for_catchup($node_replica,'replay',$node_master->lsn('insert'));
89+
is($node_replica->safe_psql('postgres',q[SELECT 1 FROM pg_database WHERE datname = 'dropme']),'',
90+
'dropped DB dropme on standby');
91+
is($node_master->slot('dropme_slot')->{'slot_name'},undef,
92+
'logical slot was actually dropped on standby');
93+
94+
# Back to testing failover...
7195
$node_master->safe_psql('postgres',
7296
"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
7397
);
@@ -99,10 +123,13 @@
99123
cmp_ok($phys_slot->{'xmin'},'>=',$phys_slot->{'catalog_xmin'},
100124
'xmin on physical slot must not be lower than catalog_xmin');
101125

126+
$node_master->safe_psql('postgres','CHECKPOINT');
127+
102128
# Boom, crash
103129
$node_master->stop('immediate');
104130

105131
$node_replica->promote;
132+
print"waiting for replica to come up\n";
106133
$node_replica->poll_query_until('postgres',
107134
"SELECT NOT pg_is_in_recovery();");
108135

@@ -154,5 +181,4 @@ BEGIN
154181
chomp($stdout);
155182
is($stdout,$final_expected_output_bb,'got same output from walsender via pg_recvlogical on before_basebackup');
156183

157-
# We don't need the standby anymore
158184
$node_replica->teardown_node();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp