Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit523a7bc

Browse files
committed
Fix snapshot merging
1 parentcfe0454 commit523a7bc

File tree

13 files changed

+188
-105
lines changed

13 files changed

+188
-105
lines changed

‎contrib/pg_gtm/dtmd/bin/dtmd‎

5.2 KB
Binary file not shown.

‎contrib/pg_gtm/tests/transfers.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
)
1010

1111
const (
12-
TRANSFER_CONNECTIONS=4
12+
TRANSFER_CONNECTIONS=8
1313
INIT_AMOUNT=10000
1414
N_ITERATIONS=10000
15-
N_ACCOUNTS=1//100000
15+
N_ACCOUNTS=8//100000
1616
)
1717

1818

‎contrib/pg_xtm/dtmd/src/main.c‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,14 +316,17 @@ static char *onabort(void *stream, void *clientdata, cmd_t *cmd) {
316316
staticvoidgen_snapshot(Snapshot*s,intnode) {
317317
s->nactive=0;
318318
s->xmin=xmax[node];
319-
s->xmax=s->xmin+1;
319+
s->xmax=0;
320320
inti;
321321
for (i=0;i<transactions_count;i++) {
322322
Transaction*t=transactions[i].participants+node;
323323
if (t->active) {
324324
if (t->xid<s->xmin) {
325325
s->xmin=t->xid;
326326
}
327+
if (t->xid >=s->xmax) {
328+
s->xmax=t->xid+1;
329+
}
327330
s->active[s->nactive++]=t->xid;
328331
}
329332
}

‎contrib/pg_xtm/pg_dtm--1.0.sql‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,11 @@
44
CREATEFUNCTIONdtm_begin_transaction(nodesinteger[], xidsinteger[]) RETURNS void
55
AS'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
7+
8+
CREATEFUNCTIONdtm_get_current_snapshot_xmin() RETURNSbigint
9+
AS'MODULE_PATHNAME','dtm_get_current_snapshot_xmin'
10+
LANGUAGE C;
11+
12+
CREATEFUNCTIONdtm_get_current_snapshot_xmax() RETURNSbigint
13+
AS'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
14+
LANGUAGE C;

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 128 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include"utils/tqual.h"
3131
#include"utils/array.h"
3232
#include"utils/builtins.h"
33+
#include"utils/memutils.h"
3334

3435
#include"libdtm.h"
3536

@@ -41,41 +42,44 @@ typedef struct
4142

4243
#defineDTM_SHMEM_SIZE (1024*1024)
4344
#defineDTM_HASH_SIZE 1003
45+
#defineXTM_CONNECT_ATTEMPTS 10
46+
4447

4548
void_PG_init(void);
4649
void_PG_fini(void);
4750

4851
staticvoidDtmEnsureConnection(void);
49-
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
50-
staticvoidDtmCopySnapshot(Snapshotdst,Snapshotsrc);
52+
staticSnapshotDtmGetSnapshot(void);
53+
staticvoidDtmMergeSnapshots(Snapshotdst,Snapshotsrc);
54+
staticSnapshotDtmCopySnapshot(Snapshotsnapshot);
5155
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
5256
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
5357
staticvoidDtmUpdateRecentXmin(void);
5458
staticvoidDtmInitialize();
5559
staticvoidDtmXactCallback(XactEventevent,void*arg);
5660

57-
staticboolTransactionIdIsInDtmSnapshot(Snapshots,TransactionIdxid);
58-
staticboolTransactionIdIsInDoubt(Snapshots,TransactionIdxid);
61+
staticboolTransactionIdIsInDtmSnapshot(TransactionIdxid);
62+
staticboolTransactionIdIsInDoubt(TransactionIdxid);
5963

6064
staticvoiddtm_shmem_startup(void);
6165

6266
staticshmem_startup_hook_typeprev_shmem_startup_hook;
6367
staticHTAB*xid_in_doubt;
6468
staticDtmState*dtm;
6569
staticTransactionIdDtmCurrentXid=InvalidTransactionId;
70+
staticSnapshotCurrentTransactionSnapshot;
6671

6772
staticNodeIdDtmNodeId;
6873
staticDTMConnDtmConn;
6974
staticSnapshotDataDtmSnapshot= {HeapTupleSatisfiesMVCC };
70-
staticboolDtmHasSnapshot=false;
75+
staticSnapshotDataDtmLocalSnapshot={HeapTupleSatisfiesMVCC };
7176
staticboolDtmGlobalTransaction= false;
72-
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot };
77+
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmCopySnapshot };
7378
staticDTMConnDtmConn;
7479

7580
#defineXTM_TRACE(fmt, ...)
76-
//#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
7781
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
78-
#defineXTM_CONNECT_ATTEMPTS 10
82+
//#defineXTM_INFO(fmt, ...)
7983

8084
staticvoidDtmEnsureConnection(void)
8185
{
@@ -113,20 +117,24 @@ static void DumpSnapshot(Snapshot s, char *name)
113117
XTM_INFO("%s\n",buf);
114118
}
115119

116-
staticboolTransactionIdIsInDtmSnapshot(Snapshots,TransactionIdxid)
120+
staticboolTransactionIdIsInDtmSnapshot(TransactionIdxid)
117121
{
118-
returnxid >=s->xmax
119-
||bsearch(&xid,s->xip,s->xcnt,sizeof(TransactionId),xidComparator)!=NULL;
122+
returnxid >=DtmSnapshot.xmax
123+
||bsearch(&xid,DtmSnapshot.xip,DtmSnapshot.xcnt,sizeof(TransactionId),xidComparator)!=NULL;
120124
}
121125

122-
staticboolTransactionIdIsInDoubt(Snapshots,TransactionIdxid)
126+
staticboolTransactionIdIsInDoubt(TransactionIdxid)
123127
{
124128
boolinDoubt;
125129

126-
if (!TransactionIdIsInDtmSnapshot(s,xid)) {
130+
if (!TransactionIdIsInDtmSnapshot(xid)) {
127131
LWLockAcquire(dtm->lock,LW_SHARED);
128132
inDoubt=hash_search(xid_in_doubt,&xid,HASH_FIND,NULL)!=NULL;
129133
LWLockRelease(dtm->lock);
134+
if (!inDoubt) {
135+
XLogRecPtrlsn;
136+
inDoubt=CLOGTransactionIdGetStatus(xid,&lsn)!=TRANSACTION_STATUS_IN_PROGRESS;
137+
}
130138
if (inDoubt) {
131139
XTM_INFO("Wait for transaction %d to complete\n",xid);
132140
XactLockTableWait(xid,NULL,NULL,XLTW_None);
@@ -136,50 +144,47 @@ static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
136144
return false;
137145
}
138146

139-
staticvoidDtmCopySnapshot(Snapshotdst,Snapshotsrc)
147+
staticvoidDtmMergeSnapshots(Snapshotdst,Snapshotsrc)
140148
{
141149
inti,j,n;
142-
staticTransactionId*buf;
143150
TransactionIdxid;
144-
145-
if (buf==NULL) {
146-
buf= (TransactionId*)malloc(GetMaxSnapshotXidCount()*sizeof(TransactionId)*2);
147-
}
148-
149-
DumpSnapshot(dst,"local");
150-
DumpSnapshot(src,"DTM");
151+
Snapshotlocal;
151152

152153
Assert(TransactionIdIsValid(src->xmin)&&TransactionIdIsValid(src->xmax));
153154

154-
/* Check that globall competed transactions are not included in local snapshot */
155-
RefreshLocalSnapshot:
156-
GetLocalSnapshotData(dst);
157-
for (i=0;i<dst->xcnt;i++) {
158-
if (TransactionIdIsInDoubt(src,dst->xip[i])) {
159-
gotoRefreshLocalSnapshot;
160-
}
155+
GetLocalSnapshot:
156+
local=GetSnapshotData(&DtmLocalSnapshot);
157+
for (i=0;i<local->xcnt;i++) {
158+
if (TransactionIdIsInDoubt(local->xip[i])) {
159+
gotoGetLocalSnapshot;
160+
}
161161
}
162-
for (xid=dst->xmax;xid<src->xmax;xid++) {
163-
if (TransactionIdIsInDoubt(src,xid)) {
164-
gotoRefreshLocalSnapshot;
162+
for (xid=local->xmax;xid<src->xmax;xid++) {
163+
if (TransactionIdIsInDoubt(xid)) {
164+
gotoGetLocalSnapshot;
165165
}
166166
}
167+
DumpSnapshot(local,"local");
168+
DumpSnapshot(src,"DTM");
167169

168170
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
169-
if (dst->xmin>src->xmin) {
170-
dst->xmin=src->xmin;
171-
}
172-
if (dst->xmax>src->xmax) {
173-
dst->xmax=src->xmax;
171+
dst->xmin=local->xmin<src->xmin ?local->xmin :src->xmin;
172+
dst->xmax=local->xmax<src->xmax ?local->xmax :src->xmax;
173+
174+
n=local->xcnt;
175+
for (xid=local->xmax;xid <=src->xmin;xid++) {
176+
local->xip[n++]=xid;
174177
}
178+
memcpy(local->xip+n,src->xip,src->xcnt*sizeof(TransactionId));
179+
n+=src->xcnt;
180+
Assert(n <=GetMaxSnapshotXidCount());
175181

176-
memcpy(buf,dst->xip,dst->xcnt*sizeof(TransactionId));
177-
memcpy(buf+dst->xcnt,src->xip,src->xcnt*sizeof(TransactionId));
178-
qsort(buf,dst->xcnt+src->xcnt,sizeof(TransactionId),xidComparator);
182+
qsort(local->xip,n,sizeof(TransactionId),xidComparator);
179183
xid=InvalidTransactionId;
180-
for (i=0,j=0,n=dst->xcnt+src->xcnt;i<n&&buf[i]<dst->xmax;i++) {
181-
if (buf[i]!=xid) {
182-
dst->xip[j++]=xid=buf[i];
184+
185+
for (i=0,j=0;i<n&&local->xip[i]<dst->xmax;i++) {
186+
if (local->xip[i]!=xid) {
187+
dst->xip[j++]=xid=local->xip[i];
183188
}
184189
}
185190
dst->xcnt=j;
@@ -203,28 +208,50 @@ static void DtmUpdateRecentXmin(void)
203208
if (RecentGlobalXmin>xmin) {
204209
RecentGlobalXmin=xmin;
205210
}
206-
RecentXmin=xmin;
211+
if (RecentXmin>xmin) {
212+
RecentXmin=xmin;
213+
}
207214
}
208215
}
209216

210-
staticSnapshotDtmGetSnapshot(Snapshotsnapshot)
217+
staticSnapshotDtmCopySnapshot(Snapshotsnapshot)
211218
{
212-
if (!IsMVCCSnapshot(snapshot)||snapshot==&CatalogSnapshotData) {
213-
snapshot=GetLocalSnapshotData(snapshot);
214-
}else {
215-
XTM_TRACE("XTM: DtmGetSnapshot \n");
216-
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
217-
DtmHasSnapshot= true;
218-
DtmEnsureConnection();
219-
DtmGlobalGetSnapshot(DtmConn,DtmNodeId,GetCurrentTransactionId(),&DtmSnapshot);
220-
}
221-
snapshot=GetLocalSnapshotData(snapshot);
222-
if (DtmHasSnapshot) {
223-
DtmCopySnapshot(snapshot,&DtmSnapshot);
224-
DtmUpdateRecentXmin();
225-
}
219+
Snapshotnewsnap;
220+
Sizesize=sizeof(SnapshotData)+GetMaxSnapshotXidCount()*sizeof(TransactionId);
221+
Sizesubxipoff=size;
222+
if (snapshot->subxcnt>0) {
223+
size+=snapshot->subxcnt*sizeof(TransactionId);
226224
}
227-
returnsnapshot;
225+
newsnap= (Snapshot)MemoryContextAlloc(TopTransactionContext,size);
226+
memcpy(newsnap,snapshot,sizeof(SnapshotData));
227+
228+
newsnap->regd_count=0;
229+
newsnap->active_count=0;
230+
newsnap->copied= true;
231+
232+
newsnap->xip= (TransactionId*) (newsnap+1);
233+
if (snapshot->xcnt>0)
234+
{
235+
memcpy(newsnap->xip,snapshot->xip,snapshot->xcnt*sizeof(TransactionId));
236+
}
237+
if (snapshot->subxcnt>0&&
238+
(!snapshot->suboverflowed||snapshot->takenDuringRecovery))
239+
{
240+
newsnap->subxip= (TransactionId*) ((char*)newsnap+subxipoff);
241+
memcpy(newsnap->subxip,snapshot->subxip,
242+
snapshot->subxcnt*sizeof(TransactionId));
243+
}
244+
else
245+
newsnap->subxip=NULL;
246+
247+
returnnewsnap;
248+
}
249+
250+
251+
staticSnapshotDtmGetSnapshot()
252+
{
253+
CurrentTransactionSnapshot=GetLocalTransactionSnapshot();
254+
returnCurrentTransactionSnapshot;
228255
}
229256

230257

@@ -243,22 +270,28 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243270
if (DtmGlobalTransaction) {
244271
/* Already should be IN_PROGRESS */
245272
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
246-
DtmHasSnapshot= false;
247273
DtmGlobalTransaction= false;
248-
DtmEnsureConnection();
249-
XTM_INFO("Begin commit transaction %d\n",xid);
250-
251-
DtmCurrentXid=xid;
252-
LWLockAcquire(dtm->lock,LW_EXCLUSIVE);
253-
hash_search(xid_in_doubt,&DtmCurrentXid,HASH_ENTER,NULL);
254-
LWLockRelease(dtm->lock);
255-
256-
if (!DtmGlobalSetTransStatus(DtmConn,DtmNodeId,xid,status, true)&&status!=TRANSACTION_STATUS_ABORTED) {
257-
elog(ERROR,"DTMD failed to set transaction status");
274+
CurrentTransactionSnapshot=NULL;
275+
if (status==TRANSACTION_STATUS_ABORTED) {
276+
CLOGTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
277+
DtmEnsureConnection();
278+
DtmGlobalSetTransStatus(DtmConn,DtmNodeId,xid,status, false);
279+
XTM_INFO("Abort transaction %d\n",xid);
280+
return;
281+
}else {
282+
DtmEnsureConnection();
283+
XTM_INFO("Begin commit transaction %d\n",xid);
284+
DtmCurrentXid=xid;
285+
LWLockAcquire(dtm->lock,LW_EXCLUSIVE);
286+
hash_search(xid_in_doubt,&DtmCurrentXid,HASH_ENTER,NULL);
287+
LWLockRelease(dtm->lock);
288+
if (!DtmGlobalSetTransStatus(DtmConn,DtmNodeId,xid,status, true)) {
289+
elog(ERROR,"DTMD failed to set transaction status");
290+
}
291+
XTM_INFO("Commit transaction %d\n",xid);
258292
}
259-
XTM_INFO("Commit transaction %d\n",xid);
260293
}else {
261-
elog(WARNING,"Set transaction %u status in local CLOG" ,xid);
294+
XTM_INFO("Set transaction %u status in local CLOG" ,xid);
262295
}
263296
}else {
264297
XidStatusgs;
@@ -304,6 +337,7 @@ static void DtmInitialize()
304337
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE);
305338

306339
RegisterXactCallback(DtmXactCallback,NULL);
340+
DtmInitSnapshot(&DtmLocalSnapshot);
307341

308342
TM=&DtmTM;
309343
}
@@ -390,22 +424,41 @@ static void dtm_shmem_startup(void)
390424
PG_MODULE_MAGIC;
391425

392426
PG_FUNCTION_INFO_V1(dtm_begin_transaction);
427+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
428+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
393429

394430
Datum
395431
dtm_begin_transaction(PG_FUNCTION_ARGS)
396432
{
397433
GlobalTransactionIdgtid;
398434
ArrayType*nodes=PG_GETARG_ARRAYTYPE_P(0);
399-
ArrayType*xids=PG_GETARG_ARRAYTYPE_P(1);
435+
ArrayType*xids=PG_GETARG_ARRAYTYPE_P(1);
400436
gtid.xids= (TransactionId*)ARR_DATA_PTR(xids);
401437
gtid.nodes= (NodeId*)ARR_DATA_PTR(nodes);
402438
gtid.nNodes=ArrayGetNItems(ARR_NDIM(nodes),ARR_DIMS(nodes));
403439
DtmGlobalTransaction= true;
440+
Assert(gtid.xids[DtmNodeId]==GetCurrentTransactionId());
404441
XTM_INFO("Start transaction {%d,%d} at node %d\n",gtid.xids[0],gtid.xids[1],DtmNodeId);
405-
XTM_TRACE("XTM: dtm_begin_transaction \n");
406442
if (DtmNodeId==gtid.nodes[0]) {
407443
DtmEnsureConnection();
408444
DtmGlobalStartTransaction(DtmConn,&gtid);
409445
}
446+
DtmEnsureConnection();
447+
DtmGlobalGetSnapshot(DtmConn,DtmNodeId,gtid.xids[DtmNodeId],&DtmSnapshot);
448+
Assert(CurrentTransactionSnapshot!=NULL);
449+
DtmMergeSnapshots(CurrentTransactionSnapshot,&DtmSnapshot);
450+
DtmUpdateRecentXmin();
410451
PG_RETURN_VOID();
411452
}
453+
454+
Datum
455+
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
456+
{
457+
PG_RETURN_INT64(CurrentTransactionSnapshot->xmin);
458+
}
459+
460+
Datum
461+
dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
462+
{
463+
PG_RETURN_INT64(CurrentTransactionSnapshot->xmax);
464+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp