Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9864cf8

Browse files
Amit Kapilapull[bot]
Amit Kapila
authored and committed
Flush logical slots to disk during a shutdown checkpoint if required.
It's entirely possible for a logical slot to have a confirmed_flush LSN higher than the last value saved on disk while not being marked as dirty. Currently, it is not a major problem but a later patch adding support for the upgrade of slots relies on that value being properly flushed to disk.

It can also help avoid processing the same transactions again in some boundary cases after the clean shutdown and restart. Say, we process some transactions for which we didn't send anything downstream (the changes got filtered) but the confirm_flush LSN is updated due to keepalives. As we don't flush the latest value of confirm_flush LSN, it may lead to processing the same changes again without this patch.

The approach taken by this patch has been suggested by Ashutosh Bapat.

Author: Vignesh C, Julien Rouhaud, Kuroda Hayato
Reviewed-by: Amit Kapila, Dilip Kumar, Michael Paquier, Ashutosh Bapat, Peter Smith, Hou Zhijie
Discussion: http://postgr.es/m/CAA4eK1JzJagMmb_E8D4au=GYQkxox0AfNBm1FbP7sy7t4YWXPQ@mail.gmail.com
Discussion: http://postgr.es/m/TYAPR01MB58664C81887B3AF2EB6B16E3F5939@TYAPR01MB5866.jpnprd01.prod.outlook.com
1 parent 495b31e commit 9864cf8

File tree

5 files changed

+145
-6
lines changed

5 files changed

+145
-6
lines changed

‎src/backend/access/transam/xlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7039,7 +7039,7 @@ static void
70397039
CheckPointGuts(XLogRecPtrcheckPointRedo,intflags)
70407040
{
70417041
CheckPointRelationMap();
7042-
CheckPointReplicationSlots();
7042+
CheckPointReplicationSlots(flags&CHECKPOINT_IS_SHUTDOWN);
70437043
CheckPointSnapBuild();
70447044
CheckPointLogicalRewriteHeap();
70457045
CheckPointReplicationOrigin();

‎src/backend/replication/slot.c

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
321321
slot->candidate_xmin_lsn=InvalidXLogRecPtr;
322322
slot->candidate_restart_valid=InvalidXLogRecPtr;
323323
slot->candidate_restart_lsn=InvalidXLogRecPtr;
324+
slot->last_saved_confirmed_flush=InvalidXLogRecPtr;
324325

325326
/*
326327
* Create the slot on disk. We haven't actually marked the slot allocated
@@ -1572,11 +1573,13 @@ InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
15721573
/*
15731574
* Flush all replication slots to disk.
15741575
*
1575-
* This needn't actually be part of a checkpoint, but it's a convenient
1576-
* location.
1576+
* It is convenient to flush dirty replication slots at the time of checkpoint.
1577+
* Additionally, in case of a shutdown checkpoint, we also identify the slots
1578+
* for which the confirmed_flush LSN has been updated since the last time it
1579+
* was saved and flush them.
15771580
*/
15781581
void
1579-
CheckPointReplicationSlots(void)
1582+
CheckPointReplicationSlots(boolis_shutdown)
15801583
{
15811584
inti;
15821585

@@ -1601,6 +1604,30 @@ CheckPointReplicationSlots(void)
16011604

16021605
/* save the slot to disk, locking is handled in SaveSlotToPath() */
16031606
sprintf(path,"pg_replslot/%s",NameStr(s->data.name));
1607+
1608+
/*
1609+
* Slot's data is not flushed each time the confirmed_flush LSN is
1610+
* updated as that could lead to frequent writes. However, we decide
1611+
* to force a flush of all logical slot's data at the time of shutdown
1612+
* if the confirmed_flush LSN is changed since we last flushed it to
1613+
* disk. This helps in avoiding an unnecessary retreat of the
1614+
* confirmed_flush LSN after restart.
1615+
*/
1616+
if (is_shutdown&&SlotIsLogical(s))
1617+
{
1618+
SpinLockAcquire(&s->mutex);
1619+
1620+
Assert(s->data.confirmed_flush >=s->last_saved_confirmed_flush);
1621+
1622+
if (s->data.invalidated==RS_INVAL_NONE&&
1623+
s->data.confirmed_flush!=s->last_saved_confirmed_flush)
1624+
{
1625+
s->just_dirtied= true;
1626+
s->dirty= true;
1627+
}
1628+
SpinLockRelease(&s->mutex);
1629+
}
1630+
16041631
SaveSlotToPath(s,path,LOG);
16051632
}
16061633
LWLockRelease(ReplicationSlotAllocationLock);
@@ -1873,11 +1900,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
18731900

18741901
/*
18751902
* Successfully wrote, unset dirty bit, unless somebody dirtied again
1876-
* already.
1903+
* already and remember the confirmed_flush LSN value.
18771904
*/
18781905
SpinLockAcquire(&slot->mutex);
18791906
if (!slot->just_dirtied)
18801907
slot->dirty= false;
1908+
slot->last_saved_confirmed_flush=cp.slotdata.confirmed_flush;
18811909
SpinLockRelease(&slot->mutex);
18821910

18831911
LWLockRelease(&slot->io_in_progress_lock);
@@ -2074,6 +2102,7 @@ RestoreSlotFromDisk(const char *name)
20742102
/* initialize in memory state */
20752103
slot->effective_xmin=cp.slotdata.xmin;
20762104
slot->effective_catalog_xmin=cp.slotdata.catalog_xmin;
2105+
slot->last_saved_confirmed_flush=cp.slotdata.confirmed_flush;
20772106

20782107
slot->candidate_catalog_xmin=InvalidTransactionId;
20792108
slot->candidate_xmin_lsn=InvalidXLogRecPtr;

‎src/include/replication/slot.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,13 @@ typedef struct ReplicationSlot
178178
XLogRecPtrcandidate_xmin_lsn;
179179
XLogRecPtrcandidate_restart_valid;
180180
XLogRecPtrcandidate_restart_lsn;
181+
182+
/*
183+
* This value tracks the last confirmed_flush LSN flushed which is used
184+
* during a shutdown checkpoint to decide if logical's slot data should be
185+
* forcibly flushed or not.
186+
*/
187+
XLogRecPtrlast_saved_confirmed_flush;
181188
}ReplicationSlot;
182189

183190
#defineSlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -241,7 +248,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
241248
externvoidReplicationSlotDropAtPubNode(WalReceiverConn*wrconn,char*slotname,boolmissing_ok);
242249

243250
externvoidStartupReplicationSlots(void);
244-
externvoidCheckPointReplicationSlots(void);
251+
externvoidCheckPointReplicationSlots(boolis_shutdown);
245252

246253
externvoidCheckSlotRequirements(void);
247254
externvoidCheckSlotPermissions(void);

‎src/test/recovery/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ tests += {
4343
't/035_standby_logical_decoding.pl',
4444
't/036_truncated_dropped.pl',
4545
't/037_invalid_database.pl',
46+
't/038_save_logical_slots_shutdown.pl',
4647
],
4748
},
4849
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
2+
# Copyright (c) 2023, PostgreSQL Global Development Group
3+
4+
# Test logical replication slots are always flushed to disk during a shutdown
5+
# checkpoint.
6+
7+
use strict;
8+
use warnings;
9+
10+
use PostgreSQL::Test::Cluster;
11+
use PostgreSQL::Test::Utils;
12+
use Test::More;
13+
14+
subcompare_confirmed_flush
15+
{
16+
my ($node,$confirmed_flush_from_log) =@_;
17+
18+
# Fetch Latest checkpoint location from the control file
19+
my ($stdout,$stderr) =
20+
run_command(['pg_controldata',$node->data_dir ]);
21+
my@control_data =split("\n",$stdout);
22+
my$latest_checkpoint =undef;
23+
foreach (@control_data)
24+
{
25+
if ($_ =~/^Latest checkpoint location:\s*(.*)$/mg)
26+
{
27+
$latest_checkpoint =$1;
28+
last;
29+
}
30+
}
31+
die"Latest checkpoint location not found in control file\n"
32+
unlessdefined($latest_checkpoint);
33+
34+
# Is it same as the value read from log?
35+
ok($latest_checkpointeq$confirmed_flush_from_log,
36+
"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
37+
);
38+
39+
return;
40+
}
41+
42+
# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
$node_publisher->init(allows_streaming => 'logical');
# Avoid checkpoint during the test, otherwise, the latest checkpoint location
# will change.
$node_publisher->append_conf(
    'postgresql.conf', q{
checkpoint_timeout = 1h
autovacuum = off
});
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;

# Create tables
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");

# Insert some data
$node_publisher->safe_psql('postgres',
    "INSERT INTO test_tbl VALUES (generate_series(1, 5));");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION pub FOR ALL TABLES");
$node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
);

$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');

my $result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");

is($result, qq(5), "check initial copy was done");

# Remember the current end of the publisher's log so that only messages
# written after the restart are examined below.
my $offset = -s $node_publisher->logfile;

# Restart the publisher to ensure that the slot will be flushed if required
$node_publisher->restart();

# Log line written when the walsender creates its decoding context.  The
# first capture is the slot's confirmed_flush LSN.  The pattern is compiled
# once and shared by the wait and the extraction so the two cannot drift
# apart; the trailing period is escaped so it matches only a literal '.'.
my $streaming_re =
  qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)\./;

# Wait until the walsender creates decoding context
$node_publisher->wait_for_log($streaming_re, $offset);

# Extract confirmed_flush from the logfile.  Capture into a named variable
# right away instead of relying on $1 surviving until a later statement.
my $log_contents = slurp_file($node_publisher->logfile, $offset);
my ($confirmed_flush_from_log) = $log_contents =~ $streaming_re
  or die "could not get confirmed_flush_lsn";

# Ensure that the slot's confirmed_flush LSN is the same as the
# latest_checkpoint location.
compare_confirmed_flush($node_publisher, $confirmed_flush_from_log);

done_testing();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp