Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd811c03

Browse files
committed
Add 'active_in' column to pg_replication_slots.
Right now it is visible whether a replication slot is active in anysession, but not in which. Adding the active_in column, containing thepid of the backend having acquired the slot, makes it much easier toassociate pg_replication_slots entries with the correspondingpg_stat_replication/pg_stat_activity row.This should have been done from the start, but I (Andres) dropped theball there somehow.Author: Craig Ringer, revised by me Discussion:CAMsr+YFKgZca5_7_ouaMWxA5PneJC9LNViPzpDHusaPhU9pA7g@mail.gmail.com
1 parent528c2e4 commitd811c03

File tree

9 files changed

+47
-28
lines changed

9 files changed

+47
-28
lines changed

‎contrib/test_decoding/expected/ddl.out

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot');
603603

604604
/* check that the slot is gone */
605605
SELECT * FROM pg_replication_slots;
606-
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
607-
-----------+--------+-----------+--------+----------+--------+------+--------------+-------------
606+
slot_name | plugin | slot_type | datoid | database | active |active_in |xmin | catalog_xmin | restart_lsn
607+
-----------+--------+-----------+--------+----------+--------+-----------+------+--------------+-------------
608608
(0 rows)
609609

‎doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5400,6 +5400,16 @@
54005400
<entry>True if this slot is currently actively being used</entry>
54015401
</row>
54025402

5403+
<row>
5404+
<entry><structfield>active_in</structfield></entry>
5405+
<entry><type>integer</type></entry>
5406+
<entry></entry>
5407+
<entry>The process ID of the session using this slot if the slot
5408+
is currently actively being used. <literal>NULL</literal> if
5409+
inactive.
5410+
</entry>
5411+
</row>
5412+
54035413
<row>
54045414
<entry><structfield>xmin</structfield></entry>
54055415
<entry><type>xid</type></entry>

‎doc/src/sgml/logicaldecoding.sgml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
6262
regression_slot | 0/16B1970
6363
(1 row)
6464

65-
postgres=# SELECT* FROM pg_replication_slots;
66-
slot_name | plugin | slot_type |datoid |database | active | xmin | catalog_xmin | restart_lsn
67-
-----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
68-
regression_slot | test_decoding | logical | 12052 |postgres | f | | 684 | 0/16A4408
65+
postgres=# SELECTslot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;
66+
slot_name | plugin | slot_type | database | active | restart_lsn
67+
-----------------+---------------+-----------+----------+--------+-------------
68+
regression_slot | test_decoding | logical | postgres | f | 0/16A4408
6969
(1 row)
7070

7171
postgres=# -- There are no changes to see yet

‎src/backend/catalog/system_views.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,7 @@ CREATE VIEW pg_replication_slots AS
665665
L.datoid,
666666
D.datnameAS database,
667667
L.active,
668+
L.active_in,
668669
L.xmin,
669670
L.catalog_xmin,
670671
L.restart_lsn

‎src/backend/replication/slot.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
262262
* be doing that. So it's safe to initialize the slot.
263263
*/
264264
Assert(!slot->in_use);
265-
Assert(!slot->active);
265+
Assert(slot->active_pid==0);
266266
slot->data.persistency=persistency;
267267
slot->data.xmin=InvalidTransactionId;
268268
slot->effective_xmin=InvalidTransactionId;
@@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
291291
volatileReplicationSlot*vslot=slot;
292292

293293
SpinLockAcquire(&slot->mutex);
294-
Assert(!vslot->active);
295-
vslot->active=true;
294+
Assert(vslot->active_pid==0);
295+
vslot->active_pid=MyProcPid;
296296
SpinLockRelease(&slot->mutex);
297297
MyReplicationSlot=slot;
298298
}
@@ -314,7 +314,7 @@ ReplicationSlotAcquire(const char *name)
314314
{
315315
ReplicationSlot*slot=NULL;
316316
inti;
317-
boolactive=false;
317+
intactive_pid=0;
318318

319319
Assert(MyReplicationSlot==NULL);
320320

@@ -331,8 +331,9 @@ ReplicationSlotAcquire(const char *name)
331331
volatileReplicationSlot*vslot=s;
332332

333333
SpinLockAcquire(&s->mutex);
334-
active=vslot->active;
335-
vslot->active= true;
334+
active_pid=vslot->active_pid;
335+
if (active_pid==0)
336+
vslot->active_pid=MyProcPid;
336337
SpinLockRelease(&s->mutex);
337338
slot=s;
338339
break;
@@ -345,10 +346,11 @@ ReplicationSlotAcquire(const char *name)
345346
ereport(ERROR,
346347
(errcode(ERRCODE_UNDEFINED_OBJECT),
347348
errmsg("replication slot \"%s\" does not exist",name)));
348-
if (active)
349+
if (active_pid!=0)
349350
ereport(ERROR,
350351
(errcode(ERRCODE_OBJECT_IN_USE),
351-
errmsg("replication slot \"%s\" is already active",name)));
352+
errmsg("replication slot \"%s\" is already active for pid %d",
353+
name,active_pid)));
352354

353355
/* We made this slot active, so it's ours now. */
354356
MyReplicationSlot=slot;
@@ -363,7 +365,7 @@ ReplicationSlotRelease(void)
363365
{
364366
ReplicationSlot*slot=MyReplicationSlot;
365367

366-
Assert(slot!=NULL&&slot->active);
368+
Assert(slot!=NULL&&slot->active_pid!=0);
367369

368370
if (slot->data.persistency==RS_EPHEMERAL)
369371
{
@@ -380,7 +382,7 @@ ReplicationSlotRelease(void)
380382
volatileReplicationSlot*vslot=slot;
381383

382384
SpinLockAcquire(&slot->mutex);
383-
vslot->active=false;
385+
vslot->active_pid=0;
384386
SpinLockRelease(&slot->mutex);
385387
}
386388

@@ -460,7 +462,7 @@ ReplicationSlotDropAcquired(void)
460462
boolfail_softly=slot->data.persistency==RS_EPHEMERAL;
461463

462464
SpinLockAcquire(&slot->mutex);
463-
vslot->active=false;
465+
vslot->active_pid=0;
464466
SpinLockRelease(&slot->mutex);
465467

466468
ereport(fail_softly ?WARNING :ERROR,
@@ -477,7 +479,7 @@ ReplicationSlotDropAcquired(void)
477479
* scanning the array.
478480
*/
479481
LWLockAcquire(ReplicationSlotControlLock,LW_EXCLUSIVE);
480-
slot->active=false;
482+
slot->active_pid=0;
481483
slot->in_use= false;
482484
LWLockRelease(ReplicationSlotControlLock);
483485

@@ -749,7 +751,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
749751
/* count slots with spinlock held */
750752
SpinLockAcquire(&s->mutex);
751753
(*nslots)++;
752-
if (s->active)
754+
if (s->active_pid!=0)
753755
(*nactive)++;
754756
SpinLockRelease(&s->mutex);
755757
}
@@ -1227,7 +1229,7 @@ RestoreSlotFromDisk(const char *name)
12271229
slot->candidate_restart_valid=InvalidXLogRecPtr;
12281230

12291231
slot->in_use= true;
1230-
slot->active=false;
1232+
slot->active_pid=0;
12311233

12321234
restored= true;
12331235
break;

‎src/backend/replication/slotfuncs.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
158158
Datum
159159
pg_get_replication_slots(PG_FUNCTION_ARGS)
160160
{
161-
#definePG_GET_REPLICATION_SLOTS_COLS8
161+
#definePG_GET_REPLICATION_SLOTS_COLS9
162162
ReturnSetInfo*rsinfo= (ReturnSetInfo*)fcinfo->resultinfo;
163163
TupleDesctupdesc;
164164
Tuplestorestate*tupstore;
@@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
206206
TransactionIdxmin;
207207
TransactionIdcatalog_xmin;
208208
XLogRecPtrrestart_lsn;
209-
boolactive;
209+
pid_tactive_pid;
210210
Oiddatabase;
211211
NameDataslot_name;
212212
NameDataplugin;
@@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
227227
namecpy(&slot_name,&slot->data.name);
228228
namecpy(&plugin,&slot->data.plugin);
229229

230-
active=slot->active;
230+
active_pid=slot->active_pid;
231231
}
232232
SpinLockRelease(&slot->mutex);
233233

@@ -251,7 +251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
251251
else
252252
values[i++]=database;
253253

254-
values[i++]=BoolGetDatum(active);
254+
values[i++]=BoolGetDatum(active_pid!=0);
255+
256+
if (active_pid!=0)
257+
values[i++]=Int32GetDatum(active_pid);
258+
else
259+
nulls[i++]= true;
255260

256261
if (xmin!=InvalidTransactionId)
257262
values[i++]=TransactionIdGetDatum(xmin);

‎src/include/catalog/pg_proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5106,7 +5106,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
51065106
DESCR("create a physical replication slot");
51075107
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
51085108
DESCR("drop a replication slot");
5109-
DATA(insert OID = 3781 ( pg_get_replication_slotsPGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
5109+
DATA(insert OID = 3781 ( pg_get_replication_slotsPGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_in,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
51105110
DESCR("information about replication slots currently in use");
51115111
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
51125112
DESCR("set up a logical replication slot");

‎src/include/replication/slot.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ typedef struct ReplicationSlot
8484
/* is this slot defined */
8585
boolin_use;
8686

87-
/* issomebodystreaming out changes for this slot */
88-
boolactive;
87+
/*Whois streaming out changes for this slot? 0 in unused slots. */
88+
pid_tactive_pid;
8989

9090
/* any outstanding modifications? */
9191
booljust_dirtied;

‎src/test/regress/expected/rules.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name,
13961396
l.datoid,
13971397
d.datname AS database,
13981398
l.active,
1399+
l.active_in,
13991400
l.xmin,
14001401
l.catalog_xmin,
14011402
l.restart_lsn
1402-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn)
1403+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active,active_in,xmin, catalog_xmin, restart_lsn)
14031404
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
14041405
pg_roles| SELECT pg_authid.rolname,
14051406
pg_authid.rolsuper,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp