Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb39c527

Browse files
author
Amit Kapila
committed
Don't advance origin during apply failure.
We advance origin progress during abort on successful streaming andapplication of ROLLBACK in parallel streaming mode. But the originshouldn't be advanced during an error or unsuccessful apply due toshutdown. Otherwise, it will result in a transaction loss as such atransaction won't be sent again by the server.Reported-by: Hou ZhijieAuthor: Hayato Kuroda and Shveta MalikReviewed-by: Amit KapilaBackpatch-through: 16Discussion:https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
1 parent25642b2 commitb39c527

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4404,6 +4404,17 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
44044404
pfree(syncslotname);
44054405
}
44064406

4407+
/*
4408+
* Reset the origin state.
4409+
*/
4410+
staticvoid
4411+
replorigin_reset(intcode,Datumarg)
4412+
{
4413+
replorigin_session_origin=InvalidRepOriginId;
4414+
replorigin_session_origin_lsn=InvalidXLogRecPtr;
4415+
replorigin_session_origin_timestamp=0;
4416+
}
4417+
44074418
/*
44084419
* Run the apply loop with error handling. Disable the subscription,
44094420
* if necessary.
@@ -4553,6 +4564,19 @@ ApplyWorkerMain(Datum main_arg)
45534564

45544565
InitializeApplyWorker();
45554566

4567+
/*
4568+
* Register a callback to reset the origin state before aborting any
4569+
* pending transaction during shutdown (see ShutdownPostgres()). This will
4570+
* avoid origin advancement for an in-complete transaction which could
4571+
* otherwise lead to its loss as such a transaction won't be sent by the
4572+
* server again.
4573+
*
4574+
* Note that even a LOG or DEBUG statement placed after setting the origin
4575+
* state may process a shutdown signal before committing the current apply
4576+
* operation. So, it is important to register such a callback here.
4577+
*/
4578+
before_shmem_exit(replorigin_reset, (Datum)0);
4579+
45564580
InitializingApplyWorker= false;
45574581

45584582
/* Connect to the origin and start the replication. */
@@ -4916,12 +4940,23 @@ void
49164940
apply_error_callback(void*arg)
49174941
{
49184942
ApplyErrorCallbackArg*errarg=&apply_error_callback_arg;
4943+
intelevel;
49194944

49204945
if (apply_error_callback_arg.command==0)
49214946
return;
49224947

49234948
Assert(errarg->origin_name);
49244949

4950+
elevel=geterrlevel();
4951+
4952+
/*
4953+
* Reset the origin state to prevent the advancement of origin progress if
4954+
* we fail to apply. Otherwise, this will result in transaction loss as
4955+
* that transaction won't be sent again by the server.
4956+
*/
4957+
if (elevel >=ERROR)
4958+
replorigin_reset(0, (Datum)0);
4959+
49254960
if (errarg->rel==NULL)
49264961
{
49274962
if (!TransactionIdIsValid(errarg->remote_xid))

‎src/backend/utils/error/elog.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,6 +1571,23 @@ geterrcode(void)
15711571
returnedata->sqlerrcode;
15721572
}
15731573

1574+
/*
1575+
* geterrlevel --- return the currently set SQLSTATE error level
1576+
*
1577+
* This is only intended for use in error callback subroutines, since there
1578+
* is no other place outside elog.c where the concept is meaningful.
1579+
*/
1580+
int
1581+
geterrlevel(void)
1582+
{
1583+
ErrorData*edata=&errordata[errordata_stack_depth];
1584+
1585+
/* we don't bother incrementing recursion_depth */
1586+
CHECK_STACK_DEPTH();
1587+
1588+
returnedata->elevel;
1589+
}
1590+
15741591
/*
15751592
* geterrposition --- return the currently set error position (0 if none)
15761593
*

‎src/include/utils/elog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ extern intinternalerrquery(const char *query);
226226
externinterr_generic_string(intfield,constchar*str);
227227

228228
externintgeterrcode(void);
229+
externintgeterrlevel(void);
229230
externintgeterrposition(void);
230231
externintgetinternalerrposition(void);
231232

‎src/test/subscription/t/021_twophase.pl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
my$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
2424
$node_subscriber->init(allows_streaming=>'logical');
2525
$node_subscriber->append_conf('postgresql.conf',
26-
qq(max_prepared_transactions =10));
26+
qq(max_prepared_transactions =0));
2727
$node_subscriber->start;
2828

2929
# Create some pre-existing content on publisher
@@ -67,12 +67,24 @@
6767
# then COMMIT PREPARED
6868
###############################
6969

70+
# Save the log location, to see the failure of the application
71+
my$log_location =-s$node_subscriber->logfile;
72+
7073
$node_publisher->safe_psql(
7174
'postgres',"
7275
BEGIN;
7376
INSERT INTO tab_full VALUES (11);
7477
PREPARE TRANSACTION 'test_prepared_tab_full';");
7578

79+
# Confirm the ERROR is reported becasue max_prepared_transactions is zero
80+
$node_subscriber->wait_for_log(
81+
qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
82+
83+
# Set max_prepared_transactions to correct value to resume the replication
84+
$node_subscriber->append_conf('postgresql.conf',
85+
qq(max_prepared_transactions = 10));
86+
$node_subscriber->restart;
87+
7688
$node_publisher->wait_for_catchup($appname);
7789

7890
# check that transaction is in prepared state on subscriber

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp