Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc7b5fcb

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents9210e0f +2420bbe commitc7b5fcb

File tree

9 files changed

+42
-43
lines changed

9 files changed

+42
-43
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
289289
elog(ERROR,"Arbiter failed to resolve host '%s' by name",host);
290290
}
291291

292-
Retry:
293-
292+
Retry:
294293
while (1) {
295294
intrc=-1;
296295

@@ -384,20 +383,29 @@ static void MtmOpenConnections()
384383

385384

386385
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
387-
{
388-
while (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
389-
elog(WARNING,"Arbiter failed to write to node %d: %d",node+1,errno);
390-
if (sockets[node] >=0) {
386+
{
387+
while (true) {
388+
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
389+
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
390+
BIT_CLEAR(Mtm->reconnectMask,node);
391391
close(sockets[node]);
392+
sockets[node]=-1;
392393
}
393-
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
394-
if (sockets[node]<0) {
395-
MtmOnNodeDisconnect(node+1);
396-
return false;
394+
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
395+
if (sockets[node] >=0) {
396+
elog(WARNING,"Arbiter failed to write to node %d: %d",node+1,errno);
397+
close(sockets[node]);
398+
}
399+
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
400+
if (sockets[node]<0) {
401+
MtmOnNodeDisconnect(node+1);
402+
return false;
403+
}
404+
MTM_TRACE("Arbiter restablished connection with node %d\n",node+1);
405+
}else {
406+
return true;
397407
}
398-
elog(NOTICE,"Arbiter restablish connection with node %d",node+1);
399408
}
400-
return true;
401409
}
402410

403411
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)
@@ -477,10 +485,6 @@ static void MtmAcceptIncomingConnections()
477485

478486
sockets[MtmNodeId-1]=gateway;
479487
MtmRegisterSocket(gateway,MtmNodeId-1);
480-
481-
for (i=0;i<MtmNodes-1;i++) {
482-
MtmAcceptOneConnection();
483-
}
484488
}
485489

486490

@@ -693,6 +697,7 @@ static void MtmTransReceiver(Datum arg)
693697
msg->node,Mtm->disabledNodeMask,msg->disabledNodeMask);
694698
ts->status=TRANSACTION_STATUS_ABORTED;
695699
MtmAdjustSubtransactions(ts);
700+
Mtm->nActiveTransactions-=1;
696701
}
697702

698703
if (++ts->nVotes==Mtm->nNodes) {
@@ -712,6 +717,7 @@ static void MtmTransReceiver(Datum arg)
712717
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
713718
ts->status=TRANSACTION_STATUS_ABORTED;
714719
MtmAdjustSubtransactions(ts);
720+
Mtm->nActiveTransactions-=1;
715721
}
716722
if (++ts->nVotes==Mtm->nNodes) {
717723
MtmWakeUpBackend(ts);

‎contrib/mmts/bgwpool.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
typedefvoid(*BgwPoolExecutor)(intid,void*work,size_tsize);
99

1010
#defineMAX_DBNAME_LEN 30
11-
#defineMULTIMASTER_BGW_RESTART_TIMEOUT10/* seconds */
11+
#defineMULTIMASTER_BGW_RESTART_TIMEOUT1/* seconds */
1212

1313
typedefstruct
1414
{

‎contrib/mmts/multimaster.c‎

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -737,11 +737,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
737737
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
738738
Assert(ts!=NULL);
739739

740-
if (!MtmIsCoordinator(ts)) {
740+
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
741741
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
742742
Assert(x->gid[0]);
743743
tm->state=ts;
744-
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
744+
if (Mtm->status!=MTM_RECOVERY) {
745+
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
746+
}else {
747+
ts->status=TRANSACTION_STATUS_UNKNOWN;
748+
}
745749
MtmUnlock();
746750
MtmResetTransaction(x);
747751
}else {
@@ -770,6 +774,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
770774
Assert(tm!=NULL);
771775
tm->state->status=TRANSACTION_STATUS_ABORTED;
772776
MtmAdjustSubtransactions(tm->state);
777+
Mtm->nActiveTransactions-=1;
773778
MtmUnlock();
774779
x->status=TRANSACTION_STATUS_ABORTED;
775780
}
@@ -810,6 +815,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
810815
Mtm->nActiveTransactions-=1;
811816
}
812817
if (!commit&&x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
818+
Assert(Mtm->status!=MTM_RECOVERY);
813819
/*
814820
* Send notification only if ABORT happens during transaction processing at replicas,
815821
* do not send notification if ABORT is receiver from master
@@ -1206,6 +1212,7 @@ void MtmCheckQuorum(void)
12061212
voidMtmOnNodeDisconnect(intnodeId)
12071213
{
12081214
BIT_SET(Mtm->connectivityMask,nodeId-1);
1215+
BIT_SET(Mtm->reconnectMask,nodeId-1);
12091216
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
12101217

12111218
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
@@ -1293,6 +1300,7 @@ static void MtmInitialize()
12931300
Mtm->pglogicalNodeMask=0;
12941301
Mtm->walSenderLockerMask=0;
12951302
Mtm->nodeLockerMask=0;
1303+
Mtm->reconnectMask=0;
12961304
Mtm->nLockers=0;
12971305
Mtm->nActiveTransactions=0;
12981306
Mtm->votingTransactions=NULL;

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ typedef struct
133133
nodemask_tpglogicalNodeMask;/* bitmask of started pglogic receivers */
134134
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
135135
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
136+
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
137+
136138
intnNodes;/* Number of active nodes */
137139
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
138140
intnLockers;/* Number of lockers */

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ void MtmExecutor(int id, void* work, size_t size)
904904
{
905905
while (true) {
906906
charaction=pq_getmsgbyte(&s);
907-
MTM_TRACE("%d: REMOTE processactiob %c\n",MyProcPid,action);
907+
MTM_TRACE("%d: REMOTE processaction %c\n",MyProcPid,action);
908908
switch (action) {
909909
/* BEGIN */
910910
case'B':

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163163
pq_sendint64(out,txn->end_lsn);
164164
pq_sendint64(out,txn->commit_time);
165165

166-
if (flags==PGLOGICAL_COMMIT_PREPARED) {
166+
if (txn->xact_action==XLOG_XACT_COMMIT_PREPARED) {
167167
pq_sendint64(out,MtmGetTransactionCSN(txn->xid));
168168
}
169-
if (flags!=PGLOGICAL_COMMIT) {
169+
if (txn->xact_action!=XLOG_XACT_COMMIT) {
170170
pq_sendstring(out,txn->gid);
171171
}
172172
}

‎contrib/mmts/raftable.c‎

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,9 @@
11
#include<dlfcn.h>
22
#include"postgres.h"
3+
#include"raftable.h"
34
#include"raftable_wrapper.h"
45

56

6-
staticraftable_get_traftable_get_impl;
7-
staticraftable_set_traftable_set_impl;
8-
9-
staticvoidRaftableResolve()
10-
{
11-
if (raftable_get_impl==NULL) {
12-
void*dll=dlopen(NULL,RTLD_NOW);
13-
raftable_get_impl=dlsym(dll,"raftable_get");
14-
raftable_set_impl=dlsym(dll,"raftable_set");
15-
Assert(raftable_get_impl!=NULL&&raftable_set_impl!=NULL);
16-
}
17-
}
18-
197
/*
208
* Raftable function proxies
219
*/
@@ -24,16 +12,14 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
2412
if (!MtmUseRaftable) {
2513
returnNULL;
2614
}
27-
RaftableResolve();
28-
return (*raftable_get_impl)(key,size);
15+
returnraftable_get(key,size);
2916
}
3017

3118

3219
voidRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait)
3320
{
3421
if (MtmUseRaftable) {
35-
RaftableResolve();
36-
(*raftable_set_impl)(key,value,size,nowait ?0 :-1);
22+
raftable_set(key,value,size,nowait ?0 :-1);
3723
}
3824
}
3925

‎contrib/mmts/raftable_wrapper.h‎

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ extern void RaftableSet(char const* key, void const* value, size_t size, bool n
3030
*/
3131
externboolRaftableCAS(charconst*key,charconst*value,boolnowait);
3232

33-
typedefvoid* (*raftable_get_t)(charconst*key,size_t*size);
34-
typedefvoid (*raftable_set_t)(charconst*key,voidconst*value,size_tsize,inttimeout_ms);
35-
3633
externboolMtmUseRaftable;
3734

3835
#endif

‎contrib/mmts/tests/postgresql.conf.mm‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@
144144

145145
#max_files_per_process =1000# min25
146146
# (changerequires restart)
147-
shared_preload_libraries ='multimaster'# (changerequires restart)
147+
shared_preload_libraries ='raftable,multimaster'# (changerequires restart)
148148

149149
# - Cost-Based Vacuum Delay -
150150

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp