Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit02c1b64

Browse files
author
Amit Kapila
committed
Refactor to split Apply and Tablesync Workers code.
Both apply and tablesync workers were using ApplyWorkerMain() as entrypoint. As the name implies, ApplyWorkerMain() should be considered asthe main function for apply workers. Tablesync worker's path was hiddenand does not have enough in common to share the same main function withapply worker.Also, most of the code shared by both worker types is already combinedin LogicalRepApplyLoop(). There is no need to combine the rest inApplyWorkerMain() anymore.This patch introduces TablesyncWorkerMain() as a new entry point fortablesync workers. This aims to increase code readability and would helpwith future improvements like the reuse of tablesync workers in theinitial synchronization.Author: Melih Mutlu based on suggestions by Melanie PlagemanReviewed-by: Peter Smith, Kuroda Hayato, Amit KapilaDiscussion:http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
1 parent0125c4e commit02c1b64

File tree

7 files changed

+299
-224
lines changed

7 files changed

+299
-224
lines changed

‎src/backend/postmaster/bgworker.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ static const struct
131131
},
132132
{
133133
"ParallelApplyWorkerMain",ParallelApplyWorkerMain
134+
},
135+
{
136+
"TablesyncWorkerMain",TablesyncWorkerMain
134137
}
135138
};
136139

‎src/backend/replication/logical/applyparallelworker.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg)
942942
MyLogicalRepWorker->last_send_time=MyLogicalRepWorker->last_recv_time=
943943
MyLogicalRepWorker->reply_time=0;
944944

945-
InitializeApplyWorker();
945+
InitializeLogRepWorker();
946946

947947
InitializingApplyWorker= false;
948948

‎src/backend/replication/logical/launcher.c

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -459,24 +459,30 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
459459
snprintf(bgw.bgw_library_name,MAXPGPATH,"postgres");
460460

461461
if (is_parallel_apply_worker)
462+
{
462463
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ParallelApplyWorkerMain");
463-
else
464-
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ApplyWorkerMain");
465-
466-
if (OidIsValid(relid))
467464
snprintf(bgw.bgw_name,BGW_MAXLEN,
468-
"logical replication worker for subscription %u sync %u",subid,relid);
469-
elseif (is_parallel_apply_worker)
465+
"logical replication parallel apply worker for subscription %u",
466+
subid);
467+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication parallel worker");
468+
}
469+
elseif (OidIsValid(relid))
470+
{
471+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"TablesyncWorkerMain");
470472
snprintf(bgw.bgw_name,BGW_MAXLEN,
471-
"logical replication parallel apply worker for subscription %u",subid);
473+
"logical replication tablesync worker for subscription %u sync %u",
474+
subid,
475+
relid);
476+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication tablesync worker");
477+
}
472478
else
479+
{
480+
snprintf(bgw.bgw_function_name,BGW_MAXLEN,"ApplyWorkerMain");
473481
snprintf(bgw.bgw_name,BGW_MAXLEN,
474-
"logical replication apply worker for subscription %u",subid);
475-
476-
if (is_parallel_apply_worker)
477-
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication parallel worker");
478-
else
479-
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication worker");
482+
"logical replication apply worker for subscription %u",
483+
subid);
484+
snprintf(bgw.bgw_type,BGW_MAXLEN,"logical replication apply worker");
485+
}
480486

481487
bgw.bgw_restart_time=BGW_NEVER_RESTART;
482488
bgw.bgw_notify_pid=MyProcPid;

‎src/backend/replication/logical/tablesync.c

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
#include"pgstat.h"
107107
#include"replication/logicallauncher.h"
108108
#include"replication/logicalrelation.h"
109+
#include"replication/logicalworker.h"
109110
#include"replication/walreceiver.h"
110111
#include"replication/worker_internal.h"
111112
#include"replication/slot.h"
@@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
12411242
*
12421243
* The returned slot name is palloc'ed in current memory context.
12431244
*/
1244-
char*
1245+
staticchar*
12451246
LogicalRepSyncTableStart(XLogRecPtr*origin_startpos)
12461247
{
12471248
char*slotname;
@@ -1584,6 +1585,94 @@ FetchTableStates(bool *started_tx)
15841585
returnhas_subrels;
15851586
}
15861587

1588+
/*
1589+
* Execute the initial sync with error handling. Disable the subscription,
1590+
* if it's required.
1591+
*
1592+
* Allocate the slot name in long-lived context on return. Note that we don't
1593+
* handle FATAL errors which are probably because of system resource error and
1594+
* are not repeatable.
1595+
*/
1596+
staticvoid
1597+
start_table_sync(XLogRecPtr*origin_startpos,char**slotname)
1598+
{
1599+
char*sync_slotname=NULL;
1600+
1601+
Assert(am_tablesync_worker());
1602+
1603+
PG_TRY();
1604+
{
1605+
/* Call initial sync. */
1606+
sync_slotname=LogicalRepSyncTableStart(origin_startpos);
1607+
}
1608+
PG_CATCH();
1609+
{
1610+
if (MySubscription->disableonerr)
1611+
DisableSubscriptionAndExit();
1612+
else
1613+
{
1614+
/*
1615+
* Report the worker failed during table synchronization. Abort
1616+
* the current transaction so that the stats message is sent in an
1617+
* idle state.
1618+
*/
1619+
AbortOutOfAnyTransaction();
1620+
pgstat_report_subscription_error(MySubscription->oid, false);
1621+
1622+
PG_RE_THROW();
1623+
}
1624+
}
1625+
PG_END_TRY();
1626+
1627+
/* allocate slot name in long-lived context */
1628+
*slotname=MemoryContextStrdup(ApplyContext,sync_slotname);
1629+
pfree(sync_slotname);
1630+
}
1631+
1632+
/*
1633+
* Runs the tablesync worker.
1634+
*
1635+
* It starts syncing tables. After a successful sync, sets streaming options
1636+
* and starts streaming to catchup with apply worker.
1637+
*/
1638+
staticvoid
1639+
run_tablesync_worker()
1640+
{
1641+
charoriginname[NAMEDATALEN];
1642+
XLogRecPtrorigin_startpos=InvalidXLogRecPtr;
1643+
char*slotname=NULL;
1644+
WalRcvStreamOptionsoptions;
1645+
1646+
start_table_sync(&origin_startpos,&slotname);
1647+
1648+
ReplicationOriginNameForLogicalRep(MySubscription->oid,
1649+
MyLogicalRepWorker->relid,
1650+
originname,
1651+
sizeof(originname));
1652+
1653+
set_apply_error_context_origin(originname);
1654+
1655+
set_stream_options(&options,slotname,&origin_startpos);
1656+
1657+
walrcv_startstreaming(LogRepWorkerWalRcvConn,&options);
1658+
1659+
/* Apply the changes till we catchup with the apply worker. */
1660+
start_apply(origin_startpos);
1661+
}
1662+
1663+
/* Logical Replication Tablesync worker entry point */
1664+
void
1665+
TablesyncWorkerMain(Datummain_arg)
1666+
{
1667+
intworker_slot=DatumGetInt32(main_arg);
1668+
1669+
SetupApplyOrSyncWorker(worker_slot);
1670+
1671+
run_tablesync_worker();
1672+
1673+
finish_sync_worker();
1674+
}
1675+
15871676
/*
15881677
* If the subscription has no tables then return false.
15891678
*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp