Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbf78f0b

Browse files
knizhnikkelvich
authored andcommitted
Add arbiter.c
1 parent4abea17 commitbf78f0b

File tree

1 file changed

+237
-0
lines changed

1 file changed

+237
-0
lines changed

‎arbiter.c

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* multimaster.c
3+
*
4+
* Multimaster based on logical replication
5+
*
6+
*/
7+
8+
#include<unistd.h>
9+
#include<sys/time.h>
10+
#include<time.h>
11+
12+
#include"postgres.h"
13+
#include"fmgr.h"
14+
#include"miscadmin.h"
15+
#include"libpq-fe.h"
16+
#include"postmaster/postmaster.h"
17+
#include"postmaster/bgworker.h"
18+
#include"storage/s_lock.h"
19+
#include"storage/spin.h"
20+
#include"storage/lmgr.h"
21+
#include"storage/shmem.h"
22+
#include"storage/ipc.h"
23+
#include"access/xlogdefs.h"
24+
#include"access/xact.h"
25+
#include"access/xtm.h"
26+
#include"access/transam.h"
27+
#include"access/subtrans.h"
28+
#include"access/commit_ts.h"
29+
#include"access/xlog.h"
30+
#include"storage/proc.h"
31+
#include"storage/procarray.h"
32+
#include"executor/executor.h"
33+
#include"access/twophase.h"
34+
#include"utils/guc.h"
35+
#include"utils/hsearch.h"
36+
#include"utils/tqual.h"
37+
#include"utils/array.h"
38+
#include"utils/builtins.h"
39+
#include"utils/memutils.h"
40+
#include"commands/dbcommands.h"
41+
#include"miscadmin.h"
42+
#include"postmaster/autovacuum.h"
43+
#include"storage/pmsignal.h"
44+
#include"storage/proc.h"
45+
#include"utils/syscache.h"
46+
#include"replication/walsender.h"
47+
#include"replication/slot.h"
48+
#include"port/atomics.h"
49+
#include"tcop/utility.h"
50+
51+
#include"multimaster.h"
52+
53+
#defineMAX_CONNECT_ATTEMPTS 10
54+
#defineTX_BUFFER_SIZE 1024
55+
56+
typedefstruct
57+
{
58+
TransactionIdxid;
59+
csn_tcsn;
60+
}DtmCommitMessage;
61+
62+
typedefstruct
63+
{
64+
DtmCOmmitMessagebuf[TX_BUFFER_SIZE];
65+
intused;
66+
}DtmTxBuffer;
67+
68+
staticint*sockets;
69+
staticDtmCommitMessage**txBuffers;
70+
71+
staticBackgroundWorkerDtmSender= {
72+
"mm-sender",
73+
0,/* do not need connection to the database */
74+
BgWorkerStart_PostmasterStart,
75+
1,/* restrart in one second (is it possible to restort immediately?) */
76+
DtmTransSender
77+
};
78+
79+
staticBackgroundWorkerDtmRecevier= {
80+
"mm-receiver",
81+
0,/* do not need connection to the database */
82+
BgWorkerStart_PostmasterStart,
83+
1,/* restrart in one second (is it possible to restort immediately?) */
84+
DtmTransReceiver
85+
};
86+
87+
voidMMArbiterInitialize()
88+
{
89+
RegisterBackgroundWorker(&DtmSender);
90+
RegisterBackgroundWorker(&DtmRecevier);
91+
}
92+
93+
94+
staticintresolve_host_by_name(constchar*hostname,unsigned*addrs,unsigned*n_addrs)
95+
{
96+
structsockaddr_insin;
97+
structhostent*hp;
98+
unsignedi;
99+
100+
sin.sin_addr.s_addr=inet_addr(hostname);
101+
if (sin.sin_addr.s_addr!=INADDR_NONE) {
102+
memcpy(&addrs[0],&sin.sin_addr.s_addr,sizeof(sin.sin_addr.s_addr));
103+
*n_addrs=1;
104+
return1;
105+
}
106+
107+
hp=gethostbyname(hostname);
108+
if (hp==NULL||hp->h_addrtype!=AF_INET) {
109+
return0;
110+
}
111+
for (i=0;hp->h_addr_list[i]!=NULL&&i<*n_addrs;i++) {
112+
memcpy(&addrs[i],hp->h_addr_list[i],sizeof(addrs[i]));
113+
}
114+
*n_addrs=i;
115+
return1;
116+
}
117+
118+
staticintconnectSocket(charconst*host,intport)
119+
{
120+
structsockaddr_insock_inet;
121+
unsignedaddrs[128];
122+
unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]);
123+
intmax_attempts=MAX_CONNECT_ATTEMPTS;
124+
intsd;
125+
126+
sock_inet.sin_family=AF_INET;
127+
sock_inet.sin_port=htons(port);
128+
129+
if (!resolve_host_by_name(host,addrs,&n_addrs)) {
130+
elog(ERROR,"Failed to resolve host '%s' by name",host);
131+
}
132+
sd=socket(AF_INET,SOCK_STREAM,0);
133+
if (sd<0) {
134+
elog(ERROR,"Failed to create socket: %d",errno);
135+
}
136+
while (1) {
137+
intrc=-1;
138+
for (i=0;i<n_addrs;++i) {
139+
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
140+
do {
141+
rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet));
142+
}while (rc<0&&errno==EINTR);
143+
144+
if (rc >=0||errno==EINPROGRESS) {
145+
break;
146+
}
147+
}
148+
if (rc<0) {
149+
if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) {
150+
elog(ERROR,"Sockhub failed to connect to %s:%d: %d",host,port,errno);
151+
}else {
152+
max_attempts-=1;
153+
sleep(1);
154+
}
155+
continue;
156+
}else {
157+
intoptval=1;
158+
setsockopt(shub->output,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval));
159+
returnsd;
160+
}
161+
}
162+
}
163+
164+
staticvoidopenConnections()
165+
{
166+
intnNodes=dtm->nNodes;
167+
inti;
168+
char*connStr=pstrdup(MMConnStrs);
169+
170+
sockets= (int*)palloc(sizeof(int)*nNodes);
171+
172+
for (i=0;i<nNodes;i++) {
173+
char*host=strstr(connStr,"host=");
174+
char*end;
175+
if (host==NULL) {
176+
elog(ERROR,"Invalid connection string: '%s'",MMConnStrs);
177+
}
178+
for (end=host+5;*end!=' '&&*end!=','&&end!='\0';end++);
179+
*end='\0';
180+
connStr=end+1;
181+
sockets[i]=i+1!=MMNodeId ?connectSocket(host,MMArbiterPort+i) :-1;
182+
}
183+
}
184+
185+
staticvoidacceptConnections()
186+
{
187+
intnNodes=dtm->nNodes-1;
188+
sockaddr_insock_inet;
189+
inti;
190+
intsd;
191+
inton=1;
192+
193+
sockets= (int*)palloc(sizeof(int)*nNodes);
194+
195+
sock_inet.sin_family=AF_INET;
196+
sock_inet.sin_addr.s_addr=htonl(INADDR_ANY);
197+
sock_inet.sin_port=htons(MMArbiterPort+MMNodeId);
198+
199+
sd=socket(u.sock.sa_family,SOCK_STREAM,0);
200+
if (sd<0) {
201+
elog(ERROR,"Failed to create socket: %d",errno);
202+
}
203+
setsockopt(sd,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon);
204+
205+
if (bind(fd, (sockaddr*)&sock_init,nNodes-1)<0) {
206+
elog(ERROR,"Failed to bind socket: %d",errno);
207+
}
208+
209+
for (i=0;i<nNodes-1;i++) {
210+
sockets[i]=accept(sd,NULL,NULL);
211+
if (sockets[i]<0) {
212+
elog(ERROR,"Failed to accept socket: %d",errno);
213+
}
214+
}
215+
}
216+
217+
staticvoidDtmTransSender(Datumarg)
218+
{
219+
txBuffer= (DtmCommitMessage*)
220+
openConnections();
221+
222+
while (true) {
223+
DtmTransState*ts;
224+
PGSemaphoreLock(&dtm->semphore);
225+
226+
LWLockAcquire(&dtm->hashLock,LW_EXCLUSIVE);
227+
for (ts=dtm->pendingTransactions;ts!=NULL;ts=ts->nextPending) {
228+
intnode=ts->gtid.node;
229+
Assert(node!=MMNodeId);
230+
sockets
231+
}
232+
233+
staticvoidDtmTransReceiver(Datumarg)
234+
{
235+
acceptConnections();
236+
}
237+

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp