Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1552662

Browse files
committed
Fix timeouts and other fatal bugs in raftable :)
1 parent3e6d19c commit1552662

File tree

8 files changed

+177
-139
lines changed

8 files changed

+177
-139
lines changed

‎contrib/raftable/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = raftable
2-
OBJS = raftable.o worker.o state.o blockmem.o raft/obj/raft.o raft/obj/util.o
2+
OBJS = raftable.o worker.o state.o blockmem.otimeout.oraft/obj/raft.o raft/obj/util.o
33
EXTENSION = raftable
44
DATA = raftable--1.0.sql
55

‎contrib/raftable/blockmem.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ block_fill(void *origin, int id, void *src, size_t len)
5858
returnlen;
5959
}
6060

61-
void
61+
staticvoid
6262
block_clear(void*origin,intid)
6363
{
6464
TAIL(origin,id)=0;

‎contrib/raftable/raftable.c‎

Lines changed: 63 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
#include"access/htup_details.h"
1313
#include"miscadmin.h"
1414
#include"funcapi.h"
15-
#include"utils/timestamp.h"
1615

1716
#include"raft.h"
1817
#include"util.h"
1918

2019
#include"raftable.h"
2120
#include"worker.h"
2221
#include"state.h"
22+
#include"timeout.h"
2323

2424
#include<poll.h>
2525
#include<sys/socket.h>
@@ -81,42 +81,33 @@ static void disconnect_leader(void)
8181
leadersock=-1;
8282
}
8383

84-
staticboolpoll_until_writable(intsock,inttimeout_ms)
84+
85+
staticboolpoll_until_writable(intsock,timeout_t*timeout)
8586
{
8687
structpollfdpfd= {sock,POLLOUT,0};
87-
intr=poll(&pfd,1,timeout_ms);
88+
intr=poll(&pfd,1,timeout_remaining_ms(timeout));
8889
if (r!=1)return false;
8990
return (pfd.revents&POLLOUT)!=0;
9091
}
9192

92-
staticboolpoll_until_readable(intsock,inttimeout_ms)
93+
staticboolpoll_until_readable(intsock,timeout_t*timeout)
9394
{
9495
structpollfdpfd= {sock,POLLIN,0};
95-
intr=poll(&pfd,1,timeout_ms);
96+
intremain=timeout_remaining_ms(timeout);
97+
intr=poll(&pfd,1,remain);
9698
if (r!=1)return false;
9799
return (pfd.revents&POLLIN)!=0;
98100
}
99101

100-
staticlongmsec(TimestampTztimer)
101-
{
102-
longsec;
103-
intusec;
104-
TimestampDifference(0,timer,&sec,&usec);
105-
returnsec*1000+usec /1000;
106-
}
107-
108-
staticbooltimed_write(intsock,void*data,size_tlen,inttimeout_ms)
102+
staticbooltimed_write(intsock,void*data,size_tlen,timeout_t*timeout)
109103
{
110-
TimestampTzstart,now;
111104
intsent=0;
112105

113-
now=start=GetCurrentTimestamp();
114-
115106
while (sent<len)
116107
{
117108
intnewbytes;
118-
now=GetCurrentTimestamp();
119-
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms)){
109+
if (timeout_happened(timeout))
110+
{
120111
elog(WARNING,"write timed out");
121112
return false;
122113
}
@@ -125,12 +116,11 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
125116
if (newbytes==-1)
126117
{
127118
if (errno==EAGAIN) {
128-
intremaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-msec(now-start);
129-
if (poll_until_writable(sock,remaining_ms)) {
119+
if (poll_until_writable(sock,timeout)) {
130120
continue;
131121
}
132122
}
133-
elog(WARNING,"failed to write:%s",strerror(errno));
123+
elog(WARNING,"failed to write:error %d: %s",errno,strerror(errno));
134124
return false;
135125
}
136126
sent+=newbytes;
@@ -139,17 +129,15 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
139129
return true;
140130
}
141131

142-
staticbooltimed_read(intsock,void*data,size_tlen,inttimeout_ms)
132+
staticbooltimed_read(intsock,void*data,size_tlen,timeout_t*timeout)
143133
{
144134
intrecved=0;
145-
TimestampTzstart,now;
146-
now=start=GetCurrentTimestamp();
147135

148136
while (recved<len)
149137
{
150138
intnewbytes;
151-
now=GetCurrentTimestamp();
152-
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms)){
139+
if (timeout_happened(timeout))
140+
{
153141
elog(WARNING,"read timed out");
154142
return false;
155143
}
@@ -158,12 +146,11 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
158146
if (newbytes==-1)
159147
{
160148
if (errno==EAGAIN) {
161-
intremaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-msec(now-start);
162-
if (poll_until_readable(sock,remaining_ms)) {
149+
if (poll_until_readable(sock,timeout)) {
163150
continue;
164151
}
165152
}
166-
elog(WARNING,"failed to read:%s",strerror(errno));
153+
elog(WARNING,"failed to read:error %d: %s",errno,strerror(errno));
167154
return false;
168155
}
169156
recved+=newbytes;
@@ -172,16 +159,14 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
172159
return true;
173160
}
174161

175-
staticboolconnect_leader(inttimeout_ms)
162+
staticboolconnect_leader(timeout_t*timeout)
176163
{
177164
structaddrinfo*addrs=NULL;
178165
structaddrinfohint;
179166
charportstr[6];
180167
structaddrinfo*a;
181168
intrc;
182-
183-
TimestampTznow;
184-
intelapsed_ms;
169+
intsd;
185170

186171
HostPort*leaderhp;
187172

@@ -198,23 +183,21 @@ static bool connect_leader(int timeout_ms)
198183
if ((rc=getaddrinfo(leaderhp->host,portstr,&hint,&addrs)))
199184
{
200185
disconnect_leader();
201-
fprintf(stderr,"failed to resolve address '%s:%d': %s",
202-
leaderhp->host,leaderhp->port,
203-
gai_strerror(rc));
186+
elog(WARNING,"failed to resolve address '%s:%d': %s",
187+
leaderhp->host,leaderhp->port,
188+
gai_strerror(rc));
204189
return false;
205190
}
206191

207-
fprintf(stderr,"trying [%d] %s:%d\n",*shared.leader,leaderhp->host,leaderhp->port);
208-
elapsed_ms=0;
209-
now=GetCurrentTimestamp();
192+
elog(WARNING,"trying [%d] %s:%d",*shared.leader,leaderhp->host,leaderhp->port);
210193
for (a=addrs;a!=NULL;a=a->ai_next)
211194
{
212195
intone=1;
213196

214-
intsd=socket(a->ai_family,SOCK_STREAM |SOCK_NONBLOCK,0);
197+
sd=socket(a->ai_family,SOCK_STREAM |SOCK_NONBLOCK,0);
215198
if (sd==-1)
216199
{
217-
perror("failed to create a socket");
200+
elog(WARNING,"failed to create a socket: %s",strerror(errno));
218201
continue;
219202
}
220203
setsockopt(sd,IPPROTO_TCP,TCP_NODELAY,&one,sizeof(one));
@@ -223,54 +206,54 @@ static bool connect_leader(int timeout_ms)
223206
{
224207
if (errno==EINPROGRESS)
225208
{
226-
while ((elapsed_ms <=timeout_ms)|| (timeout_ms==-1))
209+
TIMEOUT_LOOP_START(timeout);
227210
{
228-
TimestampTzpast=now;
229-
intremaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-elapsed_ms;
230-
231-
if (poll_until_writable(sd,remaining_ms))
211+
if (poll_until_writable(sd,timeout))
232212
{
233213
interr;
234214
socklen_toptlen=sizeof(err);
235215
getsockopt(sd,SOL_SOCKET,SO_ERROR,&err,&optlen);
236-
if (err==0)
237-
{
238-
// success
239-
break;
240-
}
216+
if (err==0) gotosuccess;
241217
}
242-
243-
now=GetCurrentTimestamp();
244-
elapsed_ms+=msec(now-past);
245218
}
219+
TIMEOUT_LOOP_END(timeout);
220+
elog(WARNING,"connect timed out");
221+
gotofailure;
246222
}
247223
else
248224
{
249-
perror("failed to connect to an address");
225+
elog(WARNING,"failed to connect to an address: %s",strerror(errno));
250226
close(sd);
251227
continue;
252228
}
253229
}
254230

255-
/* success */
256-
freeaddrinfo(addrs);
257-
leadersock=sd;
258-
return true;
231+
gotosuccess;
259232
}
233+
failure:
260234
freeaddrinfo(addrs);
261235
disconnect_leader();
262-
fprintf(stderr,"could not connect\n");
236+
elog(WARNING,"could not connect");
263237
return false;
238+
success:
239+
freeaddrinfo(addrs);
240+
leadersock=sd;
241+
return true;
242+
}
243+
244+
staticvoidwait_ms(intms)
245+
{
246+
structtimespects= {0,ms*1000000};
247+
nanosleep(&ts,NULL);
264248
}
265249

266-
staticintget_connection(inttimeout_ms)
250+
staticintget_connection(timeout_t*timeout)
267251
{
268252
if (leadersock<0)
269253
{
270-
if (connect_leader(timeout_ms))returnleadersock;
271-
//int timeout_ms = 100;
272-
//struct timespec timeout = {0, timeout_ms * 1000000};
273-
//nanosleep(&timeout, NULL);
254+
if (connect_leader(timeout))returnleadersock;
255+
elog(WARNING,"update: connect_leader() failed");
256+
wait_ms(100);
274257
}
275258
returnleadersock;
276259
}
@@ -302,66 +285,37 @@ raftable_sql_get(PG_FUNCTION_ARGS)
302285
PG_RETURN_NULL();
303286
}
304287

305-
staticbooltry_sending_update(RaftableUpdate*ru,size_tsize,inttimeout_ms)
288+
staticbooltry_sending_update(RaftableUpdate*ru,size_tsize,timeout_t*timeout)
306289
{
307-
ints,status,remaining_ms;
308-
TimestampTzstart,now;
290+
ints,status;
309291

310-
now=start=GetCurrentTimestamp();
311-
312-
s=get_connection(timeout_ms- (now-start));
292+
s=get_connection(timeout);
313293
if (s<0)return false;
314294

315-
now=GetCurrentTimestamp();
316-
remaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-msec(now-start);
317-
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
295+
if (timeout_happened(timeout))
318296
{
319-
elog(WARNING,"update:connect() timed out");
297+
elog(WARNING,"update:get_connection() timed out");
320298
return false;
321299
}
322300

323-
if (!timed_write(s,&size,sizeof(size),remaining_ms))
301+
if (!timed_write(s,&size,sizeof(size),timeout))
324302
{
325303
elog(WARNING,"failed to send the update size to the leader");
326304
return false;
327305
}
328306

329-
now=GetCurrentTimestamp();
330-
remaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-msec(now-start);
331-
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
332-
{
333-
elog(WARNING,"update: send(size) timed out");
334-
return false;
335-
}
336-
337-
if (!timed_write(s,ru,size,remaining_ms))
307+
if (!timed_write(s,ru,size,timeout))
338308
{
339309
elog(WARNING,"failed to send the update to the leader");
340310
return false;
341311
}
342312

343-
now=GetCurrentTimestamp();
344-
remaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-msec(now-start);
345-
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
346-
{
347-
elog(WARNING,"update: send(body) timed out");
348-
return false;
349-
}
350-
351-
if (!timed_read(s,&status,sizeof(status),remaining_ms))
313+
if (!timed_read(s,&status,sizeof(status),timeout))
352314
{
353315
elog(WARNING,"failed to recv the update status from the leader");
354316
return false;
355317
}
356318

357-
now=GetCurrentTimestamp();
358-
remaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-msec(now-start);
359-
if ((timeout_ms!=-1)&& (msec(now-start)>timeout_ms))
360-
{
361-
elog(WARNING,"update: recv(status) timed out");
362-
return false;
363-
}
364-
365319
if (status!=1)
366320
{
367321
elog(WARNING,"update: leader returned status = %d",status);
@@ -377,8 +331,8 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
377331
RaftableUpdate*ru;
378332
size_tsize=sizeof(RaftableUpdate);
379333
size_tkeylen=0;
380-
TimestampTznow;
381-
intelapsed_ms;
334+
timeout_ttimeout;
335+
timeout_start(&timeout,timeout_ms);
382336

383337
Assert(wcfg.id >=0);
384338

@@ -398,27 +352,20 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
398352
memcpy(f->data,key,keylen);
399353
memcpy(f->data+keylen,value,vallen);
400354

401-
elapsed_ms=0;
402-
now=GetCurrentTimestamp();
403-
while ((elapsed_ms <=timeout_ms)|| (timeout_ms==-1))
355+
TIMEOUT_LOOP_START(&timeout);
404356
{
405-
TimestampTzpast=now;
406-
intremaining_ms= (timeout_ms==-1) ?-1 :timeout_ms-elapsed_ms;
407-
if (try_sending_update(ru,size,remaining_ms))
357+
if (try_sending_update(ru,size,&timeout))
408358
{
409359
pfree(ru);
410360
return true;
411361
}
412362
else
413-
{
414363
disconnect_leader();
415-
}
416-
now=GetCurrentTimestamp();
417-
elapsed_ms+=msec(now-past);
418364
}
365+
TIMEOUT_LOOP_END(&timeout);
419366

420367
pfree(ru);
421-
elog(WARNING,"failed to set raftable value after %d ms",elapsed_ms);
368+
elog(WARNING,"failed to set raftable value after %d ms",timeout_elapsed_ms(&timeout));
422369
return false;
423370
}
424371

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp