Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit915aafe

Browse files
author
Amit Kapila
committed
Don't advance origin during apply failure.
We advance origin progress during abort on successful streaming andapplication of ROLLBACK in parallel streaming mode. But the originshouldn't be advanced during an error or unsuccessful apply due toshutdown. Otherwise, it will result in a transaction loss as such atransaction won't be sent again by the server.Reported-by: Hou ZhijieAuthor: Hayato Kuroda and Shveta MalikReviewed-by: Amit KapilaBackpatch-through: 16Discussion:https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
1 parent5effd59 commit915aafe

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4639,6 +4639,17 @@ InitializeLogRepWorker(void)
46394639
CommitTransactionCommand();
46404640
}
46414641

4642+
/*
4643+
* Reset the origin state.
4644+
*/
4645+
staticvoid
4646+
replorigin_reset(intcode,Datumarg)
4647+
{
4648+
replorigin_session_origin=InvalidRepOriginId;
4649+
replorigin_session_origin_lsn=InvalidXLogRecPtr;
4650+
replorigin_session_origin_timestamp=0;
4651+
}
4652+
46424653
/* Common function to setup the leader apply or tablesync worker. */
46434654
void
46444655
SetupApplyOrSyncWorker(intworker_slot)
@@ -4667,6 +4678,19 @@ SetupApplyOrSyncWorker(int worker_slot)
46674678

46684679
InitializeLogRepWorker();
46694680

4681+
/*
4682+
* Register a callback to reset the origin state before aborting any
4683+
* pending transaction during shutdown (see ShutdownPostgres()). This will
4684+
* avoid origin advancement for an in-complete transaction which could
4685+
* otherwise lead to its loss as such a transaction won't be sent by the
4686+
* server again.
4687+
*
4688+
* Note that even a LOG or DEBUG statement placed after setting the origin
4689+
* state may process a shutdown signal before committing the current apply
4690+
* operation. So, it is important to register such a callback here.
4691+
*/
4692+
before_shmem_exit(replorigin_reset, (Datum)0);
4693+
46704694
/* Connect to the origin and start the replication. */
46714695
elog(DEBUG1,"connecting to publisher using connection string \"%s\"",
46724696
MySubscription->conninfo);
@@ -4893,12 +4917,23 @@ void
48934917
apply_error_callback(void*arg)
48944918
{
48954919
ApplyErrorCallbackArg*errarg=&apply_error_callback_arg;
4920+
intelevel;
48964921

48974922
if (apply_error_callback_arg.command==0)
48984923
return;
48994924

49004925
Assert(errarg->origin_name);
49014926

4927+
elevel=geterrlevel();
4928+
4929+
/*
4930+
* Reset the origin state to prevent the advancement of origin progress if
4931+
* we fail to apply. Otherwise, this will result in transaction loss as
4932+
* that transaction won't be sent again by the server.
4933+
*/
4934+
if (elevel >=ERROR)
4935+
replorigin_reset(0, (Datum)0);
4936+
49024937
if (errarg->rel==NULL)
49034938
{
49044939
if (!TransactionIdIsValid(errarg->remote_xid))

‎src/backend/utils/error/elog.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,6 +1568,23 @@ geterrcode(void)
15681568
returnedata->sqlerrcode;
15691569
}
15701570

1571+
/*
1572+
* geterrlevel --- return the currently set error level
1573+
*
1574+
* This is only intended for use in error callback subroutines, since there
1575+
* is no other place outside elog.c where the concept is meaningful.
1576+
*/
1577+
int
1578+
geterrlevel(void)
1579+
{
1580+
ErrorData*edata=&errordata[errordata_stack_depth];
1581+
1582+
/* we don't bother incrementing recursion_depth */
1583+
CHECK_STACK_DEPTH();
1584+
1585+
returnedata->elevel;
1586+
}
1587+
15711588
/*
15721589
* geterrposition --- return the currently set error position (0 if none)
15731590
*

‎src/include/utils/elog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ extern intinternalerrquery(const char *query);
226226
externinterr_generic_string(intfield,constchar*str);
227227

228228
externintgeterrcode(void);
229+
externintgeterrlevel(void);
229230
externintgeterrposition(void);
230231
externintgetinternalerrposition(void);
231232

‎src/test/subscription/t/021_twophase.pl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
my$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
2424
$node_subscriber->init;
2525
$node_subscriber->append_conf('postgresql.conf',
26-
qq(max_prepared_transactions =10));
26+
qq(max_prepared_transactions =0));
2727
$node_subscriber->start;
2828

2929
# Create some pre-existing content on publisher
@@ -67,12 +67,24 @@
6767
# then COMMIT PREPARED
6868
###############################
6969

70+
# Save the log location, to see the failure of the application
71+
my$log_location =-s$node_subscriber->logfile;
72+
7073
$node_publisher->safe_psql(
7174
'postgres',"
7275
BEGIN;
7376
INSERT INTO tab_full VALUES (11);
7477
PREPARE TRANSACTION 'test_prepared_tab_full';");
7578

79+
# Confirm the ERROR is reported becasue max_prepared_transactions is zero
80+
$node_subscriber->wait_for_log(
81+
qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
82+
83+
# Set max_prepared_transactions to correct value to resume the replication
84+
$node_subscriber->append_conf('postgresql.conf',
85+
qq(max_prepared_transactions = 10));
86+
$node_subscriber->restart;
87+
7688
$node_publisher->wait_for_catchup($appname);
7789

7890
# check that transaction is in prepared state on subscriber

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp