Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6096224

Browse files
committed
Add recovery of the proper value for prev_/next_gxid in DTMD.
1 parenta4ca34c commit6096224

File tree

3 files changed

+62
-23
lines changed

3 files changed

+62
-23
lines changed

‎contrib/pg_dtm/dtmd/include/raft.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,6 @@ void raft_tick(raft_t *r, int msec);
140140
voidraft_handle_message(raft_t*r,raft_msg_t*m);
141141
raft_msg_t*raft_recv_message(raft_t*r);
142142
intraft_create_udp_socket(raft_t*r);
143-
voidraft_start_next_term(raft_t*r);
143+
voidraft_ensure_term(raft_t*r,intterm);
144144

145145
#endif

‎contrib/pg_dtm/dtmd/src/main.c‎

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
3131
// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
3232
xid_tprev_gxid,next_gxid;
3333

34-
xid_tthreshold_gxid;// when to start worrying about starting a new term
35-
xid_tlast_gxid;// the greatest gxid we can provide on BEGIN or RESERVE
36-
3734
xid_tglobal_xmin=INVALID_XID;
3835

3936
staticTransaction*find_transaction(xid_txid) {
@@ -262,14 +259,41 @@ static void onhello(client_t client, int argc, xid_t *argv) {
262259
}
263260
}
264261

262+
// the greatest gxid we can provide on BEGIN or RESERVE
263+
staticxid_tlast_xid_in_term() {
264+
returnraft.term*XIDS_PER_TERM-1;
265+
}
266+
267+
staticxid_tfirst_xid_in_term() {
268+
return (raft.term-1)*XIDS_PER_TERM;
269+
}
270+
271+
staticintxid2term(xid_txid) {
272+
intterm=xid /XIDS_PER_TERM+1;
273+
returnterm;
274+
}
275+
276+
// when to start worrying about starting a new term
277+
staticxid_tget_threshold_xid() {
278+
returnlast_xid_in_term()-NEW_TERM_THRESHOLD;
279+
}
280+
281+
staticboolxid_is_safe(xid_txid) {
282+
returnxid <=last_xid_in_term();
283+
}
284+
285+
staticboolxid_is_disturbing(xid_txid) {
286+
returninrange(next_gxid+1,get_threshold_xid(),xid);
287+
}
288+
265289
staticvoidset_next_gxid(xid_tvalue) {
266290
assert(next_gxid<value);// The value should only grow.
267291

268292
if (use_raft&&raft.role==ROLE_LEADER) {
269-
assert(value <=last_gxid);
270-
if (inrange(next_gxid+1,threshold_gxid,value)) {
293+
assert(xid_is_safe(value));
294+
if (xid_is_disturbing(value)) {
271295
// Time to worry has come.
272-
raft_start_next_term(&raft);
296+
raft_ensure_term(&raft,xid2term(value));
273297
}else {
274298
// It is either too early to worry,
275299
// or we have already increased the term.
@@ -293,6 +317,15 @@ static void set_next_gxid(xid_t value) {
293317
next_gxid=value;
294318
}
295319

320+
staticbooluse_xid(xid_txid) {
321+
if (!xid_is_safe(xid)) {
322+
return false;
323+
}
324+
shout("setting next_gxid to %u\n",xid+1);
325+
set_next_gxid(xid+1);
326+
return true;
327+
}
328+
296329
staticvoidonreserve(client_tclient,intargc,xid_t*argv) {
297330
CHECK(argc==3,client,"RESERVE: wrong number of arguments");
298331

@@ -317,11 +350,10 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
317350
minxid=max_of_xids(minxid,next_gxid);
318351
maxxid=max_of_xids(maxxid,minxid+minsize-1);
319352
CHECK(
320-
maxxid <=last_gxid,
353+
use_xid(maxxid),
321354
client,
322355
"not enough xids left in this term"
323356
);
324-
set_next_gxid(maxxid+1);
325357
}
326358
debug(
327359
"[%d] RESERVE: allocating range %u-%u\n",
@@ -371,13 +403,13 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
371403
transaction_clear(t);
372404
l2_list_link(&active_transactions,&t->elem);
373405

406+
t->xid=next_gxid;
374407
CHECK(
375-
next_gxid <=last_gxid,
408+
use_xid(next_gxid),
376409
client,
377410
"not enought xids left in this term"
378411
);
379-
prev_gxid=t->xid=next_gxid;
380-
set_next_gxid(next_gxid+1);
412+
prev_gxid=t->xid;
381413
t->snapshots_count=0;
382414
t->size=1;
383415

@@ -865,9 +897,16 @@ int main(int argc, char **argv) {
865897

866898
next_gxid=MIN_XID;
867899
clg=clog_open(datadir);
868-
set_next_gxid(clog_find_last_used(clg)+1);
900+
901+
xid_tlast_used_xid=clog_find_last_used(clg);
902+
shout("will use %u\n",last_used_xid);
903+
if (!use_xid(last_used_xid)) {
904+
shout("could not set last used xid to %u\n",last_used_xid);
905+
returnEXIT_FAILURE;
906+
}
907+
raft.term=xid2term(next_gxid);
908+
869909
prev_gxid=next_gxid-1;
870-
last_gxid=INVALID_XID;
871910
debug("initial next_gxid = %u\n",next_gxid);
872911
if (!clg) {
873912
shout("could not open clog at '%s'\n",datadir);
@@ -906,6 +945,7 @@ int main(int argc, char **argv) {
906945

907946
mstimer_tt;
908947
mstimer_reset(&t);
948+
intold_term=0;
909949
while (true) {
910950
intms=mstimer_reset(&t);
911951
raft_msg_t*m=NULL;
@@ -933,19 +973,16 @@ int main(int argc, char **argv) {
933973
server_set_enabled(server,raft.role==ROLE_LEADER);
934974

935975
// Update the gxid limits based on current term and leadership.
936-
xid_trecent_last_gxid=raft.term*XIDS_PER_TERM;
937-
if (last_gxid<recent_last_gxid) {
938-
shout("updating last_gxid from %u to %u\n",last_gxid,recent_last_gxid);
939-
last_gxid=recent_last_gxid;
940-
threshold_gxid=last_gxid-NEW_TERM_THRESHOLD;
976+
if (old_term<raft.term) {
941977
if (raft.role==ROLE_FOLLOWER) {
942978
// If we become a leader, we will use
943979
// the range of xids after the current
944980
// last_gxid.
945-
prev_gxid=last_gxid;
946-
next_gxid=prev_gxid+1;
981+
prev_gxid=last_xid_in_term();
982+
set_next_gxid(prev_gxid+1);
947983
shout("updated range to %u-%u\n",prev_gxid,next_gxid);
948984
}
985+
old_term=raft.term;
949986
}
950987
}else {
951988
server_set_enabled(server, true);

‎contrib/pg_dtm/dtmd/src/raft.c‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -565,9 +565,11 @@ static void raft_set_term(raft_t *r, int term) {
565565
r->votes=0;
566566
}
567567

568-
voidraft_start_next_term(raft_t*r) {
568+
voidraft_ensure_term(raft_t*r,intterm) {
569569
assert(r->role==ROLE_LEADER);
570-
r->term++;
570+
if (term>r->term) {
571+
r->term=term;
572+
}
571573
}
572574

573575
staticvoidraft_handle_claim(raft_t*r,raft_msg_claim_t*m) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp