Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e62a796

Browse files
knizhnik authored and kelvich committed
Fix recovery
1 parent d29666d commit e62a796

File tree

9 files changed

+42
-43
lines changed

9 files changed

+42
-43
lines changed

‎arbiter.c

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
289289
elog(ERROR,"Arbiter failed to resolve host '%s' by name",host);
290290
}
291291

292-
Retry:
293-
292+
Retry:
294293
while (1) {
295294
intrc=-1;
296295

@@ -384,20 +383,29 @@ static void MtmOpenConnections()
384383

385384

386385
staticboolMtmSendToNode(intnode,voidconst*buf,intsize)
387-
{
388-
while (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
389-
elog(WARNING,"Arbiter failed to write to node %d: %d",node+1,errno);
390-
if (sockets[node] >=0) {
386+
{
387+
while (true) {
388+
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
389+
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
390+
BIT_CLEAR(Mtm->reconnectMask,node);
391391
close(sockets[node]);
392+
sockets[node]=-1;
392393
}
393-
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
394-
if (sockets[node]<0) {
395-
MtmOnNodeDisconnect(node+1);
396-
return false;
394+
if (sockets[node]<0|| !MtmWriteSocket(sockets[node],buf,size)) {
395+
if (sockets[node] >=0) {
396+
elog(WARNING,"Arbiter failed to write to node %d: %d",node+1,errno);
397+
close(sockets[node]);
398+
}
399+
sockets[node]=MtmConnectSocket(Mtm->nodes[node].con.hostName,MtmArbiterPort+node+1,MtmReconnectAttempts);
400+
if (sockets[node]<0) {
401+
MtmOnNodeDisconnect(node+1);
402+
return false;
403+
}
404+
MTM_TRACE("Arbiter restablished connection with node %d\n",node+1);
405+
}else {
406+
return true;
397407
}
398-
elog(NOTICE,"Arbiter restablish connection with node %d",node+1);
399408
}
400-
return true;
401409
}
402410

403411
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)
@@ -477,10 +485,6 @@ static void MtmAcceptIncomingConnections()
477485

478486
sockets[MtmNodeId-1]=gateway;
479487
MtmRegisterSocket(gateway,MtmNodeId-1);
480-
481-
for (i=0;i<MtmNodes-1;i++) {
482-
MtmAcceptOneConnection();
483-
}
484488
}
485489

486490

@@ -693,6 +697,7 @@ static void MtmTransReceiver(Datum arg)
693697
msg->node,Mtm->disabledNodeMask,msg->disabledNodeMask);
694698
ts->status=TRANSACTION_STATUS_ABORTED;
695699
MtmAdjustSubtransactions(ts);
700+
Mtm->nActiveTransactions-=1;
696701
}
697702

698703
if (++ts->nVotes==Mtm->nNodes) {
@@ -712,6 +717,7 @@ static void MtmTransReceiver(Datum arg)
712717
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
713718
ts->status=TRANSACTION_STATUS_ABORTED;
714719
MtmAdjustSubtransactions(ts);
720+
Mtm->nActiveTransactions-=1;
715721
}
716722
if (++ts->nVotes==Mtm->nNodes) {
717723
MtmWakeUpBackend(ts);

‎bgwpool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
typedefvoid(*BgwPoolExecutor)(intid,void*work,size_tsize);
99

1010
#defineMAX_DBNAME_LEN 30
11-
#defineMULTIMASTER_BGW_RESTART_TIMEOUT10/* seconds */
11+
#defineMULTIMASTER_BGW_RESTART_TIMEOUT1/* seconds */
1212

1313
typedefstruct
1414
{

‎multimaster.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,11 +736,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
736736
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
737737
Assert(ts!=NULL);
738738

739-
if (!MtmIsCoordinator(ts)) {
739+
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
740740
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_ENTER,NULL);
741741
Assert(x->gid[0]);
742742
tm->state=ts;
743-
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
743+
if (Mtm->status!=MTM_RECOVERY) {
744+
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
745+
}else {
746+
ts->status=TRANSACTION_STATUS_UNKNOWN;
747+
}
744748
MtmUnlock();
745749
MtmResetTransaction(x);
746750
}else {
@@ -769,6 +773,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
769773
Assert(tm!=NULL);
770774
tm->state->status=TRANSACTION_STATUS_ABORTED;
771775
MtmAdjustSubtransactions(tm->state);
776+
Mtm->nActiveTransactions-=1;
772777
MtmUnlock();
773778
x->status=TRANSACTION_STATUS_ABORTED;
774779
}
@@ -809,6 +814,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
809814
Mtm->nActiveTransactions-=1;
810815
}
811816
if (!commit&&x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
817+
Assert(Mtm->status!=MTM_RECOVERY);
812818
/*
813819
* Send notification only if ABORT happens during transaction processing at replicas,
814820
* do not send notification if ABORT is receiver from master
@@ -1205,6 +1211,7 @@ void MtmCheckQuorum(void)
12051211
voidMtmOnNodeDisconnect(intnodeId)
12061212
{
12071213
BIT_SET(Mtm->connectivityMask,nodeId-1);
1214+
BIT_SET(Mtm->reconnectMask,nodeId-1);
12081215
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
12091216

12101217
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
@@ -1292,6 +1299,7 @@ static void MtmInitialize()
12921299
Mtm->pglogicalNodeMask=0;
12931300
Mtm->walSenderLockerMask=0;
12941301
Mtm->nodeLockerMask=0;
1302+
Mtm->reconnectMask=0;
12951303
Mtm->nLockers=0;
12961304
Mtm->nActiveTransactions=0;
12971305
Mtm->votingTransactions=NULL;

‎multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ typedef struct
133133
nodemask_tpglogicalNodeMask;/* bitmask of started pglogic receivers */
134134
nodemask_twalSenderLockerMask;/* Mask of WAL-senders IDs locking the cluster */
135135
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
136+
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
137+
136138
intnNodes;/* Number of active nodes */
137139
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
138140
intnLockers;/* Number of lockers */

‎pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ void MtmExecutor(int id, void* work, size_t size)
902902
{
903903
while (true) {
904904
charaction=pq_getmsgbyte(&s);
905-
MTM_TRACE("%d: REMOTE processactiob %c\n",MyProcPid,action);
905+
MTM_TRACE("%d: REMOTE processaction %c\n",MyProcPid,action);
906906
switch (action) {
907907
/* BEGIN */
908908
case'B':

‎pglogical_proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163163
pq_sendint64(out,txn->end_lsn);
164164
pq_sendint64(out,txn->commit_time);
165165

166-
if (flags==PGLOGICAL_COMMIT_PREPARED) {
166+
if (txn->xact_action==XLOG_XACT_COMMIT_PREPARED) {
167167
pq_sendint64(out,MtmGetTransactionCSN(txn->xid));
168168
}
169-
if (flags!=PGLOGICAL_COMMIT) {
169+
if (txn->xact_action!=XLOG_XACT_COMMIT) {
170170
pq_sendstring(out,txn->gid);
171171
}
172172
}

‎raftable.c

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,9 @@
11
#include<dlfcn.h>
22
#include"postgres.h"
3+
#include"raftable.h"
34
#include"raftable_wrapper.h"
45

56

6-
staticraftable_get_traftable_get_impl;
7-
staticraftable_set_traftable_set_impl;
8-
9-
staticvoidRaftableResolve()
10-
{
11-
if (raftable_get_impl==NULL) {
12-
void*dll=dlopen(NULL,RTLD_NOW);
13-
raftable_get_impl=dlsym(dll,"raftable_get");
14-
raftable_set_impl=dlsym(dll,"raftable_set");
15-
Assert(raftable_get_impl!=NULL&&raftable_set_impl!=NULL);
16-
}
17-
}
18-
197
/*
208
* Raftable function proxies
219
*/
@@ -24,16 +12,14 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
2412
if (!MtmUseRaftable) {
2513
returnNULL;
2614
}
27-
RaftableResolve();
28-
return (*raftable_get_impl)(key,size);
15+
returnraftable_get(key,size);
2916
}
3017

3118

3219
voidRaftableSet(charconst*key,voidconst*value,size_tsize,boolnowait)
3320
{
3421
if (MtmUseRaftable) {
35-
RaftableResolve();
36-
(*raftable_set_impl)(key,value,size,nowait ?0 :-1);
22+
raftable_set(key,value,size,nowait ?0 :-1);
3723
}
3824
}
3925

‎raftable_wrapper.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ extern void RaftableSet(char const* key, void const* value, size_t size, bool n
3030
*/
3131
externboolRaftableCAS(charconst*key,charconst*value,boolnowait);
3232

33-
typedefvoid* (*raftable_get_t)(charconst*key,size_t*size);
34-
typedefvoid (*raftable_set_t)(charconst*key,voidconst*value,size_tsize,inttimeout_ms);
35-
3633
externboolMtmUseRaftable;
3734

3835
#endif

‎tests/postgresql.conf.mm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@
144144

145145
#max_files_per_process =1000# min25
146146
# (changerequires restart)
147-
shared_preload_libraries ='multimaster'# (changerequires restart)
147+
shared_preload_libraries ='raftable,multimaster'# (changerequires restart)
148148

149149
# - Cost-Based Vacuum Delay -
150150

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp