Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1d2cfd1

Browse files
knizhnikkelvich
authored andcommitted
Reverse started hook commits
1 parentdde47c0 commit1d2cfd1

File tree

7 files changed

+8
-66
lines changed

7 files changed

+8
-66
lines changed

‎multimaster.c

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ static int MtmMaxRecoveryLag;
245245
staticintMtmGcPeriod;
246246
staticboolMtmIgnoreTablesWithoutPk;
247247
staticintMtmLockCount;
248-
staticintMtmSenderStarted;
249248

250249
staticExecutorStart_hook_typePreviousExecutorStartHook;
251250
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -1668,8 +1667,8 @@ void MtmRecoveryCompleted(void)
16681667
Mtm->nodes[i].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
16691668
}
16701669
/* Mode will be changed to online once all logical receiver are connected */
1671-
elog(LOG,"Recovery completed with %d active receiversand %d started sendersfrom %d",Mtm->nReceivers,Mtm->nSenders,Mtm->nLiveNodes-1);
1672-
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1?MTM_ONLINE :MTM_CONNECTED);
1670+
elog(LOG,"Recovery completed with %d active receivers from %d",Mtm->nReceivers,Mtm->nLiveNodes-1);
1671+
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1 ?MTM_ONLINE :MTM_CONNECTED);
16731672
MtmUnlock();
16741673
}
16751674

@@ -2199,7 +2198,6 @@ static void MtmInitialize()
21992198
Mtm->transListHead=NULL;
22002199
Mtm->transListTail=&Mtm->transListHead;
22012200
Mtm->nReceivers=0;
2202-
Mtm->nSenders=0;
22032201
Mtm->timeShift=0;
22042202
Mtm->transCount=0;
22052203
Mtm->gcCount=0;
@@ -2908,9 +2906,11 @@ void MtmReceiverStarted(int nodeId)
29082906
MtmEnableNode(nodeId);
29092907
MtmCheckQuorum();
29102908
}
2911-
elog(LOG,"Start %d receivers and %d senders from %d cluster status %s",Mtm->nReceivers+1,Mtm->nSenders,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
2912-
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
2913-
MtmSwitchClusterMode(MTM_ONLINE);
2909+
elog(LOG,"Start %d receivers from %d cluster status %s",Mtm->nReceivers+1,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
2910+
if (++Mtm->nReceivers==Mtm->nLiveNodes-1) {
2911+
if (Mtm->status==MTM_CONNECTED) {
2912+
MtmSwitchClusterMode(MTM_ONLINE);
2913+
}
29142914
}
29152915
}
29162916
MtmUnlock();
@@ -2997,7 +2997,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
29982998
Mtm->recoverySlot=nodeId;
29992999
Mtm->nReceivers=0;
3000-
Mtm->nSenders=0;
30013000
Mtm->recoveryCount+=1;
30023001
Mtm->pglogicalNodeMask=0;
30033002
MtmUnlock();
@@ -3077,19 +3076,6 @@ MtmOnProcExit(int code, Datum arg)
30773076
}
30783077
}
30793078

3080-
staticvoid
3081-
MtmReplicationStartedHook(structPGLogicalStartedHookArgs*args)
3082-
{
3083-
MtmLock(LW_EXCLUSIVE);
3084-
MtmSenderStarted=1;
3085-
elog(LOG,"Start %d senders and %d receivers from %d cluster status %s",Mtm->nSenders+1,Mtm->nReceivers,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
3086-
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
3087-
MtmSwitchClusterMode(MTM_ONLINE);
3088-
}
3089-
MtmUnlock();
3090-
}
3091-
3092-
30933079
staticvoid
30943080
MtmReplicationStartupHook(structPGLogicalStartupHookArgs*args)
30953081
{
@@ -3206,9 +3192,6 @@ static void
32063192
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
32073193
{
32083194
if (MtmReplicationNodeId >=0) {
3209-
MtmLock(LW_EXCLUSIVE);
3210-
Mtm->nSenders-=MtmSenderStarted;
3211-
MtmUnlock();
32123195
MTM_LOG1("Logical replication to node %d is stopped",MtmReplicationNodeId);
32133196
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
32143197
MtmReplicationNodeId=-1;/* defuse on_proc_exit hook */
@@ -3320,7 +3303,6 @@ bool MtmFilterTransaction(char* record, int size)
33203303
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
33213304
{
33223305
hooks->startup_hook=MtmReplicationStartupHook;
3323-
hooks->started_hook=MtmReplicationStartedHook;
33243306
hooks->shutdown_hook=MtmReplicationShutdownHook;
33253307
hooks->txn_filter_hook=MtmReplicationTxnFilterHook;
33263308
hooks->row_filter_hook=MtmReplicationRowFilterHook;

‎multimaster.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,7 @@ typedef struct
273273
intinject2PCError;/* Simulate error during 2PC commit at this node */
274274
intnLiveNodes;/* Number of active nodes */
275275
intnAllNodes;/* Total numbber of nodes */
276-
intnReceivers;/* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277-
intnSenders;/* Number of started WAL senders (used to determine moment when recovery) */
276+
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
278277
intnLockers;/* Number of lockers */
279278
intnActiveTransactions;/* Nunmber of active 2PC transactions */
280279
intnConfigChanges;/* Number of cluster configuration changes */

‎pglogical_config.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,6 @@ prepare_startup_message(PGLogicalOutputData *data)
477477
*/
478478
l=add_startup_msg_b(l,"hooks.startup_hook_enabled",
479479
data->hooks.startup_hook!=NULL);
480-
l=add_startup_msg_b(l,"hooks.started_hook_enabled",
481-
data->hooks.started_hook!=NULL);
482480
l=add_startup_msg_b(l,"hooks.shutdown_hook_enabled",
483481
data->hooks.shutdown_hook!=NULL);
484482
l=add_startup_msg_b(l,"hooks.row_filter_enabled",

‎pglogical_hooks.c

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,12 @@ load_hooks(PGLogicalOutputData *data)
9696

9797
elog(DEBUG3,"pglogical_output: Loaded hooks from function %u. Hooks are: \n"
9898
"\tstartup_hook: %p\n"
99-
"\tstarted_hook: %p\n"
10099
"\tshutdown_hook: %p\n"
101100
"\trow_filter_hook: %p\n"
102101
"\ttxn_filter_hook: %p\n"
103102
"\thooks_private_data: %p\n",
104103
hooks_func,
105104
data->hooks.startup_hook,
106-
data->hooks.started_hook,
107105
data->hooks.shutdown_hook,
108106
data->hooks.row_filter_hook,
109107
data->hooks.txn_filter_hook,
@@ -120,21 +118,6 @@ load_hooks(PGLogicalOutputData *data)
120118
CommitTransactionCommand();
121119
}
122120

123-
void
124-
call_started_hook(PGLogicalOutputData*data)
125-
{
126-
structPGLogicalStartedHookArgsargs;
127-
MemoryContextold_ctxt;
128-
129-
if (data->hooks.started_hook!=NULL)
130-
{
131-
args.private_data=data->hooks.hooks_private_data;
132-
old_ctxt=MemoryContextSwitchTo(data->hooks_mctxt);
133-
(void) (*data->hooks.started_hook)(&args);
134-
MemoryContextSwitchTo(old_ctxt);
135-
}
136-
}
137-
138121
void
139122
call_startup_hook(PGLogicalOutputData*data,List*plugin_params)
140123
{

‎pglogical_hooks.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ extern void load_hooks(PGLogicalOutputData *data);
1111

1212
externvoidcall_startup_hook(PGLogicalOutputData*data,List*plugin_params);
1313

14-
externvoidcall_started_hook(PGLogicalOutputData*data);
15-
1614
externvoidcall_shutdown_hook(PGLogicalOutputData*data);
1715

1816
externboolcall_row_filter_hook(PGLogicalOutputData*data,

‎pglogical_output.c

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ extern void_PG_output_plugin_init(OutputPluginCallbacks *cb);
5454
/* These must be available to pg_dlsym() */
5555
staticvoidpg_decode_startup(LogicalDecodingContext*ctx,
5656
OutputPluginOptions*opt,boolis_init);
57-
staticvoidpg_decode_started(LogicalDecodingContext*ctx);
5857
staticvoidpg_decode_shutdown(LogicalDecodingContext*ctx);
5958
staticvoidpg_decode_begin_txn(LogicalDecodingContext*ctx,
6059
ReorderBufferTXN*txn);
@@ -84,7 +83,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8483
AssertVariableIsOfType(&_PG_output_plugin_init,LogicalOutputPluginInit);
8584

8685
cb->startup_cb=pg_decode_startup;
87-
cb->started_cb=pg_decode_started;
8886
cb->begin_cb=pg_decode_begin_txn;
8987
cb->change_cb=pg_decode_change;
9088
cb->commit_cb=pg_decode_commit_txn;
@@ -492,15 +490,6 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
492490
MemoryContextReset(data->context);
493491
}
494492

495-
staticvoid
496-
pg_decode_started(LogicalDecodingContext*ctx)
497-
{
498-
PGLogicalOutputData*data=ctx->output_plugin_private;
499-
call_started_hook(data);
500-
}
501-
502-
503-
504493
/*
505494
* Decide if the whole transaction with specific origin should be filtered out.
506495
*/

‎pglogical_output/hooks.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ struct PGLogicalStartupHookArgs
2727

2828
typedefvoid (*pglogical_startup_hook_fn)(structPGLogicalStartupHookArgs*args);
2929

30-
structPGLogicalStartedHookArgs
31-
{
32-
void*private_data;
33-
};
34-
35-
typedefvoid (*pglogical_started_hook_fn)(structPGLogicalStartedHookArgs*args);
3630

3731
structPGLogicalTxnFilterArgs
3832
{
@@ -69,7 +63,6 @@ typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *arg
6963
structPGLogicalHooks
7064
{
7165
pglogical_startup_hook_fnstartup_hook;
72-
pglogical_started_hook_fnstarted_hook;
7366
pglogical_shutdown_hook_fnshutdown_hook;
7467
pglogical_txn_filter_hook_fntxn_filter_hook;
7568
pglogical_row_filter_hook_fnrow_filter_hook;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp