Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4710350

Browse files
knizhnikkelvich
authored andcommitted
Code cleanup
1 parentde24524 commit4710350

File tree

5 files changed

+55
-94
lines changed

5 files changed

+55
-94
lines changed

‎arbiter.c

Lines changed: 26 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include"replication/slot.h"
5757
#include"port/atomics.h"
5858
#include"tcop/utility.h"
59+
#include"libpq/ip.h"
5960

6061
#ifndefUSE_EPOLL
6162
#ifdef__linux__
@@ -139,31 +140,6 @@ void MtmArbiterInitialize(void)
139140
RegisterBackgroundWorker(&MtmMonitorWorker);
140141
}
141142

142-
staticint
143-
MtmResolveHostByName(constchar*hostname,unsigned*addrs,unsigned*n_addrs)
144-
{
145-
structsockaddr_insin;
146-
structhostent*hp;
147-
unsignedi;
148-
149-
sin.sin_addr.s_addr=inet_addr(hostname);
150-
if (sin.sin_addr.s_addr!=INADDR_NONE) {
151-
memcpy(&addrs[0],&sin.sin_addr.s_addr,sizeof(sin.sin_addr.s_addr));
152-
*n_addrs=1;
153-
return1;
154-
}
155-
156-
hp=gethostbyname(hostname);
157-
if (hp==NULL||hp->h_addrtype!=AF_INET) {
158-
return0;
159-
}
160-
for (i=0;hp->h_addr_list[i]!=NULL&&i<*n_addrs;i++) {
161-
memcpy(&addrs[i],hp->h_addr_list[i],sizeof(addrs[i]));
162-
}
163-
*n_addrs=i;
164-
return1;
165-
}
166-
167143
staticintstop=0;
168144
staticvoidSetStop(intsig)
169145
{
@@ -352,7 +328,6 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
352328

353329
staticvoidMtmScheduleHeartbeat()
354330
{
355-
//Assert(!last_sent_heartbeat || last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
356331
if (!stop) {
357332
enable_timeout_after(heartbeat_timer,MtmHeartbeatSendTimeout);
358333
send_heartbeat= true;
@@ -399,7 +374,6 @@ static void MtmSendHeartbeat()
399374
close(sockets[i]);
400375
sockets[i]=-1;
401376
MtmReconnectNode(i+1);/* set reconnect mask to force node reconnent */
402-
//MtmOnNodeConnect(i+1);
403377
}
404378
MTM_LOG4("Send heartbeat to node %d with timestamp %lld",i+1,now);
405379
}
@@ -426,23 +400,31 @@ void MtmCheckHeartbeat()
426400

427401
staticintMtmConnectSocket(intnode,intport,time_ttimeout)
428402
{
429-
structsockaddr_insock_inet;
430-
unsignedaddrs[MAX_ROUTES];
431-
unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]);
403+
structaddrinfo*addrs=NULL;
404+
structaddrinfo*addr;
405+
structaddrinfohint;
406+
charportstr[MAXPGPATH];
432407
MtmHandshakeMessagereq;
433408
MtmArbiterMessageresp;
434409
intsd;
410+
intret;
435411
timestamp_tstart=MtmGetSystemTime();
436412
charconst*host=Mtm->nodes[node].con.hostName;
437413
nodemask_tsave_mask=busy_mask;
438414
timestamp_tafterWait;
439415
timestamp_tbeforeWait;
440416

441-
sock_inet.sin_family=AF_INET;
442-
sock_inet.sin_port=htons(port);
417+
/* Initialize hint structure */
418+
MemSet(&hint,0,sizeof(hint));
419+
hint.ai_socktype=SOCK_STREAM;
420+
hint.ai_family=AF_UNSPEC;
421+
422+
snprintf(portstr,sizeof(portstr),"%d",port);
443423

444-
if (!MtmResolveHostByName(host,addrs,&n_addrs)) {
445-
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name",host);
424+
ret=pg_getaddrinfo_all(host,portstr,&hint,&addrs);
425+
if (ret!=0)
426+
{
427+
MTM_ELOG(LOG,"Arbiter failed to resolve host '%s' by name: %s",host,gai_strerror(ret));
446428
return-1;
447429
}
448430
BIT_SET(busy_mask,node);
@@ -459,13 +441,14 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
459441
rc=fcntl(sd,F_SETFL,O_NONBLOCK);
460442
if (rc<0) {
461443
MTM_ELOG(LOG,"Arbiter failed to switch socket to non-blocking mode: %d",errno);
444+
close(sd);
462445
busy_mask=save_mask;
463446
return-1;
464447
}
465-
for (i=0;i<n_addrs;++i) {
466-
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
448+
for (addr=addrs;addr!=NULL;addr=addr->ai_next)
449+
{
467450
do {
468-
rc=connect(sd,(structsockaddr*)&sock_inet,sizeof(sock_inet));
451+
rc=connect(sd,addr->ai_addr,addr->ai_addrlen);
469452
}while (rc<0&&errno==EINTR);
470453

471454
if (rc >=0||errno==EINPROGRESS) {
@@ -638,6 +621,7 @@ static void MtmAcceptOneConnection()
638621
rc=MtmReadSocket(fd,&req,sizeofreq);
639622
if (rc<sizeof(req)) {
640623
MTM_ELOG(WARNING,"Arbiter failed to handshake socket: %d, errno=%d",rc,errno);
624+
close(fd);
641625
}elseif (req.hdr.code!=MSG_HANDSHAKE&&req.hdr.dxid!=HANDSHAKE_MAGIC) {
642626
MTM_ELOG(WARNING,"Arbiter get unexpected handshake message %d",req.hdr.code);
643627
close(fd);
@@ -693,7 +677,9 @@ static void MtmAcceptIncomingConnections()
693677
if (gateway<0) {
694678
MTM_ELOG(ERROR,"Arbiter failed to create socket: %d",errno);
695679
}
696-
setsockopt(gateway,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon);
680+
if (setsockopt(gateway,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon)<0) {
681+
MTM_ELOG(ERROR,"Arbiter failed to set options for socket: %d",errno);
682+
}
697683

698684
if (bind(gateway, (structsockaddr*)&sock_inet,sizeof(sock_inet))<0) {
699685
MTM_ELOG(ERROR,"Arbiter failed to bind socket: %d",errno);
@@ -726,7 +712,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, MtmArbiterMessage* msg)
726712

727713
staticvoidMtmSender(Datumarg)
728714
{
729-
sigset_tsset;
730715
intnNodes=MtmMaxNodes;
731716
inti;
732717

@@ -737,8 +722,6 @@ static void MtmSender(Datum arg)
737722
signal(SIGINT,SetStop);
738723
signal(SIGQUIT,SetStop);
739724
signal(SIGTERM,SetStop);
740-
sigfillset(&sset);
741-
sigprocmask(SIG_UNBLOCK,&sset,NULL);
742725

743726
/* We're now ready to receive signals */
744727
BackgroundWorkerUnblockSignals();
@@ -815,13 +798,9 @@ static bool MtmRecovery()
815798

816799
staticvoidMtmMonitor(Datumarg)
817800
{
818-
sigset_tsset;
819-
820801
signal(SIGINT,SetStop);
821802
signal(SIGQUIT,SetStop);
822803
signal(SIGTERM,SetStop);
823-
sigfillset(&sset);
824-
sigprocmask(SIG_UNBLOCK,&sset,NULL);
825804

826805
/* We're now ready to receive signals */
827806
BackgroundWorkerUnblockSignals();
@@ -840,7 +819,6 @@ static void MtmMonitor(Datum arg)
840819

841820
staticvoidMtmReceiver(Datumarg)
842821
{
843-
sigset_tsset;
844822
intnNodes=MtmMaxNodes;
845823
intnResponses;
846824
inti,j,n,rc;
@@ -860,8 +838,6 @@ static void MtmReceiver(Datum arg)
860838
signal(SIGINT,SetStop);
861839
signal(SIGQUIT,SetStop);
862840
signal(SIGTERM,SetStop);
863-
sigfillset(&sset);
864-
sigprocmask(SIG_UNBLOCK,&sset,NULL);
865841

866842
/* We're now ready to receive signals */
867843
BackgroundWorkerUnblockSignals();
@@ -1078,7 +1054,6 @@ static void MtmReceiver(Datum arg)
10781054
}elseif (MtmUseDtm) {
10791055
ts->votedMask=0;
10801056
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
1081-
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10821057
Assert(replorigin_session_origin==InvalidRepOriginId);
10831058
MTM_LOG2("SetPreparedTransactionState for %s",ts->gid);
10841059
MtmUnlock();
@@ -1130,7 +1105,7 @@ static void MtmReceiver(Datum arg)
11301105
}else {
11311106
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
11321107
MTM_ELOG(WARNING,"Receive PRECOMMITTED response for aborted transaction %s (%llu) from node %d",
1133-
ts->gid, (long64)ts->xid,node);// How it can happen? Should we use assert here?
1108+
ts->gid, (long64)ts->xid,node);
11341109
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
11351110
MtmWakeUpBackend(ts);
11361111
}
@@ -1140,23 +1115,7 @@ static void MtmReceiver(Datum arg)
11401115
Assert(false);
11411116
}
11421117
}else {
1143-
switch (msg->code) {
1144-
caseMSG_PRECOMMIT:
1145-
Assert(false);// Now sent through pglogical
1146-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
1147-
ts->status=TRANSACTION_STATUS_UNKNOWN;
1148-
ts->csn=MtmAssignCSN();
1149-
MtmAdjustSubtransactions(ts);
1150-
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
1151-
}elseif (ts->status==TRANSACTION_STATUS_ABORTED) {
1152-
MtmSend2PCMessage(ts,MSG_ABORTED);
1153-
}else {
1154-
MTM_ELOG(WARNING,"Transaction %s is already %s",ts->gid,MtmTxnStatusMnem[ts->status]);
1155-
}
1156-
break;
1157-
default:
1158-
Assert(false);
1159-
}
1118+
Assert(false);/* All broadcasts are now sent through pglogical */
11601119
}
11611120
}
11621121
MtmUnlock();

‎bgwpool.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ static void BgwPoolMainLoop(BgwPool* pool)
3131
intsize;
3232
void*work;
3333
staticPortalDatafakePortal;
34-
sigset_tsset;
3534

3635
MtmIsLogicalReceiver= true;
3736
MtmPool=pool;
@@ -40,9 +39,6 @@ static void BgwPoolMainLoop(BgwPool* pool)
4039
signal(SIGQUIT,BgwShutdownWorker);
4140
signal(SIGTERM,BgwShutdownWorker);
4241

43-
sigfillset(&sset);
44-
sigprocmask(SIG_UNBLOCK,&sset,NULL);
45-
4642
BackgroundWorkerUnblockSignals();
4743
BackgroundWorkerInitializeConnection(pool->dbname,pool->dbuser);
4844
ActivePortal=&fakePortal;
@@ -57,7 +53,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
5753
}
5854
size=*(int*)&pool->queue[pool->head];
5955
Assert(size<pool->size);
60-
work=malloc(size);
56+
work=palloc(size);
6157
pool->pending-=1;
6258
pool->active+=1;
6359
if (pool->lastPeakTime==0&&pool->active==pool->nWorkers&&pool->pending!=0) {
@@ -80,7 +76,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
8076
}
8177
SpinLockRelease(&pool->lock);
8278
pool->executor(work,size);
83-
free(work);
79+
pfree(work);
8480
SpinLockAcquire(&pool->lock);
8581
pool->active-=1;
8682
pool->lastPeakTime=0;
@@ -93,6 +89,9 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
9389
{
9490
MtmPool=pool;
9591
pool->queue= (char*)ShmemAlloc(queueSize);
92+
if (pool->queue==NULL) {
93+
elog(PANIC,"Failed to allocate memory for background workers pool: %lld bytes requested", (long64)queueSize);
94+
}
9695
pool->executor=executor;
9796
PGSemaphoreCreate(&pool->available);
9897
PGSemaphoreCreate(&pool->overflow);

‎multimaster.c

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,6 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
12551255
if (x->status!=TRANSACTION_STATUS_ABORTED) {
12561256
MtmLock(LW_EXCLUSIVE);
12571257
tm= (MtmTransMap*)hash_search(MtmGid2State,x->gid,HASH_FIND,NULL);
1258-
//tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
12591258
if (tm==NULL) {
12601259
MTM_ELOG(WARNING,"Global transaciton ID '%s' is not found",x->gid);
12611260
}else {
@@ -1394,6 +1393,9 @@ void MtmSendMessage(MtmArbiterMessage* msg)
13941393
MtmMessageQueue*sendQueue=Mtm->sendQueue;
13951394
if (mq==NULL) {
13961395
mq= (MtmMessageQueue*)ShmemAlloc(sizeof(MtmMessageQueue));
1396+
if (mq==NULL) {
1397+
elog(PANIC,"Failed to allocate shared memory for message queue");
1398+
}
13971399
}else {
13981400
Mtm->freeQueue=mq->next;
13991401
}
@@ -1425,20 +1427,8 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
14251427
msg.lockReq=Mtm->nodeLockerMask!=0;
14261428
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
14271429

1428-
if (MtmIsCoordinator(ts)) {
1429-
inti;
1430-
Assert(false);// All broadcasts are now done through logical decoding
1431-
for (i=0;i<Mtm->nAllNodes;i++)
1432-
{
1433-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask,i))
1434-
{
1435-
Assert(TransactionIdIsValid(ts->xids[i]));
1436-
msg.node=i+1;
1437-
msg.dxid=ts->xids[i];
1438-
MtmSendMessage(&msg);
1439-
}
1440-
}
1441-
}elseif (!BIT_CHECK(Mtm->disabledNodeMask,ts->gtid.node-1)) {
1430+
Assert(!MtmIsCoordinator(ts));/* All broadcasts are now done through logical decoding */
1431+
if (!BIT_CHECK(Mtm->disabledNodeMask,ts->gtid.node-1)) {
14421432
MTM_LOG2("Send %s message to node %d xid=%d gid=%s",MtmMessageKindMnem[cmd],ts->gtid.node,ts->gtid.xid,ts->gid);
14431433
msg.node=ts->gtid.node;
14441434
msg.dxid=ts->gtid.xid;
@@ -4674,6 +4664,10 @@ void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
46744664
if (messageSize>allocated) {
46754665
allocated=Max(Max(MULTIMASTER_LOCK_BUF_INIT_SIZE,allocated*2),messageSize);
46764666
Mtm->nodes[nodeId-1].lockGraphData=ShmemAlloc(allocated);
4667+
if (Mtm->nodes[nodeId-1].lockGraphData==NULL) {
4668+
elog(PANIC,"Failed to allocate shared memory for lock graph: %d bytes requested",
4669+
allocated);
4670+
}
46774671
Mtm->nodes[nodeId-1].lockGraphAllocated=allocated;
46784672
}
46794673
memcpy(Mtm->nodes[nodeId-1].lockGraphData,messageBody,messageSize);

‎pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ process_remote_commit(StringInfo in)
650650
casePGLOGICAL_PRECOMMIT_PREPARED:
651651
{
652652
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
653-
strcpy(gid,pq_getmsgstring(in));
653+
strncpy(gid,pq_getmsgstring(in),sizeofgid);
654654
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s",MyProcPid,gid);
655655
MtmBeginSession(origin_node);
656656
MtmPrecommitTransaction(gid);
@@ -671,7 +671,7 @@ process_remote_commit(StringInfo in)
671671
casePGLOGICAL_PREPARE:
672672
{
673673
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
674-
strcpy(gid,pq_getmsgstring(in));
674+
strncpy(gid,pq_getmsgstring(in),sizeofgid);
675675
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
676676
MTM_LOG1("Avoid prepare of previously aborted global transaction %s",gid);
677677
AbortCurrentTransaction();
@@ -704,7 +704,7 @@ process_remote_commit(StringInfo in)
704704
{
705705
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
706706
csn=pq_getmsgint64(in);
707-
strcpy(gid,pq_getmsgstring(in));
707+
strncpy(gid,pq_getmsgstring(in),sizeofgid);
708708
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%lld, gid=%s, lsn=%llx",csn,gid,end_lsn);
709709
MtmResetTransaction();
710710
StartTransactionCommand();
@@ -721,7 +721,7 @@ process_remote_commit(StringInfo in)
721721
casePGLOGICAL_ABORT_PREPARED:
722722
{
723723
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
724-
strcpy(gid,pq_getmsgstring(in));
724+
strncpy(gid,pq_getmsgstring(in),sizeofgid);
725725
/* MtmRollbackPreparedTransaction will set origin session itself */
726726
MTM_LOG1("Receive ABORT_PREPARED logical message for transaction %s from node %d",gid,origin_node);
727727
MtmRollbackPreparedTransaction(origin_node,gid);

‎spill.c

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ void MtmCreateSpillDirectory(int node_id)
3535
mkdir(path,S_IRWXU);
3636

3737
spill_dir=AllocateDir(path);
38-
38+
if (spill_dir==NULL) {
39+
ereport(PANIC,
40+
(errcode_for_file_access(),
41+
MTM_ERRMSG("pglogical_receiver failed to create spill directory \"%s\": %m",
42+
path)));
43+
}
3944
while ((spill_de=ReadDir(spill_dir,path))!=NULL)
4045
{
4146
if (strncmp(spill_de->d_name,"txn",3)==0)
@@ -90,7 +95,11 @@ int MtmOpenSpillFile(int node_id, int file_id)
9095
MTM_ERRMSG("pglogical_apply could not open spill file \"%s\": %m",
9196
path)));
9297
}
93-
unlink(path);/* Should remove file on close */
98+
if (unlink(path)<0) {/* Should remove file on close */
99+
ereport(LOG,
100+
(errcode_for_file_access(),
101+
MTM_ERRMSG("pglogical_apply failed to unlink spill file: %m")));
102+
}
94103
returnfd;
95104
}
96105

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp