Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd8c0394

Browse files
committed
Add timeouts and vallen to raftable API.
1 parenta088277 commitd8c0394

File tree

7 files changed

+149
-108
lines changed

7 files changed

+149
-108
lines changed

‎contrib/raftable/README‎

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,24 @@ The backend can also issue commands to itself through C API.
4040

4141
C API:
4242
/* Gets value by key. Returns the value or NULL if not found. */
43-
char *raftable_get(char *key);
43+
char *raftable_get(constchar *key, size_t *vallen);
4444

4545
/*
46-
* Adds/updates value by key. Returns when the value gets replicated on
47-
* current machine. Storing NULL will delete the item from the table.
46+
* Adds/updates value by key. Returns when the value gets replicated.
47+
* Storing NULL will delete the item from the table. Gives up after
48+
* 'timeout_ms' milliseconds.
4849
*/
49-
void raftable_set(char *key, char *value);
50+
void raftable_set(constchar *key,constchar *value, size_t len, int timeout_ms);
5051

5152
/*
5253
* Iterates over all items in the table, calling func(key, value, arg)
5354
* for each of them.
5455
*/
55-
void raftable_every(void (*func)(char *, char *, void *), void *arg);
56+
void raftable_every(void (*func)(constchar *,constchar *, size_t, void *), void *arg);
5657

5758
SQL API:
5859
-- set
59-
raftable(key varchar(64), value text,tries int);
60+
raftable(key varchar(64), value text,timeout_ms int);
6061

6162
-- get
6263
raftable(key varchar(64)) returns text;

‎contrib/raftable/raftable--1.0.sql‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ AS 'MODULE_PATHNAME','raftable_sql_get'
88
LANGUAGE C;
99

1010
-- set
11-
CREATEFUNCTIONraftable(keyvarchar(64), valuetext,triesint)
11+
CREATEFUNCTIONraftable(keyvarchar(64), valuetext,timeout_msint)
1212
RETURNS void
1313
AS'MODULE_PATHNAME','raftable_sql_set'
1414
LANGUAGE C;

‎contrib/raftable/raftable.c‎

Lines changed: 110 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include"access/htup_details.h"
1313
#include"miscadmin.h"
1414
#include"funcapi.h"
15+
#include"utils/timestamp.h"
1516

1617
#include"raft.h"
1718
#include"util.h"
@@ -54,9 +55,19 @@ static void *get_shared_state(void)
5455

5556
staticvoidselect_next_peer(void)
5657
{
57-
do {
58-
*shared.leader= (*shared.leader+1) %RAFTABLE_PEERS_MAX;
59-
}while (!wcfg.peers[*shared.leader].up);
58+
intorig_leader=*shared.leader;
59+
inti;
60+
for (i=0;i<RAFTABLE_PEERS_MAX;i++)
61+
{
62+
intidx= (orig_leader+i+1) %RAFTABLE_PEERS_MAX;
63+
HostPort*hp=wcfg.peers+idx;
64+
if (hp->up)
65+
{
66+
*shared.leader=idx;
67+
return;
68+
}
69+
}
70+
elog(WARNING,"all raftable peers down");
6071
}
6172

6273
staticvoiddisconnect_leader(void)
@@ -129,56 +140,116 @@ static bool connect_leader(void)
129140

130141
staticintget_connection(void)
131142
{
132-
while (leadersock<0)
143+
if (leadersock<0)
133144
{
134-
if (connect_leader())break;
145+
if (connect_leader())returnleadersock;
135146

136-
inttimeout_ms=1000;
147+
inttimeout_ms=100;
137148
structtimespectimeout= {0,timeout_ms*1000000};
138149
nanosleep(&timeout,NULL);
139150
}
140151
returnleadersock;
141152
}
142153

143-
char*raftable_get(char*key)
154+
char*raftable_get(constchar*key,size_t*len)
144155
{
145-
returnstate_get(shared.state,key);
156+
returnstate_get(shared.state,key,len);
146157
}
147158

148159
Datum
149160
raftable_sql_get(PG_FUNCTION_ARGS)
150161
{
151162
RaftableEntry*e;
152163
RaftableKeykey;
164+
size_tlen;
153165
text_to_cstring_buffer(PG_GETARG_TEXT_P(0),key.data,sizeof(key.data));
154166

155167
Assert(shared.state);
156168

157-
char*s=state_get(shared.state,key.data);
169+
char*s=state_get(shared.state,key.data,&len);
158170
if (s)
159171
{
160-
text*t=cstring_to_text(s);
172+
text*t=cstring_to_text_with_len(s,len);
161173
pfree(s);
162174
PG_RETURN_TEXT_P(t);
163175
}
164176
else
165177
PG_RETURN_NULL();
166178
}
167179

168-
boolraftable_set(char*key,char*value,inttries)
180+
staticvoidstart_timer(TimestampTz*timer)
169181
{
170-
RaftableUpdate*ru;
171-
size_tsize=sizeof(RaftableUpdate);
172-
intkeylen,vallen=0;
173-
boolok= false;
182+
*timer-=GetCurrentTimestamp();
183+
}
184+
185+
staticvoidstop_timer(TimestampTz*timer)
186+
{
187+
*timer+=GetCurrentTimestamp();
188+
}
189+
190+
staticlongmsec(TimestampTztimer)
191+
{
192+
longsec;
193+
intusec;
194+
TimestampDifference(0,timer,&sec,&usec);
195+
returnsec*1000+usec /1000;
196+
}
197+
198+
staticbooltry_sending_update(RaftableUpdate*ru,size_tsize)
199+
{
200+
ints=get_connection();
174201

175-
if (tries <=0)
202+
if (s<0)return false;
203+
204+
intsent=0,recved=0;
205+
intstatus;
206+
207+
if (write(s,&size,sizeof(size))!=sizeof(size))
208+
{
209+
disconnect_leader();
210+
elog(WARNING,"failed to send the update size to the leader");
211+
return false;
212+
}
213+
214+
while (sent<size)
176215
{
177-
elog(ERROR,"raftable set should be called with 'tries' > 0");
216+
intnewbytes=write(s, (char*)ru+sent,size-sent);
217+
if (newbytes==-1)
218+
{
219+
disconnect_leader();
220+
elog(WARNING,"failed to send the update to the leader");
221+
return false;
222+
}
223+
sent+=newbytes;
178224
}
179225

226+
recved=read(s,&status,sizeof(status));
227+
if (recved!=sizeof(status))
228+
{
229+
disconnect_leader();
230+
elog(WARNING,"failed to recv the update status from the leader");
231+
return false;
232+
}
233+
234+
if (status!=1)
235+
{
236+
disconnect_leader();
237+
elog(WARNING,"leader returned %d",status);
238+
return false;
239+
}
240+
241+
return true;
242+
}
243+
244+
boolraftable_set(constchar*key,constchar*value,size_tvallen,inttimeout_ms)
245+
{
246+
RaftableUpdate*ru;
247+
size_tsize=sizeof(RaftableUpdate);
248+
size_tkeylen=0;
249+
TimestampTznow;
250+
intelapsed_ms;
251+
180252
keylen=strlen(key)+1;
181-
if (value)vallen=strlen(value)+1;
182253

183254
size+=sizeof(RaftableField)-1;
184255
size+=keylen;
@@ -194,84 +265,54 @@ bool raftable_set(char *key, char *value, int tries)
194265
memcpy(f->data,key,keylen);
195266
memcpy(f->data+keylen,value,vallen);
196267

197-
tryagain:
198-
if (tries--)
268+
elapsed_ms=0;
269+
now=GetCurrentTimestamp();
270+
while ((elapsed_ms <=timeout_ms)|| (timeout_ms==-1))
199271
{
200-
ints=get_connection();
201-
intsent=0,recved=0;
202-
intstatus;
203-
204-
if (write(s,&size,sizeof(size))!=sizeof(size))
272+
TimestampTzpast=now;
273+
if (try_sending_update(ru,size))
205274
{
206-
disconnect_leader();
207-
elog(WARNING,"failed[%d] to send the update size to the leader",tries);
208-
gototryagain;
209-
}
210-
211-
while (sent<size)
212-
{
213-
intnewbytes=write(s, (char*)ru+sent,size-sent);
214-
if (newbytes==-1)
215-
{
216-
disconnect_leader();
217-
elog(WARNING,"failed[%d] to send the update to the leader",tries);
218-
gototryagain;
219-
}
220-
sent+=newbytes;
221-
}
222-
223-
recved=read(s,&status,sizeof(status));
224-
if (recved!=sizeof(status))
225-
{
226-
disconnect_leader();
227-
elog(WARNING,"failed to recv the update status from the leader\n");
228-
gototryagain;
275+
pfree(ru);
276+
return true;
229277
}
230-
gotosuccess;
231-
}
232-
else
233-
{
234-
gotofailure;
278+
now=GetCurrentTimestamp();
279+
elapsed_ms+=msec(now-past);
235280
}
236281

237-
failure:
238-
elog(WARNING,"failed all tries to set raftable value\n");
239282
pfree(ru);
283+
elog(WARNING,"failed to set raftable value after %d ms",timeout_ms);
240284
return false;
241-
242-
success:
243-
pfree(ru);
244-
return true;
245285
}
246286

247287
Datum
248288
raftable_sql_set(PG_FUNCTION_ARGS)
249289
{
250290
char*key=text_to_cstring(PG_GETARG_TEXT_P(0));
251-
inttries=PG_GETARG_INT32(2);
291+
inttimeout_ms=PG_GETARG_INT32(2);
252292
if (PG_ARGISNULL(1))
253-
raftable_set(key,NULL,tries);
293+
raftable_set(key,NULL,0,timeout_ms);
254294
else
255295
{
256296
char*value=text_to_cstring(PG_GETARG_TEXT_P(1));
257-
raftable_set(key,value,tries);
297+
raftable_set(key,value,strlen(value),timeout_ms);
258298
pfree(value);
259299
}
260300
pfree(key);
261301

262302
PG_RETURN_VOID();
263303
}
264304

265-
voidraftable_every(void (*func)(char*,char*,void*),void*arg)
305+
voidraftable_every(void (*func)(constchar*,constchar*,size_t,void*),void*arg)
266306
{
267307
void*scan;
268308
char*key,*value;
309+
size_tlen;
269310
Assert(shared.state);
270311

271312
scan=state_scan(shared.state);
272-
while (state_next(shared.state,scan,&key,&value))
313+
while (state_next(shared.state,scan,&key,&value,&len))
273314
{
274-
func(key,value,arg);
315+
func(key,value,len,arg);
275316
pfree(key);
276317
pfree(value);
277318
}
@@ -281,6 +322,7 @@ Datum
281322
raftable_sql_list(PG_FUNCTION_ARGS)
282323
{
283324
char*key,*value;
325+
size_tlen;
284326
FuncCallContext*funcctx;
285327
MemoryContextoldcontext;
286328

@@ -309,14 +351,14 @@ raftable_sql_list(PG_FUNCTION_ARGS)
309351

310352
funcctx=SRF_PERCALL_SETUP();
311353

312-
if (state_next(shared.state,funcctx->user_fctx,&key,&value))
354+
if (state_next(shared.state,funcctx->user_fctx,&key,&value,&len))
313355
{
314356
HeapTupletuple;
315357
Datumvals[2];
316358
boolisnull[2];
317359

318-
vals[0]=CStringGetTextDatum(key);
319-
vals[1]=CStringGetTextDatum(value);
360+
vals[0]=PointerGetDatum(cstring_to_text(key));
361+
vals[1]=PointerGetDatum(cstring_to_text_with_len(value,len));
320362
isnull[0]=isnull[1]= false;
321363

322364
tuple=heap_form_tuple(funcctx->tuple_desc,vals,isnull);

‎contrib/raftable/raftable.h‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
#define__RAFTABLE_H__
33

44
/* Gets value by key. Returns the value or NULL if not found. */
5-
char*raftable_get(char*key);
5+
char*raftable_get(constchar*key,size_t*vallen);
66

77
/*
88
* Adds/updates value by key. Returns when the value gets replicated.
9-
* Storing NULL will delete the item from the table.Give up after 'tries'
10-
*tries have failed.
9+
* Storing NULL will delete the item from the table.Gives up after 'timeout_ms'
10+
*milliseconds.
1111
*/
12-
boolraftable_set(char*key,char*value,inttries);
12+
boolraftable_set(constchar*key,constchar*value,size_tvallen,inttimeout_ms);
1313

1414
/*
1515
* Iterates over all items in the table, calling func(key, value, arg)
1616
* for each of them.
1717
*/
18-
voidraftable_every(void (*func)(char*,char*,void*),void*arg);
18+
voidraftable_every(void (*func)(constchar*,constchar*,size_t,void*),void*arg);
1919

2020
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp