Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a11f330

Browse files
author
Amit Kapila
committed
Track last_inactive_time in pg_replication_slots.
This commit adds a new property called last_inactive_time for slots. It is set to 0 whenever a slot is made active/acquired and set to the current timestamp whenever the slot is inactive/released or restored from the disk. Note that we don't set the last_inactive_time for the slots currently being synced from the primary to the standby because such slots are typically inactive as decoding is not allowed on those. The 'last_inactive_time' will be useful on production servers to debug and analyze inactive replication slots. It will also help to know the lifetime of a replication slot - one can know how long a streaming standby, logical subscriber, or replication slot consumer is down. The 'last_inactive_time' will also be useful to implement inactive timeout-based replication slot invalidation in a future commit. Author: Bharath Rupireddy. Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik. Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
1 parent 0f7863a commit a11f330

File tree

9 files changed

+213
-6
lines changed

9 files changed

+213
-6
lines changed

‎doc/src/sgml/system-views.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2523,6 +2523,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
25232523
</para></entry>
25242524
</row>
25252525

2526+
<row>
2527+
<entry role="catalog_table_entry"><para role="column_definition">
2528+
<structfield>last_inactive_time</structfield> <type>timestamptz</type>
2529+
</para>
2530+
<para>
2531+
The time at which the slot became inactive.
2532+
<literal>NULL</literal> if the slot is currently being used.
2533+
</para></entry>
2534+
</row>
2535+
25262536
<row>
25272537
<entry role="catalog_table_entry"><para role="column_definition">
25282538
<structfield>conflicting</structfield> <type>bool</type>

‎src/backend/catalog/system_views.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
10231023
L.wal_status,
10241024
L.safe_wal_size,
10251025
L.two_phase,
1026+
L.last_inactive_time,
10261027
L.conflicting,
10271028
L.invalidation_reason,
10281029
L.failover,

‎src/backend/replication/slot.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
409409
slot->candidate_restart_valid=InvalidXLogRecPtr;
410410
slot->candidate_restart_lsn=InvalidXLogRecPtr;
411411
slot->last_saved_confirmed_flush=InvalidXLogRecPtr;
412+
slot->last_inactive_time=0;
412413

413414
/*
414415
* Create the slot on disk. We haven't actually marked the slot allocated
@@ -622,6 +623,11 @@ ReplicationSlotAcquire(const char *name, bool nowait)
622623
if (SlotIsLogical(s))
623624
pgstat_acquire_replslot(s);
624625

626+
/* Reset the last inactive time as the slot is active now. */
627+
SpinLockAcquire(&s->mutex);
628+
s->last_inactive_time=0;
629+
SpinLockRelease(&s->mutex);
630+
625631
if (am_walsender)
626632
{
627633
ereport(log_replication_commands ?LOG :DEBUG1,
@@ -645,6 +651,7 @@ ReplicationSlotRelease(void)
645651
ReplicationSlot*slot=MyReplicationSlot;
646652
char*slotname=NULL;/* keep compiler quiet */
647653
boolis_logical= false;/* keep compiler quiet */
654+
TimestampTznow=0;
648655

649656
Assert(slot!=NULL&&slot->active_pid!=0);
650657

@@ -679,6 +686,15 @@ ReplicationSlotRelease(void)
679686
ReplicationSlotsComputeRequiredXmin(false);
680687
}
681688

689+
/*
690+
* Set the last inactive time after marking the slot inactive. We don't set
691+
* it for the slots currently being synced from the primary to the standby
692+
* because such slots are typically inactive as decoding is not allowed on
693+
* those.
694+
*/
695+
if (!(RecoveryInProgress()&&slot->data.synced))
696+
now=GetCurrentTimestamp();
697+
682698
if (slot->data.persistency==RS_PERSISTENT)
683699
{
684700
/*
@@ -687,9 +703,16 @@ ReplicationSlotRelease(void)
687703
*/
688704
SpinLockAcquire(&slot->mutex);
689705
slot->active_pid=0;
706+
slot->last_inactive_time=now;
690707
SpinLockRelease(&slot->mutex);
691708
ConditionVariableBroadcast(&slot->active_cv);
692709
}
710+
else
711+
{
712+
SpinLockAcquire(&slot->mutex);
713+
slot->last_inactive_time=now;
714+
SpinLockRelease(&slot->mutex);
715+
}
693716

694717
MyReplicationSlot=NULL;
695718

@@ -2342,6 +2365,18 @@ RestoreSlotFromDisk(const char *name)
23422365
slot->in_use= true;
23432366
slot->active_pid=0;
23442367

2368+
/*
2369+
* We set the last inactive time after loading the slot from the disk
2370+
* into memory. Whoever acquires the slot i.e. makes the slot active
2371+
* will reset it. We don't set it for the slots currently being synced
2372+
* from the primary to the standby because such slots are typically
2373+
* inactive as decoding is not allowed on those.
2374+
*/
2375+
if (!(RecoveryInProgress()&&slot->data.synced))
2376+
slot->last_inactive_time=GetCurrentTimestamp();
2377+
else
2378+
slot->last_inactive_time=0;
2379+
23452380
restored= true;
23462381
break;
23472382
}

‎src/backend/replication/slotfuncs.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
239239
Datum
240240
pg_get_replication_slots(PG_FUNCTION_ARGS)
241241
{
242-
#definePG_GET_REPLICATION_SLOTS_COLS18
242+
#definePG_GET_REPLICATION_SLOTS_COLS19
243243
ReturnSetInfo*rsinfo= (ReturnSetInfo*)fcinfo->resultinfo;
244244
XLogRecPtrcurrlsn;
245245
intslotno;
@@ -410,6 +410,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
410410

411411
values[i++]=BoolGetDatum(slot_contents.data.two_phase);
412412

413+
if (slot_contents.last_inactive_time>0)
414+
values[i++]=TimestampTzGetDatum(slot_contents.last_inactive_time);
415+
else
416+
nulls[i++]= true;
417+
413418
cause=slot_contents.data.invalidated;
414419

415420
if (SlotIsPhysical(&slot_contents))

‎src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/*yyyymmddN */
60-
#defineCATALOG_VERSION_NO202403222
60+
#defineCATALOG_VERSION_NO202403251
6161

6262
#endif

‎src/include/catalog/pg_proc.dat

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11133,9 +11133,9 @@
1113311133
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
1113411134
proretset => 't', provolatile => 's', prorettype => 'record',
1113511135
proargtypes => '',
11136-
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool}',
11137-
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11138-
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced}',
11136+
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
11137+
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
11138+
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
1113911139
prosrc => 'pg_get_replication_slots' },
1114011140
{ oid => '3786', descr => 'set up a logical replication slot',
1114111141
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

‎src/include/replication/slot.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ typedef struct ReplicationSlot
201201
* forcibly flushed or not.
202202
*/
203203
XLogRecPtrlast_saved_confirmed_flush;
204+
205+
/* The time at which this slot becomes inactive */
206+
TimestampTzlast_inactive_time;
204207
}ReplicationSlot;
205208

206209
#defineSlotIsPhysical(slot) ((slot)->data.database == InvalidOid)

‎src/test/recovery/t/019_replslot_limit.pl

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,4 +410,156 @@
410410
$node_primary3->stop;
411411
$node_standby3->stop;
412412

413+
# =============================================================================
414+
# Testcase start: Check last_inactive_time property of the streaming standby's slot
415+
#
416+
417+
# Initialize primary node
418+
my$primary4 = PostgreSQL::Test::Cluster->new('primary4');
419+
$primary4->init(allows_streaming=>'logical');
420+
$primary4->start;
421+
422+
# Take backup
423+
$backup_name ='my_backup4';
424+
$primary4->backup($backup_name);
425+
426+
# Create a standby linking to the primary using the replication slot
427+
my$standby4 = PostgreSQL::Test::Cluster->new('standby4');
428+
$standby4->init_from_backup($primary4,$backup_name,has_streaming=> 1);
429+
430+
my$sb4_slot ='sb4_slot';
431+
$standby4->append_conf('postgresql.conf',"primary_slot_name = '$sb4_slot'");
432+
433+
my$slot_creation_time =$primary4->safe_psql(
434+
'postgres',qq[
435+
SELECT current_timestamp;
436+
]);
437+
438+
$primary4->safe_psql(
439+
'postgres',qq[
440+
SELECT pg_create_physical_replication_slot(slot_name := '$sb4_slot');
441+
]);
442+
443+
# Get last_inactive_time value after the slot's creation. Note that the slot
444+
# is still inactive till it's used by the standby below.
445+
my$last_inactive_time =
446+
capture_and_validate_slot_last_inactive_time($primary4,$sb4_slot,$slot_creation_time);
447+
448+
$standby4->start;
449+
450+
# Wait until standby has replayed enough data
451+
$primary4->wait_for_catchup($standby4);
452+
453+
# Now the slot is active so last_inactive_time value must be NULL
454+
is($primary4->safe_psql(
455+
'postgres',
456+
qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb4_slot';]
457+
),
458+
't',
459+
'last inactive time for an active physical slot is NULL');
460+
461+
# Stop the standby to check its last_inactive_time value is updated
462+
$standby4->stop;
463+
464+
# Let's restart the primary so that the last_inactive_time is set upon
465+
# loading the slot from the disk.
466+
$primary4->restart;
467+
468+
is($primary4->safe_psql(
469+
'postgres',
470+
qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb4_slot' AND last_inactive_time IS NOT NULL;]
471+
),
472+
't',
473+
'last inactive time for an inactive physical slot is updated correctly');
474+
475+
$standby4->stop;
476+
477+
# Testcase end: Check last_inactive_time property of the streaming standby's slot
478+
# =============================================================================
479+
480+
# =============================================================================
481+
# Testcase start: Check last_inactive_time property of the logical subscriber's slot
482+
my$publisher4 =$primary4;
483+
484+
# Create subscriber node
485+
my$subscriber4 = PostgreSQL::Test::Cluster->new('subscriber4');
486+
$subscriber4->init;
487+
488+
# Setup logical replication
489+
my$publisher4_connstr =$publisher4->connstr .' dbname=postgres';
490+
$publisher4->safe_psql('postgres',"CREATE PUBLICATION pub FOR ALL TABLES");
491+
492+
$slot_creation_time =$publisher4->safe_psql(
493+
'postgres',qq[
494+
SELECT current_timestamp;
495+
]);
496+
497+
my$lsub4_slot ='lsub4_slot';
498+
$publisher4->safe_psql('postgres',
499+
"SELECT pg_create_logical_replication_slot(slot_name := '$lsub4_slot', plugin := 'pgoutput');"
500+
);
501+
502+
# Get last_inactive_time value after the slot's creation. Note that the slot
503+
# is still inactive till it's used by the subscriber below.
504+
$last_inactive_time =
505+
capture_and_validate_slot_last_inactive_time($publisher4,$lsub4_slot,$slot_creation_time);
506+
507+
$subscriber4->start;
508+
$subscriber4->safe_psql('postgres',
509+
"CREATE SUBSCRIPTION sub CONNECTION '$publisher4_connstr' PUBLICATION pub WITH (slot_name = '$lsub4_slot', create_slot = false)"
510+
);
511+
512+
# Wait until subscriber has caught up
513+
$subscriber4->wait_for_subscription_sync($publisher4,'sub');
514+
515+
# Now the slot is active so last_inactive_time value must be NULL
516+
is($publisher4->safe_psql(
517+
'postgres',
518+
qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub4_slot';]
519+
),
520+
't',
521+
'last inactive time for an active logical slot is NULL');
522+
523+
# Stop the subscriber to check its last_inactive_time value is updated
524+
$subscriber4->stop;
525+
526+
# Let's restart the publisher so that the last_inactive_time is set upon
527+
# loading the slot from the disk.
528+
$publisher4->restart;
529+
530+
is($publisher4->safe_psql(
531+
'postgres',
532+
qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub4_slot' AND last_inactive_time IS NOT NULL;]
533+
),
534+
't',
535+
'last inactive time for an inactive logical slot is updated correctly');
536+
537+
# Testcase end: Check last_inactive_time property of the logical subscriber's slot
538+
# =============================================================================
539+
540+
$publisher4->stop;
541+
$subscriber4->stop;
542+
543+
# Capture and validate last_inactive_time of a given slot.
544+
subcapture_and_validate_slot_last_inactive_time
545+
{
546+
my ($node,$slot_name,$slot_creation_time) =@_;
547+
548+
my$last_inactive_time =$node->safe_psql('postgres',
549+
qq(SELECT last_inactive_time FROM pg_replication_slots
550+
WHERE slot_name = '$slot_name' AND last_inactive_time IS NOT NULL;)
551+
);
552+
553+
# Check that the captured time is sane
554+
is($node->safe_psql(
555+
'postgres',
556+
qq[SELECT '$last_inactive_time'::timestamptz > to_timestamp(0) AND
557+
'$last_inactive_time'::timestamptz >= '$slot_creation_time'::timestamptz;]
558+
),
559+
't',
560+
"last inactive time for an active slot$slot_name is sane");
561+
562+
return$last_inactive_time;
563+
}
564+
413565
done_testing();

‎src/test/regress/expected/rules.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1473,11 +1473,12 @@ pg_replication_slots| SELECT l.slot_name,
14731473
l.wal_status,
14741474
l.safe_wal_size,
14751475
l.two_phase,
1476+
l.last_inactive_time,
14761477
l.conflicting,
14771478
l.invalidation_reason,
14781479
l.failover,
14791480
l.synced
1480-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced)
1481+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase,last_inactive_time,conflicting, invalidation_reason, failover, synced)
14811482
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
14821483
pg_roles| SELECT pg_authid.rolname,
14831484
pg_authid.rolsuper,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp