Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb8c0a63

Browse files
knizhnikkelvich
authored andcommitted
Prevent recusive broadcast by setting special applicastion name
1 parent16df7ec commitb8c0a63

File tree

3 files changed

+15
-20
lines changed

3 files changed

+15
-20
lines changed

‎multimaster.c

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
798798
* Send notification only if ABORT happens during transaction processing at replicas,
799799
* do not send notification if ABORT is receiver from master
800800
*/
801-
MTM_TRACE("%d: send ABORT notification to coordinator %d\n",MyProcPid,x->gtid.node);
801+
MTM_INFO("%d: send ABORT notificationabort transaction %dto coordinator %d\n",MyProcPid,x->gtid.xid,x->gtid.node);
802802
if (ts==NULL) {
803803
Assert(TransactionIdIsValid(x->xid));
804804
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
@@ -1602,6 +1602,11 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
16021602
returnMtm->recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
16031603
}
16041604

1605+
staticboolMtmIsBroadcast()
1606+
{
1607+
returnapplication_name!=NULL&&strcmp(application_name,MULTIMASTER_BROADCAST_SERVICE)==0;
1608+
}
1609+
16051610
voidMtmRecoverNode(intnodeId)
16061611
{
16071612
if (nodeId <=0||nodeId>Mtm->nNodes)
@@ -1611,7 +1616,7 @@ void MtmRecoverNode(int nodeId)
16111616
if (!BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
16121617
elog(ERROR,"Node %d was not disabled",nodeId);
16131618
}
1614-
if (!IsTransactionBlock())
1619+
if (!MtmIsBroadcast())
16151620
{
16161621
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('"MULTIMASTER_SLOT_PATTERN"', '"MULTIMASTER_NAME"')",nodeId), true);
16171622
}
@@ -1628,7 +1633,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16281633
}
16291634
BIT_SET(Mtm->disabledNodeMask,nodeId-1);
16301635
Mtm->nNodes-=1;
1631-
if (!IsTransactionBlock())
1636+
if (!MtmIsBroadcast())
16321637
{
16331638
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
16341639
}
@@ -1648,7 +1653,6 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16481653
staticbool
16491654
MtmReplicationTxnFilterHook(structPGLogicalTxnFilterArgs*args)
16501655
{
1651-
elog(WARNING,"MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d",args->origin_id,MtmReplicationNodeId);
16521656
returnargs->origin_id==InvalidRepOriginId||MtmIsRecoveredNode(MtmReplicationNodeId);
16531657
}
16541658

@@ -1795,7 +1799,6 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
17951799
{
17961800
PGresult*result=PQexec(conn,sql);
17971801
intstatus=PQresultStatus(result);
1798-
char*errstr;
17991802

18001803
boolret=status==PGRES_COMMAND_OK||status==PGRES_TUPLES_OK;
18011804

@@ -1815,25 +1818,18 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
18151818

18161819
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError)
18171820
{
1818-
char*conn_str=pstrdup(MtmConnStrs);
1819-
char*conn_str_end=conn_str+strlen(conn_str);
18201821
inti=0;
18211822
nodemask_tdisabledNodeMask=Mtm->disabledNodeMask;
18221823
intfailedNode=-1;
18231824
charconst*errorMsg=NULL;
18241825
PGconn**conns=palloc0(sizeof(PGconn*)*MtmNodes);
18251826
char*utility_errmsg;
18261827

1827-
while (conn_str<conn_str_end)
1828+
for (i=0;i<MtmNodes;i++)
18281829
{
1829-
char*p=strchr(conn_str,',');
1830-
if (p==NULL) {
1831-
p=conn_str_end;
1832-
}
1833-
*p='\0';
18341830
if (!BIT_CHECK(disabledNodeMask,i))
18351831
{
1836-
conns[i]=PQconnectdb(conn_str);
1832+
conns[i]=PQconnectdb(psprintf("%s application_name=%s",Mtm->nodes[i].con.connStr,MULTIMASTER_BROADCAST_SERVICE));
18371833
if (PQstatus(conns[i])!=CONNECTION_OK)
18381834
{
18391835
if (ignoreError)
@@ -1845,12 +1841,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
18451841
do {
18461842
PQfinish(conns[i]);
18471843
}while (--i >=0);
1848-
elog(ERROR,"Failed to establish connection '%s' to node %d",conn_str,failedNode);
1844+
elog(ERROR,"Failed to establish connection '%s' to node %d",Mtm->nodes[i].con.connStr,failedNode);
18491845
}
18501846
}
18511847
}
1852-
conn_str=p+1;
1853-
i+=1;
18541848
}
18551849
Assert(i==MtmNodes);
18561850

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE 16
2727
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
29+
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
2930

3031
#defineUSEC 1000000
3132

‎pglogical_apply.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -934,10 +934,10 @@ void MtmExecutor(int id, void* work, size_t size)
934934
{
935935
EmitErrorReport();
936936
FlushErrorState();
937-
MTM_TRACE("%d: REMOTE begin abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
938-
MtmEndSession();
937+
MTM_INFO("%d: REMOTE begin abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
938+
MtmEndSession(false);
939939
AbortCurrentTransaction();
940-
MTM_TRACE("%d: REMOTE end abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
940+
MTM_INFO("%d: REMOTE end abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
941941
}
942942
PG_END_TRY();
943943

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp