Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9a76754

Browse files
committed
Implement DTM with local range reserving and common XIDs.
1 parent3b14aea commit9a76754

File tree

10 files changed

+442
-490
lines changed

10 files changed

+442
-490
lines changed

‎contrib/pg_xtm/README‎

Lines changed: 59 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -36,40 +36,25 @@ dtm_get_snapshot() RETURNS void
3636
libdtm api
3737
----------
3838

39-
typedef unsigned long long xid_t;
39+
void DtmInitSnapshot(Snapshot snapshot);
4040

41-
typedef int NodeId;
41+
// Starts new global transaction
42+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot shaposhot);
4243

43-
typedef struct {
44-
TransactionId* xids;
45-
NodeId* nodes;
46-
int nNodes;
47-
} GlobalTransactionId;
48-
49-
// Connects to the specified DTM.
50-
DTMConn DtmConnect(char *host, int port);
51-
52-
// Disconnects from the DTM. Do not use the 'dtm' pointer after this call, or
53-
// bad things will happen.
54-
void DtmDisconnect(DTMConn dtm);
55-
56-
// Creates an entry for a new global transaction. Returns 'true' on success, or
57-
// 'false' otherwise.
58-
bool DtmGlobalStartTransaction(DTMConn dtm, GlobalTransactionId* gtid);
59-
60-
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
61-
// otherwise.
62-
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
44+
// Get existed DTM snapshot.
45+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
6346

6447
// Commits transaction only once all participants have called this function,
65-
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
66-
// something failed on the daemon side.
67-
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
48+
// does not change CLOG otherwise.
49+
void DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
6850

6951
// Gets the status of the transaction identified by 'xid'. Returns the status
7052
// on success, or -1 otherwise. If 'wait' is true, then it does not return
7153
// until the transaction is finished.
72-
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
54+
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
55+
56+
// Reserve XIDs for local transaction
57+
TransactioinId DtmGlobalReserve(int nXids);
7358

7459
--------------------
7560
Backend-DTM Protocol
@@ -85,56 +70,63 @@ The queries from backend to DTM should be formatted according to this syntax.
8570

8671
The commands:
8772

88-
'b': begin(size, node0, xid0, node1, xid1, ...)
89-
Starts a global transaction using 'xid0' on 'node0', 'xid1' on 'node1'
90-
and so on. The 'size' is the number of nodes, so for example if 'size'
91-
== 3 there are 6 values expected after it.
92-
93-
The DTM replies with '+' if transaction started, or '-' if failed.
73+
'r': reserve(minxid, minsize)
74+
Claims a sequence ≥ minsize of xids ≥ minxid for local usage. This will
75+
prevent DTM from using those values for global transactions.
9476

95-
'c': commit(node, xid, wait)
96-
Tells the DTM to vote for commit of the global transaction identified
97-
by the given 'node:xid' pair.
77+
The DTM replies with:
78+
'+'<hex16 min><hex16 max> if reserved a range [min, max]
79+
'-' on failure
9880

99-
The DTM replies with '+' if committed, or '-' if aborted or failed.
81+
'b': begin(size)
82+
Starts a global transaction and assign a 'xid' to it. 'size' is used
83+
for vote results calculation. The DTM also creates and returns the
84+
snapshot.
85+
86+
The DTM replies with:
87+
'+'<hex16 xid><snapshot> if transaction started successfully
88+
'-' on failure
89+
90+
See the 'snapshot' command description for the snapshot format.
91+
92+
's': status(xid, wait)
93+
Asks the DTM about the status of the global transaction identified
94+
by the given 'xid'.
10095

10196
If 'wait' is true, DTM will not reply until it considers the
102-
transaction finished (all nodes committed, or at least one aborted).
97+
transaction finished (all nodes voted, or one dead).
98+
99+
The DTM replies with:
100+
"+0" if not started
101+
"+c" if committed
102+
"+a" if aborted
103+
"+?" if in progress
104+
'-' if failed
105+
106+
'y': for(xid, wait)
107+
Tells the DTM to vote for commit of the global transaction identified
108+
by the given 'xid'.
109+
110+
The reply and 'wait' logic is the same as for the 'status' command.
103111

104-
'a':abort(node, xid)
112+
'n':against(xid, wait)
105113
Tells the DTM to vote againts commit of the global transaction
106-
identified by the given 'node:xid' pair. This query not have the 'wait'
107-
parameter, because the DTM will not wait for all votes if one is
108-
against the commit.
109-
110-
The DTM replies with '+' if aborted, or '-' if failed. The backend
111-
probably should ignore this reply anyway :)
112-
113-
'h': snapshot(node, xid)
114-
Tells the DTM to give a snapshot for the global transaction identified
115-
by the given 'node:xid' pair. The DTM will create a snapshot for every
116-
participant, so when they ask for the snapshot it will reply with the
117-
"same" snapshot. When a node asks for a snapshot once again, the DTM
118-
generates a fresh version for every participant. So be careful not to
119-
ask for a snapshot from the same node the second time, until all other
120-
nodes also ask for that snapshot.
114+
identified by the given 'xid'.
121115

122-
TheDTM replies with '+' followed by a snapshot intheform:
116+
Thereply and 'wait' logic is the same as forthe'status' command.
123117

124-
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
118+
'h': snapshot(xid)
119+
Tells the DTM to generate a snapshot for the global transaction
120+
identified by the given 'xid'. The DTM will create a snapshot for every
121+
participant, so when each of them asks for the snapshot it will reply
122+
with the same snapshot. The DTM generates a fresh version if the same
123+
client asks for a snapshot again for the same transaction.
125124

126-
In case of a failure, the DTM replies with '-'.
125+
Joins the global transaction identified by the given 'xid', if not
126+
joined already.
127127

128-
's': status(node, xid, wait)
129-
Asks the DTM about the status of the global transaction identified
130-
by the given 'node:xid' pair.
128+
The DTM replies with '+' followed by a snapshot in the form:
131129

132-
The DTM replies with:
133-
"+0" if not started;
134-
"+c" if committed;
135-
"+a" if aborted;
136-
"+?" if in progress;
137-
'-' if failed.
130+
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
138131

139-
If 'wait' is true, DTM will not reply until it considers the
140-
transaction finished (all nodes committed, or at least one aborted).
132+
In case of a failure, the DTM replies with '-'.

‎contrib/pg_xtm/dtmd/include/limits.h‎

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,4 @@
77
#defineBITS_PER_NODE 4
88
#defineMAX_NODES (1 << BITS_PER_NODE)
99

10-
#defineMUX_XID(NODE,XID) (((XID) << (BITS_PER_NODE)) + NODE)
11-
1210
#endif

‎contrib/pg_xtm/dtmd/include/parser.h‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
#include"int.h"
77
#include"limits.h"
88

9+
#defineCMD_RESERVE 'r'
910
#defineCMD_BEGIN 'b'
10-
#defineCMD_COMMIT'c'
11-
#defineCMD_ABORT 'a'
11+
#defineCMD_FOR 'y'
12+
#defineCMD_AGAINST 'n'
1213
#defineCMD_SNAPSHOT 'h'
1314
#defineCMD_STATUS 's'
1415

‎contrib/pg_xtm/dtmd/include/snapshot.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ typedef struct Snapshot {
99
xid_txmax;
1010
intnactive;
1111
xid_tactive[MAX_TRANSACTIONS];
12+
inttimes_sent;
1213
}Snapshot;
1314

1415
char*snapshot_serialize(Snapshot*s);

‎contrib/pg_xtm/dtmd/include/transaction.h‎

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,29 @@
99

1010
#defineMAX_SNAPSHOTS_PER_TRANS 8
1111

12+
#defineCHAR_TO_INDEX(C) ((C) - 'a')
13+
1214
typedefstructTransaction {
13-
// true if the transaction was started on the node
14-
boolactive;
15+
xid_txid;
1516

16-
intclient_id;
17-
intnode;
18-
intvote;
17+
intsize;
1918

20-
xid_txid;
21-
Snapshotsnapshot[MAX_SNAPSHOTS_PER_TRANS];
19+
// for + against ≤ size
20+
intvotes_for;
21+
intvotes_against;
2222

23-
// if this is equal to seqno, we need to generate a new snapshot (for each node)
24-
intsnapshot_no;
25-
}Transaction;
23+
Snapshotsnapshots[MAX_SNAPSHOTS_PER_TRANS];
24+
intsnapshots_count;// will wrap around if exceeds max snapshots
2625

27-
#defineCHAR_TO_INDEX(C) ((C) - 'a')
28-
typedefstructGlobalTransaction {
29-
intn_snapshots;
30-
Transactionparticipants[MAX_NODES];
3126
void*listeners[CHAR_TO_INDEX('z')];// we are going to use 'a' to 'z' for indexing
32-
}GlobalTransaction;
27+
}Transaction;
3328

34-
intglobal_transaction_status(GlobalTransaction*gt);
35-
boolglobal_transaction_mark(clog_tclg,GlobalTransaction*gt,intstatus);
36-
voidglobal_transaction_clear(GlobalTransaction*gt);
37-
voidglobal_transaction_push_listener(GlobalTransaction*gt,charcmd,void*listener);
38-
void*global_transaction_pop_listener(GlobalTransaction*gt,charcmd);
29+
Snapshot*transaction_latest_snapshot(Transaction*t);
30+
Snapshot*transaction_snapshot(Transaction*t,intsnapno);
31+
inttransaction_status(Transaction*t);
32+
voidtransaction_clear(Transaction*t);
33+
voidtransaction_push_listener(Transaction*t,charcmd,void*listener);
34+
void*transaction_pop_listener(Transaction*t,charcmd);
35+
booltransaction_participate(Transaction*t,intclientid);
3936

4037
#endif

‎contrib/pg_xtm/dtmd/include/util.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
char*join_path(constchar*dir,constchar*file);
1616
boolinrange(xid_tmin,xid_tx,xid_tmax);
1717
intfalloc(intfd,off64_tsize);
18+
char*destructive_concat(char*a,char*b);
1819

1920
#ifndefDEBUG
2021
#defineshout(...)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp