Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 1b5946d

Browse files
knizhnik authored and kelvich committed
Continue work on MMTS
1 parent cd6b6d2, commit 1b5946d

File tree

3 files changed

+172
-33
lines changed

3 files changed

+172
-33
lines changed

‎arbiter.c

Lines changed: 150 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,8 @@
5151
#include"multimaster.h"
5252

5353
#defineMAX_CONNECT_ATTEMPTS 10
54-
#defineTX_BUFFER_SIZE 1024
54+
#defineBUFFER_SIZE 1024
55+
#defineBUFFER_SIZE 1024
5556

5657
typedefstruct
5758
{
@@ -61,12 +62,11 @@ typedef struct
6162

6263
typedefstruct
6364
{
64-
DtmCommitMessagedata[TX_BUFFER_SIZE];
65+
DtmCommitMessagedata[BUFFER_SIZE];
6566
intused;
66-
}DtmTxBuffer;
67+
}DtmBuffer;
6768

6869
staticint*sockets;
69-
staticDtmTxBuffer*txBuffers;
7070

7171
staticBackgroundWorkerDtmSender= {
7272
"mm-sender",
@@ -115,6 +115,34 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
115115
return1;
116116
}
117117

118+
#ifdefUSE_EPOLL
119+
staticintepollfd;
120+
#else
121+
staticintmax_fd;
122+
staticfd_setinset;
123+
#endif
124+
125+
inlinevoidregisterSocket(intfd,inti)
126+
{
127+
#ifdefUSE_EPOLL
128+
structepoll_eventev;
129+
ev.events=EPOLLIN;
130+
ev.data.u32=i;
131+
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
132+
charbuf[ERR_BUF_SIZE];
133+
sprintf(buf,"Failed to add socket %d to epoll set",fd);
134+
shub->params->error_handler(buf,SHUB_FATAL_ERROR);
135+
}
136+
#else
137+
FD_SET(fd,&inset);
138+
if (fd>max_fd) {
139+
max_fd=fd;
140+
}
141+
#endif
142+
}
143+
144+
145+
118146
staticintconnectSocket(charconst*host,intport)
119147
{
120148
structsockaddr_insock_inet;
@@ -206,20 +234,47 @@ static void acceptConnections()
206234
elog(ERROR,"Failed to bind socket: %d",errno);
207235
}
208236

209-
for (i=0;i<nNodes-1;i++) {
210-
sockets[i]=accept(sd,NULL,NULL);
211-
if (sockets[i]<0) {
237+
for (i=0;i<nNodes;i++) {
238+
intfd=accept(sd,NULL,NULL);
239+
if (fd<0) {
212240
elog(ERROR,"Failed to accept socket: %d",errno);
213241
}
242+
registerSocket(fd,i);
243+
sockets[i]=fd;
214244
}
215245
}
216246

247+
staticvoidWriteSocket(intsd,voidconst*buf,intsize)
248+
{
249+
char*src= (char*)buf;
250+
while (size!=0) {
251+
intn=send(sd,src,size,0);
252+
if (n <=0) {
253+
return0;
254+
}
255+
size-=n;
256+
src+=n;
257+
}
258+
}
259+
260+
staticintReadSocket(intsd,void*buf,intbuf_size)
261+
{
262+
intrc=recv(sd,buf,buf_size,0);
263+
if (rc <=0) {
264+
elog(ERROR,"Arbiter failed to read socket: %d",rc);
265+
}
266+
returnrc;
267+
}
268+
269+
217270
staticvoidDtmTransSender(Datumarg)
218271
{
219272
intnNodes=dtm->nNodes;
220273
inti;
221-
DtmCommitMessage*txBuffer= (DtmCommitMessage*)palloc(sizeof(DtmTxBuffer)*(nNodes));
274+
DtmTxBuffer*txBuffer= (DtmTxBuffer*)palloc(sizeof(DtmTxBuffer)*nNodes);
222275

276+
sockets= (int*)palloc(sizeof(int)*nNodes);
277+
223278
openConnections();
224279

225280
for (i=0;i<nNodes;i++) {
@@ -229,31 +284,106 @@ static void DtmTransSender(Datum arg)
229284
while (true) {
230285
DtmTransState*ts;
231286
PGSemaphoreLock(&dtm->semphore);
287+
CHECK_FOR_INTERRUPTS();
232288

233289
SpinLockAcquire(&dtm->spinlock);
234290
ts=dtm->pendingTransactions;
235291
dtm->pendingTransactions=NULL;
236292
SpinLockRelease(&dtm->spinlock);
237293

238-
for (ts=dtm->pendingTransactions;ts!=NULL;ts=ts->nextPending) {
239-
intnode=ts->gtid.node;
240-
Assert(node!=MMNodeId);
241-
node-=1;
242-
if (txBuffer[node].used==TX_BUFFER_SIZE) {
243-
WriteSocket(sockets[node],txBuffer[node].data,txBuffer[node].used*sizeof(DtmCommitRequest));
244-
txBuffer[node].used=0;
294+
for (;ts!=NULL;ts=ts->nextPending) {
295+
i=ts->gtid.node-1;
296+
Assert(i!=MMNodeId);
297+
if (txBuffer[i].used==BUFFER_SIZE) {
298+
WriteSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitRequest));
299+
txBuffer[i].used=0;
245300
}
246-
txBuffer[node].data[txBuffer[node].used].xid=ts->xid;
247-
txBuffer[node].data[txBuffer[node].used].csn=ts->csn;
248-
txBuffer[node].used+=1;
301+
txBuffer[i].data[txBuffer[i].used].xid=ts->xid;
302+
txBuffer[i].data[txBuffer[i].used].csn=ts->csn;
303+
txBuffer[i].used+=1;
249304
}
250-
dtm->pendingTransactions=NULL;
251-
305+
for (i=0;i<nNodes;i++) {
306+
if (txBuffer[i].used!=0) {
307+
WriteSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitRequest));
308+
txBuffer[i].used=0;
309+
}
310+
}
252311
}
253312
}
254313

255314
staticvoidDtmTransReceiver(Datumarg)
256315
{
316+
intnNodes=dtm->nNodes-1;
317+
inti,j,rc;
318+
intrxBufPos=0;
319+
DtmBuffer*rxBuffer= (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes);
320+
HTAB*xid2state;
321+
322+
#ifdefUSE_EPOLL
323+
structepoll_event*events= (structepoll_event*)palloc(SIZEOF(structepoll_event)*nNodes);
324+
epollfd=epoll_create(nNodes);
325+
#else
326+
FD_ZERO(&inset);
327+
max_fd=0;
328+
#endif
329+
257330
acceptConnections();
331+
xid2state=MMCreateHash();
332+
333+
for (i=0;i<nNodes;i++) {
334+
txBuffer[i].used=0;
335+
}
336+
337+
while (true) {
338+
#ifdefUSE_EPOLL
339+
rc=epoll_wait(epollfd,events,MAX_EVENTS,shub->in_buffer_used==0 ?-1 :shub->params->delay);
340+
if (rc<0) {
341+
elog(ERROR,"epoll failed: %d",errno);
342+
}
343+
for (j=0;j<rc;j++) {
344+
i=events[j].data.u32;
345+
if (events[j].events&EPOLLERR) {
346+
structsockaddr_ininsock;
347+
socklen_tlen=sizeof(insock);
348+
getpeername(fd, (structsockaddr*)&insock,&len);
349+
elog(WARNING,"Loose connection with %s",inet_ntoa(insock.sin_addr_));
350+
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL);
351+
}
352+
elseif (events[j].events&EPOLLIN)
353+
#else
354+
fd_setevents;
355+
events=inset;
356+
rc=select(max_fd+1,&events,NULL,NULL,NULL);
357+
if (rc<0) {
358+
elog(ERROR,"select failed: %d",errno);
359+
}
360+
for (i=0;i<nNodes;i++) {
361+
if (FD_ISSET(sockets[i],&events))
362+
#endif
363+
{
364+
intnResponses;
365+
rxBuffer[i].used+=ReadSocket(sockets[i], (char*)rxBuffer[i].data+rxBuffer[i].used,RX_BUFFER_SIZE-rxBufPos);
366+
nResponses=rxBuffer[i].used/sizeof(DtmCommitRequest);
367+
368+
LWLockAcquire(&dtm->hashLock,LW_SHARED);
369+
370+
for (j=0;j<nResponses;j++) {
371+
DtmCommitRequest*req=&rxBuffer[i].data[j];
372+
DtmTransState*ts= (DtmTransState*)hash_search(xid2state,&req->xid,HASH_FIND,NULL);
373+
Assert(ts!=NULL);
374+
if (req->csn>ts->csn) {
375+
ts->csn=req->csn;
376+
}
377+
if (ts->nVotes==dtm->nNodes-1) {
378+
SetLatch(&ProcGlobal->allProcs[ts->pid].procLatch);
379+
}
380+
}
381+
if (rxBuffer[i].used!=nResponses*sizeof(DtmCommitRequest)) {
382+
rxBuffer[i].used-=nResponses*sizeof(DtmCommitRequest);
383+
memmove(rxBuffer[i].data, (char*)rxBuffer[i].data+nResponses*sizeof(DtmCommitRequest),rxBuffer[i].used);
384+
}
385+
}
386+
}
387+
}
258388
}
259389

‎multimaster.c

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,6 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
385385
staticvoidDtmInitialize()
386386
{
387387
boolfound;
388-
staticHASHCTLinfo;
389388

390389
LWLockAcquire(AddinShmemInitLock,LW_EXCLUSIVE);
391390
dtm=ShmemInitStruct("dtm",sizeof(DtmState),&found);
@@ -403,18 +402,8 @@ static void DtmInitialize()
403402
RegisterXactCallback(DtmXactCallback,NULL);
404403
RegisterSubXactCallback(DtmSubXactCallback,NULL);
405404
}
405+
xid2state=MMCreateHash();
406406
LWLockRelease(AddinShmemInitLock);
407-
408-
info.keysize=sizeof(TransactionId);
409-
info.entrysize=sizeof(DtmTransState);
410-
info.hash=dtm_xid_hash_fn;
411-
info.match=dtm_xid_match_fn;
412-
xid2state=ShmemInitHash(
413-
"xid2state",
414-
DTM_HASH_SIZE,DTM_HASH_SIZE,
415-
&info,
416-
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
417-
);
418407
MMDoReplication= true;
419408
TM=&DtmTM;
420409
}
@@ -987,13 +976,32 @@ void MMVoteForTransaction(DtmTransState* ts)
987976
/* I am replica: first notify master... */
988977
SpinLockAcquire(&dtm->spinlock);
989978
ts->nextPending=dtm->pendingTransactions;
979+
ts->pid=MyProc->pgprocno;
990980
dtm->pendingTransactions=ts;
991981
SpinLockRelease(&dtm->spinlock);
992982

993983
PGSemaphoreUnlock(&dtm->semapahore);
994-
/* ... and waitreposnse from it */
984+
/* ... and waitresponse from it */
995985
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
996986
ResetLatch(&MyProc->procLatch);
997987
}
998988
LWLockAcquire(&dtm->hashLock<LW_EXCLUSIVE);
999989
}
990+
991+
HTAB*MMCreateHash();
992+
{
993+
staticHASHCTLinfo;
994+
TAB*htab;
995+
info.keysize=sizeof(TransactionId);
996+
info.entrysize=sizeof(DtmTransState);
997+
info.hash=dtm_xid_hash_fn;
998+
info.match=dtm_xid_match_fn;
999+
htab=ShmemInitHash(
1000+
"xid2state",
1001+
DTM_HASH_SIZE,DTM_HASH_SIZE,
1002+
&info,
1003+
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
1004+
);
1005+
returnhtab;
1006+
}
1007+

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ extern void MMJoinTransaction(GlobalTransactionId gtid, csn_t snapshot);
6060
externvoidMMReceiverStarted(void);
6161
externvoidMMExecute(void*work,intsize);
6262
externvoidMMExecutor(intid,void*work,size_tsize);
63+
externHTAB*MMCreateHash();
6364

6465
externchar*MMDatabaseName;
6566

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp