Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitae341ef

Browse files
committed
Add started hook to pglogical
1 parent57da7fc commitae341ef

File tree

10 files changed

+108
-8
lines changed

10 files changed

+108
-8
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ static int MtmMaxRecoveryLag;
245245
staticintMtmGcPeriod;
246246
staticboolMtmIgnoreTablesWithoutPk;
247247
staticintMtmLockCount;
248+
staticintMtmSenderStarted;
248249

249250
staticExecutorStart_hook_typePreviousExecutorStartHook;
250251
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -1667,8 +1668,8 @@ void MtmRecoveryCompleted(void)
16671668
Mtm->nodes[i].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
16681669
}
16691670
/* Mode will be changed to online once all logical receiver are connected */
1670-
elog(LOG,"Recovery completed with %d active receivers from %d",Mtm->nReceivers,Mtm->nLiveNodes-1);
1671-
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1 ?MTM_ONLINE :MTM_CONNECTED);
1671+
elog(LOG,"Recovery completed with %d active receiversand %d started sendersfrom %d",Mtm->nReceivers,Mtm->nSenders,Mtm->nLiveNodes-1);
1672+
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1?MTM_ONLINE :MTM_CONNECTED);
16721673
MtmUnlock();
16731674
}
16741675

@@ -2198,6 +2199,7 @@ static void MtmInitialize()
21982199
Mtm->transListHead=NULL;
21992200
Mtm->transListTail=&Mtm->transListHead;
22002201
Mtm->nReceivers=0;
2202+
Mtm->nSenders=0;
22012203
Mtm->timeShift=0;
22022204
Mtm->transCount=0;
22032205
Mtm->gcCount=0;
@@ -2906,11 +2908,9 @@ void MtmReceiverStarted(int nodeId)
29062908
MtmEnableNode(nodeId);
29072909
MtmCheckQuorum();
29082910
}
2909-
elog(LOG,"Start %d receivers from %d cluster status %s",Mtm->nReceivers+1,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
2910-
if (++Mtm->nReceivers==Mtm->nLiveNodes-1) {
2911-
if (Mtm->status==MTM_CONNECTED) {
2912-
MtmSwitchClusterMode(MTM_ONLINE);
2913-
}
2911+
elog(LOG,"Start %d receivers and %d senders from %d cluster status %s",Mtm->nReceivers+1,Mtm->nSenders,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
2912+
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
2913+
MtmSwitchClusterMode(MTM_ONLINE);
29142914
}
29152915
}
29162916
MtmUnlock();
@@ -2997,6 +2997,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
29982998
Mtm->recoverySlot=nodeId;
29992999
Mtm->nReceivers=0;
3000+
Mtm->nSenders=0;
30003001
Mtm->recoveryCount+=1;
30013002
Mtm->pglogicalNodeMask=0;
30023003
MtmUnlock();
@@ -3076,6 +3077,18 @@ MtmOnProcExit(int code, Datum arg)
30763077
}
30773078
}
30783079

3080+
staticvoid
3081+
MtmReplicationStartedHook(structPGLogicalStartedHookArgs*args)
3082+
{
3083+
MtmLock(LW_EXCLUSIVE);
3084+
MtmSenderStarted=1;
3085+
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
3086+
MtmSwitchClusterMode(MTM_ONLINE);
3087+
}
3088+
MtmUnlock();
3089+
}
3090+
3091+
30793092
staticvoid
30803093
MtmReplicationStartupHook(structPGLogicalStartupHookArgs*args)
30813094
{
@@ -3192,6 +3205,9 @@ static void
31923205
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
31933206
{
31943207
if (MtmReplicationNodeId >=0) {
3208+
MtmLock(LW_EXCLUSIVE);
3209+
Mtm->nSenders-=MtmSenderStarted;
3210+
MtmUnlock();
31953211
MTM_LOG1("Logical replication to node %d is stopped",MtmReplicationNodeId);
31963212
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
31973213
MtmReplicationNodeId=-1;/* defuse on_proc_exit hook */
@@ -3303,6 +3319,7 @@ bool MtmFilterTransaction(char* record, int size)
33033319
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
33043320
{
33053321
hooks->startup_hook=MtmReplicationStartupHook;
3322+
hooks->started_hook=MtmReplicationStartedHook;
33063323
hooks->shutdown_hook=MtmReplicationShutdownHook;
33073324
hooks->txn_filter_hook=MtmReplicationTxnFilterHook;
33083325
hooks->row_filter_hook=MtmReplicationRowFilterHook;

‎contrib/mmts/multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ typedef struct
273273
intinject2PCError;/* Simulate error during 2PC commit at this node */
274274
intnLiveNodes;/* Number of active nodes */
275275
intnAllNodes;/* Total numbber of nodes */
276-
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
276+
intnReceivers;/* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277+
intnSenders;/* Number of started WAL senders (used to determine moment when recovery) */
277278
intnLockers;/* Number of lockers */
278279
intnActiveTransactions;/* Nunmber of active 2PC transactions */
279280
intnConfigChanges;/* Number of cluster configuration changes */

‎contrib/mmts/pglogical_config.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,8 @@ prepare_startup_message(PGLogicalOutputData *data)
477477
*/
478478
l=add_startup_msg_b(l,"hooks.startup_hook_enabled",
479479
data->hooks.startup_hook!=NULL);
480+
l=add_startup_msg_b(l,"hooks.started_hook_enabled",
481+
data->hooks.started_hook!=NULL);
480482
l=add_startup_msg_b(l,"hooks.shutdown_hook_enabled",
481483
data->hooks.shutdown_hook!=NULL);
482484
l=add_startup_msg_b(l,"hooks.row_filter_enabled",

‎contrib/mmts/pglogical_hooks.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,14 @@ load_hooks(PGLogicalOutputData *data)
9696

9797
elog(DEBUG3,"pglogical_output: Loaded hooks from function %u. Hooks are: \n"
9898
"\tstartup_hook: %p\n"
99+
"\tstarted_hook: %p\n"
99100
"\tshutdown_hook: %p\n"
100101
"\trow_filter_hook: %p\n"
101102
"\ttxn_filter_hook: %p\n"
102103
"\thooks_private_data: %p\n",
103104
hooks_func,
104105
data->hooks.startup_hook,
106+
data->hooks.started_hook,
105107
data->hooks.shutdown_hook,
106108
data->hooks.row_filter_hook,
107109
data->hooks.txn_filter_hook,
@@ -118,6 +120,21 @@ load_hooks(PGLogicalOutputData *data)
118120
CommitTransactionCommand();
119121
}
120122

123+
void
124+
call_started_hook(PGLogicalOutputData*data)
125+
{
126+
structPGLogicalStartedHookArgsargs;
127+
MemoryContextold_ctxt;
128+
129+
if (data->hooks.started_hook!=NULL)
130+
{
131+
args.private_data=data->hooks.hooks_private_data;
132+
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
133+
(void) (*data->hooks.started_hook)(&args);
134+
MemoryContextSwitchTo(old_ctxt);
135+
}
136+
}
137+
121138
void
122139
call_startup_hook(PGLogicalOutputData*data,List*plugin_params)
123140
{

‎contrib/mmts/pglogical_hooks.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ extern void load_hooks(PGLogicalOutputData *data);
1111

1212
externvoidcall_startup_hook(PGLogicalOutputData*data,List*plugin_params);
1313

14+
externvoidcall_started_hook(PGLogicalOutputData*data);
15+
1416
externvoidcall_shutdown_hook(PGLogicalOutputData*data);
1517

1618
externboolcall_row_filter_hook(PGLogicalOutputData*data,

‎contrib/mmts/pglogical_output.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ extern void_PG_output_plugin_init(OutputPluginCallbacks *cb);
5454
/* These must be available to pg_dlsym() */
5555
staticvoidpg_decode_startup(LogicalDecodingContext*ctx,
5656
OutputPluginOptions*opt,boolis_init);
57+
staticvoidpg_decode_started(LogicalDecodingContext*ctx);
5758
staticvoidpg_decode_shutdown(LogicalDecodingContext*ctx);
5859
staticvoidpg_decode_begin_txn(LogicalDecodingContext*ctx,
5960
ReorderBufferTXN*txn);
@@ -83,6 +84,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8384
AssertVariableIsOfType(&_PG_output_plugin_init,LogicalOutputPluginInit);
8485

8586
cb->startup_cb=pg_decode_startup;
87+
cb->started_cb=pg_decode_started;
8688
cb->begin_cb=pg_decode_begin_txn;
8789
cb->change_cb=pg_decode_change;
8890
cb->commit_cb=pg_decode_commit_txn;
@@ -490,6 +492,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
490492
MemoryContextReset(data->context);
491493
}
492494

495+
staticvoid
496+
pg_decode_started(LogicalDecodingContext*ctx)
497+
{
498+
PGLogicalOutputData*data=ctx->output_plugin_private;
499+
call_started_hook(data);
500+
}
501+
502+
503+
493504
/*
494505
* Decide if the whole transaction with specific origin should be filtered out.
495506
*/

‎contrib/mmts/pglogical_output/hooks.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ struct PGLogicalStartupHookArgs
2727

2828
typedefvoid (*pglogical_startup_hook_fn)(structPGLogicalStartupHookArgs*args);
2929

30+
structPGLogicalStartedHookArgs
31+
{
32+
void*private_data;
33+
};
34+
35+
typedefvoid (*pglogical_started_hook_fn)(structPGLogicalStartedHookArgs*args);
3036

3137
structPGLogicalTxnFilterArgs
3238
{
@@ -63,6 +69,7 @@ typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *arg
6369
structPGLogicalHooks
6470
{
6571
pglogical_startup_hook_fnstartup_hook;
72+
pglogical_started_hook_fnstarted_hook;
6673
pglogical_shutdown_hook_fnshutdown_hook;
6774
pglogical_txn_filter_hook_fntxn_filter_hook;
6875
pglogical_row_filter_hook_fnrow_filter_hook;

‎contrib/multimaster/pglogical_output.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7676
AssertVariableIsOfType(&_PG_output_plugin_init,LogicalOutputPluginInit);
7777

7878
cb->startup_cb=pg_decode_startup;
79+
cb->started_cb=pg_decode_started;
7980
cb->begin_cb=pg_decode_begin_txn;
8081
cb->change_cb=pg_decode_change;
8182
cb->commit_cb=pg_decode_commit_txn;

‎src/backend/replication/logical/logical.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef struct LogicalErrorCallbackState
5454

5555
/* wrappers around output plugin callbacks */
5656
staticvoidoutput_plugin_error_callback(void*arg);
57+
staticvoidstarted_cb_wrapper(LogicalDecodingContext*ctx);
5758
staticvoidstartup_cb_wrapper(LogicalDecodingContext*ctx,OutputPluginOptions*opt,
5859
boolis_init);
5960
staticvoidshutdown_cb_wrapper(LogicalDecodingContext*ctx);
@@ -413,6 +414,7 @@ DecodingContextReady(LogicalDecodingContext *ctx)
413414
void
414415
DecodingContextFindStartpoint(LogicalDecodingContext*ctx)
415416
{
417+
MemoryContextold_context;
416418
XLogRecPtrstartptr;
417419

418420
/* Initialize from where to start reading WAL. */
@@ -447,6 +449,11 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
447449
}
448450

449451
ctx->slot->data.confirmed_flush=ctx->reader->EndRecPtr;
452+
453+
old_context=MemoryContextSwitchTo(ctx->context);
454+
if (ctx->callbacks.started_cb!=NULL)
455+
started_cb_wrapper(ctx);
456+
MemoryContextSwitchTo(old_context);
450457
}
451458

452459
/*
@@ -562,6 +569,31 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
562569
error_context_stack=errcallback.previous;
563570
}
564571

572+
staticvoid
573+
started_cb_wrapper(LogicalDecodingContext*ctx)
574+
{
575+
LogicalErrorCallbackStatestate;
576+
ErrorContextCallbackerrcallback;
577+
578+
/* Push callback + info on the error context stack */
579+
state.ctx=ctx;
580+
state.callback_name="startup";
581+
state.report_location=InvalidXLogRecPtr;
582+
errcallback.callback=output_plugin_error_callback;
583+
errcallback.arg= (void*)&state;
584+
errcallback.previous=error_context_stack;
585+
error_context_stack=&errcallback;
586+
587+
/* set output state */
588+
ctx->accept_writes= false;
589+
590+
/* do the actual work: call callback */
591+
ctx->callbacks.started_cb(ctx);
592+
593+
/* Pop the error context stack */
594+
error_context_stack=errcallback.previous;
595+
}
596+
565597
staticvoid
566598
shutdown_cb_wrapper(LogicalDecodingContext*ctx)
567599
{

‎src/include/replication/output_plugin.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ typedef void (*LogicalDecodeStartupCB) (
4747
boolis_init
4848
);
4949

50+
/*
51+
* Callback that gets called when WAL-sender is started. ctx->private_data can
52+
* be set to some private data.
53+
*
54+
*/
55+
typedefvoid (*LogicalDecodeStartedCB) (
56+
structLogicalDecodingContext*ctx
57+
);
58+
5059
/*
5160
* Callback called for every (explicit or implicit) BEGIN of a successful
5261
* transaction.
@@ -105,6 +114,7 @@ typedef void (*LogicalDecodeShutdownCB) (
105114
typedefstructOutputPluginCallbacks
106115
{
107116
LogicalDecodeStartupCBstartup_cb;
117+
LogicalDecodeStartedCBstarted_cb;
108118
LogicalDecodeBeginCBbegin_cb;
109119
LogicalDecodeChangeCBchange_cb;
110120
LogicalDecodeCommitCBcommit_cb;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp