Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbe47745

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parentse728adb +a0be307 commitbe47745

File tree

4 files changed

+707
-240
lines changed

4 files changed

+707
-240
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
/*
2+
* multimaster.c
3+
*
4+
* Multimaster based on logical replication
5+
*
6+
*/
7+
8+
#include<unistd.h>
9+
#include<sys/time.h>
10+
#include<time.h>
11+
12+
#include"postgres.h"
13+
#include"fmgr.h"
14+
#include"miscadmin.h"
15+
#include"libpq-fe.h"
16+
#include"postmaster/postmaster.h"
17+
#include"postmaster/bgworker.h"
18+
#include"storage/s_lock.h"
19+
#include"storage/spin.h"
20+
#include"storage/lmgr.h"
21+
#include"storage/shmem.h"
22+
#include"storage/ipc.h"
23+
#include"access/xlogdefs.h"
24+
#include"access/xact.h"
25+
#include"access/xtm.h"
26+
#include"access/transam.h"
27+
#include"access/subtrans.h"
28+
#include"access/commit_ts.h"
29+
#include"access/xlog.h"
30+
#include"storage/proc.h"
31+
#include"storage/procarray.h"
32+
#include"executor/executor.h"
33+
#include"access/twophase.h"
34+
#include"utils/guc.h"
35+
#include"utils/hsearch.h"
36+
#include"utils/tqual.h"
37+
#include"utils/array.h"
38+
#include"utils/builtins.h"
39+
#include"utils/memutils.h"
40+
#include"commands/dbcommands.h"
41+
#include"miscadmin.h"
42+
#include"postmaster/autovacuum.h"
43+
#include"storage/pmsignal.h"
44+
#include"storage/proc.h"
45+
#include"utils/syscache.h"
46+
#include"replication/walsender.h"
47+
#include"replication/slot.h"
48+
#include"port/atomics.h"
49+
#include"tcop/utility.h"
50+
51+
#include"multimaster.h"
52+
53+
#defineMAX_CONNECT_ATTEMPTS 10
54+
#defineBUFFER_SIZE 1024
55+
#defineBUFFER_SIZE 1024
56+
57+
typedefstruct
58+
{
59+
TransactionIdxid;
60+
csn_tcsn;
61+
}DtmCommitMessage;
62+
63+
typedefstruct
64+
{
65+
DtmCommitMessagedata[BUFFER_SIZE];
66+
intused;
67+
}DtmBuffer;
68+
69+
staticint*sockets;
70+
71+
staticBackgroundWorkerDtmSender= {
72+
"mm-sender",
73+
0,/* do not need connection to the database */
74+
BgWorkerStart_PostmasterStart,
75+
1,/* restrart in one second (is it possible to restort immediately?) */
76+
DtmTransSender
77+
};
78+
79+
staticBackgroundWorkerDtmRecevier= {
80+
"mm-receiver",
81+
0,/* do not need connection to the database */
82+
BgWorkerStart_PostmasterStart,
83+
1,/* restrart in one second (is it possible to restort immediately?) */
84+
DtmTransReceiver
85+
};
86+
87+
voidMMArbiterInitialize()
88+
{
89+
RegisterBackgroundWorker(&DtmSender);
90+
RegisterBackgroundWorker(&DtmRecevier);
91+
}
92+
93+
94+
staticintresolve_host_by_name(constchar*hostname,unsigned*addrs,unsigned*n_addrs)
95+
{
96+
structsockaddr_insin;
97+
structhostent*hp;
98+
unsignedi;
99+
100+
sin.sin_addr.s_addr=inet_addr(hostname);
101+
if (sin.sin_addr.s_addr!=INADDR_NONE) {
102+
memcpy(&addrs[0],&sin.sin_addr.s_addr,sizeof(sin.sin_addr.s_addr));
103+
*n_addrs=1;
104+
return1;
105+
}
106+
107+
hp=gethostbyname(hostname);
108+
if (hp==NULL||hp->h_addrtype!=AF_INET) {
109+
return0;
110+
}
111+
for (i=0;hp->h_addr_list[i]!=NULL&&i<*n_addrs;i++) {
112+
memcpy(&addrs[i],hp->h_addr_list[i],sizeof(addrs[i]));
113+
}
114+
*n_addrs=i;
115+
return1;
116+
}
117+
118+
#ifdefUSE_EPOLL
119+
staticintepollfd;
120+
#else
121+
staticintmax_fd;
122+
staticfd_setinset;
123+
#endif
124+
125+
inlinevoidregisterSocket(intfd,inti)
126+
{
127+
#ifdefUSE_EPOLL
128+
structepoll_eventev;
129+
ev.events=EPOLLIN;
130+
ev.data.u32=i;
131+
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
132+
charbuf[ERR_BUF_SIZE];
133+
sprintf(buf,"Failed to add socket %d to epoll set",fd);
134+
shub->params->error_handler(buf,SHUB_FATAL_ERROR);
135+
}
136+
#else
137+
FD_SET(fd,&inset);
138+
if (fd>max_fd) {
139+
max_fd=fd;
140+
}
141+
#endif
142+
}
143+
144+
145+
146+
staticintconnectSocket(charconst*host,intport)
147+
{
148+
structsockaddr_insock_inet;
149+
unsignedaddrs[128];
150+
unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]);
151+
intmax_attempts=MAX_CONNECT_ATTEMPTS;
152+
intsd;
153+
154+
sock_inet.sin_family=AF_INET;
155+
sock_inet.sin_port=htons(port);
156+
157+
if (!resolve_host_by_name(host,addrs,&n_addrs)) {
158+
elog(ERROR,"Failed to resolve host '%s' by name",host);
159+
}
160+
sd=socket(AF_INET,SOCK_STREAM,0);
161+
if (sd<0) {
162+
elog(ERROR,"Failed to create socket: %d",errno);
163+
}
164+
while (1) {
165+
intrc=-1;
166+
for (i=0;i<n_addrs;++i) {
167+
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
168+
do {
169+
rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet));
170+
}while (rc<0&&errno==EINTR);
171+
172+
if (rc >=0||errno==EINPROGRESS) {
173+
break;
174+
}
175+
}
176+
if (rc<0) {
177+
if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) {
178+
elog(ERROR,"Sockhub failed to connect to %s:%d: %d",host,port,errno);
179+
}else {
180+
max_attempts-=1;
181+
sleep(1);
182+
}
183+
continue;
184+
}else {
185+
intoptval=1;
186+
setsockopt(shub->output,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval));
187+
returnsd;
188+
}
189+
}
190+
}
191+
192+
staticvoidopenConnections()
193+
{
194+
intnNodes=dtm->nNodes;
195+
inti;
196+
char*connStr=pstrdup(MMConnStrs);
197+
198+
sockets= (int*)palloc(sizeof(int)*nNodes);
199+
200+
for (i=0;i<nNodes;i++) {
201+
char*host=strstr(connStr,"host=");
202+
char*end;
203+
if (host==NULL) {
204+
elog(ERROR,"Invalid connection string: '%s'",MMConnStrs);
205+
}
206+
for (end=host+5;*end!=' '&&*end!=','&&end!='\0';end++);
207+
*end='\0';
208+
connStr=end+1;
209+
sockets[i]=i+1!=MMNodeId ?connectSocket(host,MMArbiterPort+i) :-1;
210+
}
211+
}
212+
213+
staticvoidacceptConnections()
214+
{
215+
intnNodes=dtm->nNodes-1;
216+
sockaddr_insock_inet;
217+
inti;
218+
intsd;
219+
inton=1;
220+
221+
sockets= (int*)palloc(sizeof(int)*nNodes);
222+
223+
sock_inet.sin_family=AF_INET;
224+
sock_inet.sin_addr.s_addr=htonl(INADDR_ANY);
225+
sock_inet.sin_port=htons(MMArbiterPort+MMNodeId);
226+
227+
sd=socket(u.sock.sa_family,SOCK_STREAM,0);
228+
if (sd<0) {
229+
elog(ERROR,"Failed to create socket: %d",errno);
230+
}
231+
setsockopt(sd,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon);
232+
233+
if (bind(fd, (sockaddr*)&sock_init,nNodes-1)<0) {
234+
elog(ERROR,"Failed to bind socket: %d",errno);
235+
}
236+
237+
for (i=0;i<nNodes;i++) {
238+
intfd=accept(sd,NULL,NULL);
239+
if (fd<0) {
240+
elog(ERROR,"Failed to accept socket: %d",errno);
241+
}
242+
registerSocket(fd,i);
243+
sockets[i]=fd;
244+
}
245+
}
246+
247+
staticvoidWriteSocket(intsd,voidconst*buf,intsize)
248+
{
249+
char*src= (char*)buf;
250+
while (size!=0) {
251+
intn=send(sd,src,size,0);
252+
if (n <=0) {
253+
return0;
254+
}
255+
size-=n;
256+
src+=n;
257+
}
258+
}
259+
260+
staticintReadSocket(intsd,void*buf,intbuf_size)
261+
{
262+
intrc=recv(sd,buf,buf_size,0);
263+
if (rc <=0) {
264+
elog(ERROR,"Arbiter failed to read socket: %d",rc);
265+
}
266+
returnrc;
267+
}
268+
269+
270+
staticvoidDtmTransSender(Datumarg)
271+
{
272+
intnNodes=dtm->nNodes;
273+
inti;
274+
DtmTxBuffer*txBuffer= (DtmTxBuffer*)palloc(sizeof(DtmTxBuffer)*nNodes);
275+
276+
sockets= (int*)palloc(sizeof(int)*nNodes);
277+
278+
openConnections();
279+
280+
for (i=0;i<nNodes;i++) {
281+
txBuffer[i].used=0;
282+
}
283+
284+
while (true) {
285+
DtmTransState*ts;
286+
PGSemaphoreLock(&dtm->semphore);
287+
CHECK_FOR_INTERRUPTS();
288+
289+
SpinLockAcquire(&dtm->spinlock);
290+
ts=dtm->pendingTransactions;
291+
dtm->pendingTransactions=NULL;
292+
SpinLockRelease(&dtm->spinlock);
293+
294+
for (;ts!=NULL;ts=ts->nextPending) {
295+
i=ts->gtid.node-1;
296+
Assert(i!=MMNodeId);
297+
if (txBuffer[i].used==BUFFER_SIZE) {
298+
WriteSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitRequest));
299+
txBuffer[i].used=0;
300+
}
301+
txBuffer[i].data[txBuffer[i].used].xid=ts->xid;
302+
txBuffer[i].data[txBuffer[i].used].csn=ts->csn;
303+
txBuffer[i].used+=1;
304+
}
305+
for (i=0;i<nNodes;i++) {
306+
if (txBuffer[i].used!=0) {
307+
WriteSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitRequest));
308+
txBuffer[i].used=0;
309+
}
310+
}
311+
}
312+
}
313+
314+
staticvoidDtmTransReceiver(Datumarg)
315+
{
316+
intnNodes=dtm->nNodes-1;
317+
inti,j,rc;
318+
intrxBufPos=0;
319+
DtmBuffer*rxBuffer= (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes);
320+
HTAB*xid2state;
321+
322+
#ifdefUSE_EPOLL
323+
structepoll_event*events= (structepoll_event*)palloc(SIZEOF(structepoll_event)*nNodes);
324+
epollfd=epoll_create(nNodes);
325+
#else
326+
FD_ZERO(&inset);
327+
max_fd=0;
328+
#endif
329+
330+
acceptConnections();
331+
xid2state=MMCreateHash();
332+
333+
for (i=0;i<nNodes;i++) {
334+
txBuffer[i].used=0;
335+
}
336+
337+
while (true) {
338+
#ifdefUSE_EPOLL
339+
rc=epoll_wait(epollfd,events,MAX_EVENTS,shub->in_buffer_used==0 ?-1 :shub->params->delay);
340+
if (rc<0) {
341+
elog(ERROR,"epoll failed: %d",errno);
342+
}
343+
for (j=0;j<rc;j++) {
344+
i=events[j].data.u32;
345+
if (events[j].events&EPOLLERR) {
346+
structsockaddr_ininsock;
347+
socklen_tlen=sizeof(insock);
348+
getpeername(fd, (structsockaddr*)&insock,&len);
349+
elog(WARNING,"Loose connection with %s",inet_ntoa(insock.sin_addr_));
350+
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,NULL);
351+
}
352+
elseif (events[j].events&EPOLLIN)
353+
#else
354+
fd_setevents;
355+
events=inset;
356+
rc=select(max_fd+1,&events,NULL,NULL,NULL);
357+
if (rc<0) {
358+
elog(ERROR,"select failed: %d",errno);
359+
}
360+
for (i=0;i<nNodes;i++) {
361+
if (FD_ISSET(sockets[i],&events))
362+
#endif
363+
{
364+
intnResponses;
365+
rxBuffer[i].used+=ReadSocket(sockets[i], (char*)rxBuffer[i].data+rxBuffer[i].used,RX_BUFFER_SIZE-rxBufPos);
366+
nResponses=rxBuffer[i].used/sizeof(DtmCommitRequest);
367+
368+
LWLockAcquire(&dtm->hashLock,LW_SHARED);
369+
370+
for (j=0;j<nResponses;j++) {
371+
DtmCommitRequest*req=&rxBuffer[i].data[j];
372+
DtmTransState*ts= (DtmTransState*)hash_search(xid2state,&req->xid,HASH_FIND,NULL);
373+
Assert(ts!=NULL);
374+
if (req->csn>ts->csn) {
375+
ts->csn=req->csn;
376+
}
377+
if (ts->nVotes==dtm->nNodes-1) {
378+
SetLatch(&ProcGlobal->allProcs[ts->pid].procLatch);
379+
}
380+
}
381+
if (rxBuffer[i].used!=nResponses*sizeof(DtmCommitRequest)) {
382+
rxBuffer[i].used-=nResponses*sizeof(DtmCommitRequest);
383+
memmove(rxBuffer[i].data, (char*)rxBuffer[i].data+nResponses*sizeof(DtmCommitRequest),rxBuffer[i].used);
384+
}
385+
}
386+
}
387+
}
388+
}
389+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp