Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3f28b2f

Browse files
author
Amit Kapila
committed
Don't advance origin during apply failure.
We advance origin progress during abort on successful streaming andapplication of ROLLBACK in parallel streaming mode. But the originshouldn't be advanced during an error or unsuccessful apply due toshutdown. Otherwise, it will result in a transaction loss as such atransaction won't be sent again by the server.Reported-by: Hou ZhijieAuthor: Hayato Kuroda and Shveta MalikReviewed-by: Amit KapilaBackpatch-through: 16Discussion:https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
1 parenta95ff1f commit3f28b2f

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void)
47134713
CommitTransactionCommand();
47144714
}
47154715

4716+
/*
4717+
* Reset the origin state.
4718+
*/
4719+
staticvoid
4720+
replorigin_reset(intcode,Datumarg)
4721+
{
4722+
replorigin_session_origin=InvalidRepOriginId;
4723+
replorigin_session_origin_lsn=InvalidXLogRecPtr;
4724+
replorigin_session_origin_timestamp=0;
4725+
}
4726+
47164727
/* Common function to setup the leader apply or tablesync worker. */
47174728
void
47184729
SetupApplyOrSyncWorker(intworker_slot)
@@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot)
47414752

47424753
InitializeLogRepWorker();
47434754

4755+
/*
4756+
* Register a callback to reset the origin state before aborting any
4757+
* pending transaction during shutdown (see ShutdownPostgres()). This will
4758+
* avoid origin advancement for an in-complete transaction which could
4759+
* otherwise lead to its loss as such a transaction won't be sent by the
4760+
* server again.
4761+
*
4762+
* Note that even a LOG or DEBUG statement placed after setting the origin
4763+
* state may process a shutdown signal before committing the current apply
4764+
* operation. So, it is important to register such a callback here.
4765+
*/
4766+
before_shmem_exit(replorigin_reset, (Datum)0);
4767+
47444768
/* Connect to the origin and start the replication. */
47454769
elog(DEBUG1,"connecting to publisher using connection string \"%s\"",
47464770
MySubscription->conninfo);
@@ -4967,12 +4991,23 @@ void
49674991
apply_error_callback(void*arg)
49684992
{
49694993
ApplyErrorCallbackArg*errarg=&apply_error_callback_arg;
4994+
intelevel;
49704995

49714996
if (apply_error_callback_arg.command==0)
49724997
return;
49734998

49744999
Assert(errarg->origin_name);
49755000

5001+
elevel=geterrlevel();
5002+
5003+
/*
5004+
* Reset the origin state to prevent the advancement of origin progress if
5005+
* we fail to apply. Otherwise, this will result in transaction loss as
5006+
* that transaction won't be sent again by the server.
5007+
*/
5008+
if (elevel >=ERROR)
5009+
replorigin_reset(0, (Datum)0);
5010+
49765011
if (errarg->rel==NULL)
49775012
{
49785013
if (!TransactionIdIsValid(errarg->remote_xid))

‎src/backend/utils/error/elog.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,6 +1568,23 @@ geterrcode(void)
15681568
returnedata->sqlerrcode;
15691569
}
15701570

1571+
/*
1572+
* geterrlevel --- return the currently set error level
1573+
*
1574+
* This is only intended for use in error callback subroutines, since there
1575+
* is no other place outside elog.c where the concept is meaningful.
1576+
*/
1577+
int
1578+
geterrlevel(void)
1579+
{
1580+
ErrorData*edata=&errordata[errordata_stack_depth];
1581+
1582+
/* we don't bother incrementing recursion_depth */
1583+
CHECK_STACK_DEPTH();
1584+
1585+
returnedata->elevel;
1586+
}
1587+
15711588
/*
15721589
* geterrposition --- return the currently set error position (0 if none)
15731590
*

‎src/include/utils/elog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ extern intinternalerrquery(const char *query);
226226
externinterr_generic_string(intfield,constchar*str);
227227

228228
externintgeterrcode(void);
229+
externintgeterrlevel(void);
229230
externintgeterrposition(void);
230231
externintgetinternalerrposition(void);
231232

‎src/test/subscription/t/021_twophase.pl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
my$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
2424
$node_subscriber->init;
2525
$node_subscriber->append_conf('postgresql.conf',
26-
qq(max_prepared_transactions =10));
26+
qq(max_prepared_transactions =0));
2727
$node_subscriber->start;
2828

2929
# Create some pre-existing content on publisher
@@ -67,12 +67,24 @@
6767
# then COMMIT PREPARED
6868
###############################
6969

70+
# Save the log location, to see the failure of the application
71+
my$log_location =-s$node_subscriber->logfile;
72+
7073
$node_publisher->safe_psql(
7174
'postgres',"
7275
BEGIN;
7376
INSERT INTO tab_full VALUES (11);
7477
PREPARE TRANSACTION 'test_prepared_tab_full';");
7578

79+
# Confirm the ERROR is reported becasue max_prepared_transactions is zero
80+
$node_subscriber->wait_for_log(
81+
qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
82+
83+
# Set max_prepared_transactions to correct value to resume the replication
84+
$node_subscriber->append_conf('postgresql.conf',
85+
qq(max_prepared_transactions = 10));
86+
$node_subscriber->restart;
87+
7688
$node_publisher->wait_for_catchup($appname);
7789

7890
# check that transaction is in prepared state on subscriber

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp