Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 98eb376

Browse files
committed
Make raftable use non-blocking socket.
1 parent 9adeb34, commit 98eb376

File tree

1 file changed

+181
-43
lines changed

1 file changed

+181
-43
lines changed

‎contrib/raftable/raftable.c‎

Lines changed: 181 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
#include"worker.h"
2222
#include"state.h"
2323

24+
#include<poll.h>
2425
#include<sys/socket.h>
2526
#include<netinet/in.h>
2627
#include<netinet/tcp.h>
@@ -80,17 +81,111 @@ static void disconnect_leader(void)
8081
leadersock=-1;
8182
}
8283

83-
staticboolconnect_leader(void)
84+
/*
 * Wait until 'sock' can be written to without blocking.
 *
 * sock:       descriptor handed to poll().
 * timeout_ms: passed to poll() unchanged; poll() documents a negative
 *             value as "wait indefinitely".
 *
 * Returns true only when poll() reports exactly one ready descriptor and
 * POLLOUT is set in its revents.  A timeout, a poll() failure, or a wake-up
 * that carries only other flags all yield false.
 */
static bool poll_until_writable(int sock, int timeout_ms)
{
	struct pollfd pfd;

	/* Assign by member name: the member order of struct pollfd is not fixed. */
	pfd.fd = sock;
	pfd.events = POLLOUT;
	pfd.revents = 0;

	if (poll(&pfd, 1, timeout_ms) != 1)
		return false;

	/* Collapse the flag test to 0/1 so a narrow 'bool' cannot truncate it. */
	return (pfd.revents & POLLOUT) != 0;
}
91+
92+
/*
 * Wait until 'sock' has data that can be read without blocking.
 *
 * sock:       descriptor handed to poll().
 * timeout_ms: passed to poll() unchanged; poll() documents a negative
 *             value as "wait indefinitely".
 *
 * Returns true only when poll() reports exactly one ready descriptor and
 * POLLIN is set in its revents.  A timeout, a poll() failure, or a wake-up
 * that carries only other flags all yield false.
 */
static bool poll_until_readable(int sock, int timeout_ms)
{
	struct pollfd pfd;

	/* Assign by member name: the member order of struct pollfd is not fixed. */
	pfd.fd = sock;
	pfd.events = POLLIN;
	pfd.revents = 0;

	if (poll(&pfd, 1, timeout_ms) != 1)
		return false;

	/* Collapse the flag test to 0/1 so a narrow 'bool' cannot truncate it. */
	return (pfd.revents & POLLIN) != 0;
}
99+
100+
staticlongmsec(TimestampTztimer)
101+
{
102+
longsec;
103+
intusec;
104+
TimestampDifference(0,timer,&sec,&usec);
105+
returnsec*1000+usec /1000;
106+
}
107+
108+
staticbooltimed_write(intsock,void*data,size_tlen,inttimeout_ms)
109+
{
110+
TimestampTzstart,now;
111+
intsent=0;
112+
113+
now=start=GetCurrentTimestamp();
114+
115+
while (sent<len)
116+
{
117+
intnewbytes;
118+
now=GetCurrentTimestamp();
119+
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms)) {
120+
elog(WARNING,"write timed out");
121+
return false;
122+
}
123+
124+
newbytes=write(sock, (char*)data+sent,len-sent);
125+
if (newbytes==-1)
126+
{
127+
if (errno==EAGAIN) {
128+
if (poll_until_writable(sock,timeout_ms-msec(now-start))) {
129+
continue;
130+
}
131+
}
132+
elog(WARNING,"failed to write: %s",strerror(errno));
133+
return false;
134+
}
135+
sent+=newbytes;
136+
}
137+
138+
return true;
139+
}
140+
141+
staticbooltimed_read(intsock,void*data,size_tlen,inttimeout_ms)
142+
{
143+
intrecved=0;
144+
TimestampTzstart,now;
145+
now=start=GetCurrentTimestamp();
146+
147+
while (recved<len)
148+
{
149+
intnewbytes;
150+
now=GetCurrentTimestamp();
151+
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms)) {
152+
elog(WARNING,"read timed out");
153+
return false;
154+
}
155+
156+
newbytes=read(sock, (char*)data+recved,len-recved);
157+
if (newbytes==-1)
158+
{
159+
if (errno==EAGAIN) {
160+
if (poll_until_readable(sock,timeout_ms-msec(now-start))) {
161+
continue;
162+
}
163+
}
164+
elog(WARNING,"failed to read: %s",strerror(errno));
165+
return false;
166+
}
167+
recved+=newbytes;
168+
}
169+
170+
return true;
171+
}
172+
173+
staticboolconnect_leader(inttimeout_ms)
84174
{
85175
structaddrinfo*addrs=NULL;
86176
structaddrinfohint;
87177
charportstr[6];
88178
structaddrinfo*a;
89179
intrc;
90180

181+
TimestampTznow;
182+
intelapsed_ms;
183+
184+
HostPort*leaderhp;
185+
91186
if (*shared.leader==NOBODY)select_next_peer();
92187

93-
HostPort*leaderhp=wcfg.peers+*shared.leader;
188+
leaderhp=wcfg.peers+*shared.leader;
94189

95190
memset(&hint,0,sizeof(hint));
96191
hint.ai_socktype=SOCK_STREAM;
@@ -108,11 +203,13 @@ static bool connect_leader(void)
108203
}
109204

110205
fprintf(stderr,"trying [%d] %s:%d\n",*shared.leader,leaderhp->host,leaderhp->port);
206+
elapsed_ms=0;
207+
now=GetCurrentTimestamp();
111208
for (a=addrs;a!=NULL;a=a->ai_next)
112209
{
113210
intone=1;
114211

115-
intsd=socket(a->ai_family,a->ai_socktype,a->ai_protocol);
212+
intsd=socket(a->ai_family,SOCK_STREAM |SOCK_NONBLOCK,0);
116213
if (sd==-1)
117214
{
118215
perror("failed to create a socket");
@@ -122,9 +219,34 @@ static bool connect_leader(void)
122219

123220
if (connect(sd,a->ai_addr,a->ai_addrlen)==-1)
124221
{
125-
perror("failed to connect to an address");
126-
close(sd);
127-
continue;
222+
if (errno==EINPROGRESS)
223+
{
224+
while ((elapsed_ms <=timeout_ms)|| (timeout_ms==-1))
225+
{
226+
TimestampTzpast=now;
227+
228+
if (poll_until_writable(sd,timeout_ms-elapsed_ms))
229+
{
230+
interr;
231+
socklen_toptlen=sizeof(err);
232+
getsockopt(sd,SOL_SOCKET,SO_ERROR,&err,&optlen);
233+
if (err==0)
234+
{
235+
// success
236+
break;
237+
}
238+
}
239+
240+
now=GetCurrentTimestamp();
241+
elapsed_ms+=msec(now-past);
242+
}
243+
}
244+
else
245+
{
246+
perror("failed to connect to an address");
247+
close(sd);
248+
continue;
249+
}
128250
}
129251

130252
/* success */
@@ -138,15 +260,14 @@ static bool connect_leader(void)
138260
return false;
139261
}
140262

141-
staticintget_connection(void)
263+
staticintget_connection(inttimeout_ms)
142264
{
143265
if (leadersock<0)
144266
{
145-
if (connect_leader())returnleadersock;
146-
147-
inttimeout_ms=100;
148-
structtimespectimeout= {0,timeout_ms*1000000};
149-
nanosleep(&timeout,NULL);
267+
if (connect_leader(timeout_ms))returnleadersock;
268+
//int timeout_ms = 100;
269+
//struct timespec timeout = {0, timeout_ms * 1000000};
270+
//nanosleep(&timeout, NULL);
150271
}
151272
returnleadersock;
152273
}
@@ -162,11 +283,12 @@ raftable_sql_get(PG_FUNCTION_ARGS)
162283
{
163284
RaftableKeykey;
164285
size_tlen;
286+
char*s;
165287
text_to_cstring_buffer(PG_GETARG_TEXT_P(0),key.data,sizeof(key.data));
166288

167289
Assert(shared.state);
168290

169-
char*s=state_get(shared.state,key.data,&len);
291+
s=state_get(shared.state,key.data,&len);
170292
if (s)
171293
{
172294
text*t=cstring_to_text_with_len(s,len);
@@ -177,54 +299,65 @@ raftable_sql_get(PG_FUNCTION_ARGS)
177299
PG_RETURN_NULL();
178300
}
179301

180-
staticlongmsec(TimestampTztimer)
302+
staticbooltry_sending_update(RaftableUpdate*ru,size_tsize,inttimeout_ms)
181303
{
182-
longsec;
183-
intusec;
184-
TimestampDifference(0,timer,&sec,&usec);
185-
returnsec*1000+usec /1000;
186-
}
304+
ints,status;
305+
TimestampTzstart,now;
187306

188-
staticbooltry_sending_update(RaftableUpdate*ru,size_tsize)
189-
{
190-
ints=get_connection();
307+
now=start=GetCurrentTimestamp();
191308

309+
s=get_connection(timeout_ms- (now-start));
192310
if (s<0)return false;
193311

194-
intsent=0,recved=0;
195-
intstatus;
312+
now=GetCurrentTimestamp();
313+
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
314+
{
315+
elog(WARNING,"update: connect() timed out");
316+
return false;
317+
}
196318

197-
if (write(s,&size,sizeof(size))!=sizeof(size))
319+
if (!timed_write(s,&size,sizeof(size),timeout_ms-msec(now-start)))
198320
{
199-
disconnect_leader();
200321
elog(WARNING,"failed to send the update size to the leader");
201322
return false;
202323
}
203324

204-
while (sent<size)
325+
now=GetCurrentTimestamp();
326+
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
205327
{
206-
intnewbytes=write(s, (char*)ru+sent,size-sent);
207-
if (newbytes==-1)
208-
{
209-
disconnect_leader();
210-
elog(WARNING,"failed to send the update to the leader");
211-
return false;
212-
}
213-
sent+=newbytes;
328+
elog(WARNING,"update: send(size) timed out");
329+
return false;
214330
}
215331

216-
recved=read(s,&status,sizeof(status));
217-
if (recved!=sizeof(status))
332+
if (!timed_write(s,ru,size,timeout_ms-msec(now-start)))
333+
{
334+
elog(WARNING,"failed to send the update to the leader");
335+
return false;
336+
}
337+
338+
now=GetCurrentTimestamp();
339+
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
340+
{
341+
elog(WARNING,"update: send(body) timed out");
342+
return false;
343+
}
344+
345+
if (!timed_read(s,&status,sizeof(status),timeout_ms-msec(now-start)))
218346
{
219-
disconnect_leader();
220347
elog(WARNING,"failed to recv the update status from the leader");
221348
return false;
222349
}
223350

351+
now=GetCurrentTimestamp();
352+
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
353+
{
354+
elog(WARNING,"update: recv(status) timed out");
355+
return false;
356+
}
357+
224358
if (status!=1)
225359
{
226-
disconnect_leader();
227-
elog(WARNING,"leader returned %d",status);
360+
elog(WARNING,"update: leader returned status = %d",status);
228361
return false;
229362
}
230363

@@ -233,6 +366,7 @@ static bool try_sending_update(RaftableUpdate *ru, size_t size)
233366

234367
boolraftable_set(constchar*key,constchar*value,size_tvallen,inttimeout_ms)
235368
{
369+
RaftableField*f;
236370
RaftableUpdate*ru;
237371
size_tsize=sizeof(RaftableUpdate);
238372
size_tkeylen=0;
@@ -251,7 +385,7 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
251385
ru->expector=wcfg.id;
252386
ru->fieldnum=1;
253387

254-
RaftableField*f= (RaftableField*)ru->data;
388+
f= (RaftableField*)ru->data;
255389
f->keylen=keylen;
256390
f->vallen=vallen;
257391
memcpy(f->data,key,keylen);
@@ -262,17 +396,21 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
262396
while ((elapsed_ms <=timeout_ms)|| (timeout_ms==-1))
263397
{
264398
TimestampTzpast=now;
265-
if (try_sending_update(ru,size))
399+
if (try_sending_update(ru,size,timeout_ms-elapsed_ms))
266400
{
267401
pfree(ru);
268402
return true;
269403
}
404+
else
405+
{
406+
disconnect_leader();
407+
}
270408
now=GetCurrentTimestamp();
271409
elapsed_ms+=msec(now-past);
272410
}
273411

274412
pfree(ru);
275-
elog(WARNING,"failed to set raftable value after %d ms",timeout_ms);
413+
elog(WARNING,"failed to set raftable value after %d ms",elapsed_ms);
276414
return false;
277415
}
278416

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp