Commit 05676d8

Amit Kapila committed

Fix an oversight in 3f28b2f.
Commit 3f28b2f tried to ensure that the replication origin shouldn't be
advanced in case of an ERROR in the apply worker, so that it can request
the same data again after restart. However, it is possible that an ERROR
was caught and handled by a (say PL/pgSQL) function, and the apply worker
continues to apply further changes, in which case, we shouldn't reset the
replication origin.

Ensure to reset the origin only when the apply worker exits after an
ERROR.

Commit 3f28b2f added new function geterrlevel, which we removed in HEAD
as part of this commit, but kept it in backbranches to avoid breaking any
applications. A separate case can be made to have such a function even for
HEAD.

Reported-by: Shawn McCoy <shawn.the.mccoy@gmail.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Backpatch-through: 16, where it was introduced
Discussion: https://postgr.es/m/CALsgZNCGARa2mcYNVTSj9uoPcJo-tPuWUGECReKpNgTpo31_Pw@mail.gmail.com
1 parent 4b6331e commit 05676d8
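
The control flow described in the commit message can be illustrated with a small, self-contained C sketch. This is not PostgreSQL code: it imitates nested PG_TRY/PG_CATCH blocks with setjmp/longjmp, and every name in it (origin_attached, reset_in_callback, error_callback, apply_change, origin_survives) is a hypothetical stand-in chosen for illustration. The only behaviour taken from the commit is that the error callback runs even for an error that is later caught, while the top-level catch runs only when the worker is about to exit.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>

#define MAX_HANDLERS 8

/* Hypothetical stand-in for the session's replication origin state. */
static bool origin_attached = true;

/* When true, mimic the pre-fix behaviour: reset inside the error callback. */
static bool reset_in_callback = false;

/* Stack of active handlers, a simplified stand-in for nested PG_TRY blocks. */
static jmp_buf handlers[MAX_HANDLERS];
static int handler_depth = 0;

/* Stand-in for replorigin_reset(): detach so origin progress cannot advance. */
static void origin_reset(void)
{
	origin_attached = false;
}

/* Stand-in for apply_error_callback(): runs for every error, caught or not. */
static void error_callback(void)
{
	if (reset_in_callback)
		origin_reset();
}

/* Raise an error: run the callback, then jump to the innermost handler. */
static void raise_error(void)
{
	assert(handler_depth > 0);
	error_callback();
	longjmp(handlers[--handler_depth], 1);
}

/* A trigger that raises an error and swallows it, like a PL/pgSQL EXCEPTION block. */
static void trigger_with_handled_error(void)
{
	if (setjmp(handlers[handler_depth]) == 0)
	{
		handler_depth++;		/* handler is now active */
		raise_error();			/* does not return */
	}
	/* Error handled here; execution simply continues. */
}

/* Apply one change. Returns true if the worker keeps running. */
static bool apply_change(bool handled_in_trigger)
{
	if (setjmp(handlers[handler_depth]) == 0)
	{
		handler_depth++;		/* handler is now active */
		if (handled_in_trigger)
			trigger_with_handled_error();
		else
			raise_error();
		handler_depth--;		/* normal completion: pop our handler */
		return true;
	}

	/* Top-level catch: the worker is about to exit, so reset the origin. */
	origin_reset();
	return false;
}

/* Report whether the origin is still attached after applying one change. */
bool origin_survives(bool buggy, bool handled_in_trigger)
{
	origin_attached = true;
	reset_in_callback = buggy;
	handler_depth = 0;
	(void) apply_change(handled_in_trigger);
	return origin_attached;
}
```

Under these assumptions, origin_survives(true, true) models the bug (the origin is lost although the worker keeps applying changes), and origin_survives(false, true) models the fixed behaviour. An error that reaches the top-level catch resets the origin in both variants.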

2 files changed: +87 -11 lines changed


‎src/backend/replication/logical/worker.c

Lines changed: 10 additions & 11 deletions
@@ -416,6 +416,8 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													   ParallelApplyWorkerInfo **winfo);
 
+static void replorigin_reset(int code, Datum arg);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -4441,6 +4443,14 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Reset the origin state to prevent the advancement of origin
+		 * progress if we fail to apply. Otherwise, this will result in
+		 * transaction loss as that transaction won't be sent again by the
+		 * server.
+		 */
+		replorigin_reset(0, (Datum) 0);
+
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
 		else
@@ -4928,23 +4938,12 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
-	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
-	elevel = geterrlevel();
-
-	/*
-	 * Reset the origin state to prevent the advancement of origin progress if
-	 * we fail to apply. Otherwise, this will result in transaction loss as
-	 * that transaction won't be sent again by the server.
-	 */
-	if (elevel >= ERROR)
-		replorigin_reset(0, (Datum) 0);
-
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))

‎src/test/subscription/t/100_bugs.pl

Lines changed: 77 additions & 0 deletions
@@ -490,4 +490,81 @@
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
+# The bug was that when an ERROR was caught and handled by a (PL/pgSQL)
+# function, the apply worker reset the replication origin but continued
+# processing subsequent changes. So, we fail to update the replication origin
+# during further apply operations. This can lead to the apply worker requesting
+# the changes that have been applied again after restarting.
+
+$node_publisher->rotate_logfile();
+$node_publisher->start();
+
+$node_subscriber->rotate_logfile();
+$node_subscriber->start();
+
+# Set up a publication with a table
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE t1 (a int);
+	CREATE PUBLICATION regress_pub FOR TABLE t1;
+));
+
+# Set up a subscription which subscribes the publication
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE t1 (a int);
+	CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;
+));
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub');
+
+# Create an AFTER INSERT trigger on the table that raises and subsequently
+# handles an exception. Subsequent insertions will trigger this exception,
+# causing the apply worker to invoke its error callback with an ERROR. However,
+# since the error is caught within the trigger, the apply worker will continue
+# processing changes.
+$node_subscriber->safe_psql(
+	'postgres', q{
+CREATE FUNCTION handle_exception_trigger()
+RETURNS TRIGGER AS $$
+BEGIN
+	BEGIN
+		-- Raise an exception
+		RAISE EXCEPTION 'This is a test exception';
+	EXCEPTION
+		WHEN OTHERS THEN
+			RETURN NEW;
+	END;
+
+	RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER silent_exception_trigger
+AFTER INSERT OR UPDATE ON t1
+FOR EACH ROW
+EXECUTE FUNCTION handle_exception_trigger();
+
+ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger;
+});
+
+# Obtain current remote_lsn value to check its advancement later
+my $remote_lsn = $node_subscriber->safe_psql('postgres',
+	"SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
+);
+
+# Insert a tuple to replicate changes
+$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);");
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Confirms the origin can be advanced
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
+);
+is($result, 't',
+	'remote_lsn has advanced for apply worker raising an exception');
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
+
 done_testing();
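
The test above lets the server evaluate remote_lsn > '$remote_lsn', so the comparison uses pg_lsn semantics, not string comparison. As a hedged illustration of why that matters, the following C sketch parses the textual LSN form (two hexadecimal numbers separated by a slash, high 32 bits first) into a 64-bit value and compares numerically. The helpers parse_lsn and lsn_advanced are hypothetical names for this example and are not part of PostgreSQL or its test framework.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse the text form "hi/lo" (both hexadecimal) into a 64-bit position.
 * Returns false if the input does not match that form exactly.
 */
bool parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 ||
		text[consumed] != '\0')
		return false;

	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/* True only if both values parse and "after" is strictly greater than "before". */
bool lsn_advanced(const char *before, const char *after)
{
	uint64_t	b;
	uint64_t	a;

	if (!parse_lsn(before, &b) || !parse_lsn(after, &a))
		return false;

	return a > b;
}
```

For example, lsn_advanced("0/9000000", "0/10000000") is true, whereas comparing the same two values as plain strings would order them the other way round.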
