30
30
#include "utils/tqual.h"
31
31
#include "utils/array.h"
32
32
#include "utils/builtins.h"
33
+ #include "utils/memutils.h"
33
34
34
35
#include "libdtm.h"
35
36
@@ -41,41 +42,44 @@ typedef struct
41
42
42
43
#define DTM_SHMEM_SIZE (1024*1024)
43
44
#define DTM_HASH_SIZE 1003
45
+ #define XTM_CONNECT_ATTEMPTS 10
46
+
44
47
45
48
void _PG_init (void );
46
49
void _PG_fini (void );
47
50
48
51
static void DtmEnsureConnection (void );
49
- static Snapshot DtmGetSnapshot (Snapshot snapshot );
50
- static void DtmCopySnapshot (Snapshot dst ,Snapshot src );
52
+ static Snapshot DtmGetSnapshot (void );
53
+ static void DtmMergeSnapshots (Snapshot dst ,Snapshot src );
54
+ static Snapshot DtmCopySnapshot (Snapshot snapshot );
51
55
static XidStatus DtmGetTransactionStatus (TransactionId xid ,XLogRecPtr * lsn );
52
56
static void DtmSetTransactionStatus (TransactionId xid ,int nsubxids ,TransactionId * subxids ,XidStatus status ,XLogRecPtr lsn );
53
57
static void DtmUpdateRecentXmin (void );
54
58
static void DtmInitialize ();
55
59
static void DtmXactCallback (XactEvent event ,void * arg );
56
60
57
- static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid );
58
- static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid );
61
+ static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
62
+ static bool TransactionIdIsInDoubt (TransactionId xid );
59
63
60
64
static void dtm_shmem_startup (void );
61
65
62
66
static shmem_startup_hook_type prev_shmem_startup_hook ;
63
67
static HTAB * xid_in_doubt ;
64
68
static DtmState * dtm ;
65
69
static TransactionId DtmCurrentXid = InvalidTransactionId ;
70
+ static Snapshot CurrentTransactionSnapshot ;
66
71
67
72
static NodeId DtmNodeId ;
68
73
static DTMConn DtmConn ;
69
74
static SnapshotData DtmSnapshot = {HeapTupleSatisfiesMVCC };
70
- static bool DtmHasSnapshot = false ;
75
+ static SnapshotData DtmLocalSnapshot = { HeapTupleSatisfiesMVCC } ;
71
76
static bool DtmGlobalTransaction = false;
72
- static TransactionManager DtmTM = {DtmGetTransactionStatus ,DtmSetTransactionStatus ,DtmGetSnapshot };
77
+ static TransactionManager DtmTM = {DtmGetTransactionStatus ,DtmSetTransactionStatus ,DtmGetSnapshot , DtmCopySnapshot };
73
78
static DTMConn DtmConn ;
74
79
75
80
#define XTM_TRACE (fmt , ...)
76
- //#define XTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
77
81
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
78
- #define XTM_CONNECT_ATTEMPTS 10
82
+ // #defineXTM_INFO(fmt, ...)
79
83
80
84
static void DtmEnsureConnection (void )
81
85
{
@@ -113,20 +117,24 @@ static void DumpSnapshot(Snapshot s, char *name)
113
117
XTM_INFO ("%s\n" ,buf );
114
118
}
115
119
116
- static bool TransactionIdIsInDtmSnapshot (Snapshot s , TransactionId xid )
120
+ static bool TransactionIdIsInDtmSnapshot (TransactionId xid )
117
121
{
118
- return xid >=s -> xmax
119
- || bsearch (& xid ,s -> xip ,s -> xcnt ,sizeof (TransactionId ),xidComparator )!= NULL ;
122
+ return xid >=DtmSnapshot . xmax
123
+ || bsearch (& xid ,DtmSnapshot . xip ,DtmSnapshot . xcnt ,sizeof (TransactionId ),xidComparator )!= NULL ;
120
124
}
121
125
122
- static bool TransactionIdIsInDoubt (Snapshot s , TransactionId xid )
126
+ static bool TransactionIdIsInDoubt (TransactionId xid )
123
127
{
124
128
bool inDoubt ;
125
129
126
- if (!TransactionIdIsInDtmSnapshot (s , xid )) {
130
+ if (!TransactionIdIsInDtmSnapshot (xid )) {
127
131
LWLockAcquire (dtm -> lock ,LW_SHARED );
128
132
inDoubt = hash_search (xid_in_doubt ,& xid ,HASH_FIND ,NULL )!= NULL ;
129
133
LWLockRelease (dtm -> lock );
134
+ if (!inDoubt ) {
135
+ XLogRecPtr lsn ;
136
+ inDoubt = CLOGTransactionIdGetStatus (xid ,& lsn )!= TRANSACTION_STATUS_IN_PROGRESS ;
137
+ }
130
138
if (inDoubt ) {
131
139
XTM_INFO ("Wait for transaction %d to complete\n" ,xid );
132
140
XactLockTableWait (xid ,NULL ,NULL ,XLTW_None );
@@ -136,50 +144,47 @@ static bool TransactionIdIsInDoubt(Snapshot s, TransactionId xid)
136
144
return false;
137
145
}
138
146
139
- static void DtmCopySnapshot (Snapshot dst ,Snapshot src )
147
+ static void DtmMergeSnapshots (Snapshot dst ,Snapshot src )
140
148
{
141
149
int i ,j ,n ;
142
- static TransactionId * buf ;
143
150
TransactionId xid ;
144
-
145
- if (buf == NULL ) {
146
- buf = (TransactionId * )malloc (GetMaxSnapshotXidCount ()* sizeof (TransactionId )* 2 );
147
- }
148
-
149
- DumpSnapshot (dst ,"local" );
150
- DumpSnapshot (src ,"DTM" );
151
+ Snapshot local ;
151
152
152
153
Assert (TransactionIdIsValid (src -> xmin )&& TransactionIdIsValid (src -> xmax ));
153
154
154
- /* Check that globall competed transactions are not included in local snapshot */
155
- RefreshLocalSnapshot :
156
- GetLocalSnapshotData (dst );
157
- for (i = 0 ;i < dst -> xcnt ;i ++ ) {
158
- if (TransactionIdIsInDoubt (src ,dst -> xip [i ])) {
159
- gotoRefreshLocalSnapshot ;
160
- }
155
+ GetLocalSnapshot :
156
+ local = GetSnapshotData (& DtmLocalSnapshot );
157
+ for (i = 0 ;i < local -> xcnt ;i ++ ) {
158
+ if (TransactionIdIsInDoubt (local -> xip [i ])) {
159
+ gotoGetLocalSnapshot ;
160
+ }
161
161
}
162
- for (xid = dst -> xmax ;xid < src -> xmax ;xid ++ ) {
163
- if (TransactionIdIsInDoubt (src , xid )) {
164
- gotoRefreshLocalSnapshot ;
162
+ for (xid = local -> xmax ;xid < src -> xmax ;xid ++ ) {
163
+ if (TransactionIdIsInDoubt (xid )) {
164
+ gotoGetLocalSnapshot ;
165
165
}
166
166
}
167
+ DumpSnapshot (local ,"local" );
168
+ DumpSnapshot (src ,"DTM" );
167
169
168
170
/* Merge two snapshots: produce most restrictive snapshots whihc includes running transactions from both of them */
169
- if (dst -> xmin > src -> xmin ) {
170
- dst -> xmin = src -> xmin ;
171
- }
172
- if (dst -> xmax > src -> xmax ) {
173
- dst -> xmax = src -> xmax ;
171
+ dst -> xmin = local -> xmin < src -> xmin ?local -> xmin :src -> xmin ;
172
+ dst -> xmax = local -> xmax < src -> xmax ?local -> xmax :src -> xmax ;
173
+
174
+ n = local -> xcnt ;
175
+ for (xid = local -> xmax ;xid <=src -> xmin ;xid ++ ) {
176
+ local -> xip [n ++ ]= xid ;
174
177
}
178
+ memcpy (local -> xip + n ,src -> xip ,src -> xcnt * sizeof (TransactionId ));
179
+ n += src -> xcnt ;
180
+ Assert (n <=GetMaxSnapshotXidCount ());
175
181
176
- memcpy (buf ,dst -> xip ,dst -> xcnt * sizeof (TransactionId ));
177
- memcpy (buf + dst -> xcnt ,src -> xip ,src -> xcnt * sizeof (TransactionId ));
178
- qsort (buf ,dst -> xcnt + src -> xcnt ,sizeof (TransactionId ),xidComparator );
182
+ qsort (local -> xip ,n ,sizeof (TransactionId ),xidComparator );
179
183
xid = InvalidTransactionId ;
180
- for (i = 0 ,j = 0 ,n = dst -> xcnt + src -> xcnt ;i < n && buf [i ]< dst -> xmax ;i ++ ) {
181
- if (buf [i ]!= xid ) {
182
- dst -> xip [j ++ ]= xid = buf [i ];
184
+
185
+ for (i = 0 ,j = 0 ;i < n && local -> xip [i ]< dst -> xmax ;i ++ ) {
186
+ if (local -> xip [i ]!= xid ) {
187
+ dst -> xip [j ++ ]= xid = local -> xip [i ];
183
188
}
184
189
}
185
190
dst -> xcnt = j ;
@@ -203,28 +208,50 @@ static void DtmUpdateRecentXmin(void)
203
208
if (RecentGlobalXmin > xmin ) {
204
209
RecentGlobalXmin = xmin ;
205
210
}
206
- RecentXmin = xmin ;
211
+ if (RecentXmin > xmin ) {
212
+ RecentXmin = xmin ;
213
+ }
207
214
}
208
215
}
209
216
210
- static Snapshot DtmGetSnapshot (Snapshot snapshot )
217
+ static Snapshot DtmCopySnapshot (Snapshot snapshot )
211
218
{
212
- if (!IsMVCCSnapshot (snapshot )|| snapshot == & CatalogSnapshotData ) {
213
- snapshot = GetLocalSnapshotData (snapshot );
214
- }else {
215
- XTM_TRACE ("XTM: DtmGetSnapshot \n" );
216
- if (DtmGlobalTransaction /* && !DtmHasSnapshot*/ ) {
217
- DtmHasSnapshot = true;
218
- DtmEnsureConnection ();
219
- DtmGlobalGetSnapshot (DtmConn ,DtmNodeId ,GetCurrentTransactionId (),& DtmSnapshot );
220
- }
221
- snapshot = GetLocalSnapshotData (snapshot );
222
- if (DtmHasSnapshot ) {
223
- DtmCopySnapshot (snapshot ,& DtmSnapshot );
224
- DtmUpdateRecentXmin ();
225
- }
219
+ Snapshot newsnap ;
220
+ Size size = sizeof (SnapshotData )+ GetMaxSnapshotXidCount ()* sizeof (TransactionId );
221
+ Size subxipoff = size ;
222
+ if (snapshot -> subxcnt > 0 ) {
223
+ size += snapshot -> subxcnt * sizeof (TransactionId );
226
224
}
227
- return snapshot ;
225
+ newsnap = (Snapshot )MemoryContextAlloc (TopTransactionContext ,size );
226
+ memcpy (newsnap ,snapshot ,sizeof (SnapshotData ));
227
+
228
+ newsnap -> regd_count = 0 ;
229
+ newsnap -> active_count = 0 ;
230
+ newsnap -> copied = true;
231
+
232
+ newsnap -> xip = (TransactionId * ) (newsnap + 1 );
233
+ if (snapshot -> xcnt > 0 )
234
+ {
235
+ memcpy (newsnap -> xip ,snapshot -> xip ,snapshot -> xcnt * sizeof (TransactionId ));
236
+ }
237
+ if (snapshot -> subxcnt > 0 &&
238
+ (!snapshot -> suboverflowed || snapshot -> takenDuringRecovery ))
239
+ {
240
+ newsnap -> subxip = (TransactionId * ) ((char * )newsnap + subxipoff );
241
+ memcpy (newsnap -> subxip ,snapshot -> subxip ,
242
+ snapshot -> subxcnt * sizeof (TransactionId ));
243
+ }
244
+ else
245
+ newsnap -> subxip = NULL ;
246
+
247
+ return newsnap ;
248
+ }
249
+
250
+
251
+ static Snapshot DtmGetSnapshot ()
252
+ {
253
+ CurrentTransactionSnapshot = GetLocalTransactionSnapshot ();
254
+ return CurrentTransactionSnapshot ;
228
255
}
229
256
230
257
@@ -243,22 +270,28 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243
270
if (DtmGlobalTransaction ) {
244
271
/* Already should be IN_PROGRESS */
245
272
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
246
- DtmHasSnapshot = false;
247
273
DtmGlobalTransaction = false;
248
- DtmEnsureConnection ();
249
- XTM_INFO ("Begin commit transaction %d\n" ,xid );
250
-
251
- DtmCurrentXid = xid ;
252
- LWLockAcquire (dtm -> lock ,LW_EXCLUSIVE );
253
- hash_search (xid_in_doubt ,& DtmCurrentXid ,HASH_ENTER ,NULL );
254
- LWLockRelease (dtm -> lock );
255
-
256
- if (!DtmGlobalSetTransStatus (DtmConn ,DtmNodeId ,xid ,status , true)&& status != TRANSACTION_STATUS_ABORTED ) {
257
- elog (ERROR ,"DTMD failed to set transaction status" );
274
+ CurrentTransactionSnapshot = NULL ;
275
+ if (status == TRANSACTION_STATUS_ABORTED ) {
276
+ CLOGTransactionIdSetTreeStatus (xid ,nsubxids ,subxids ,status ,lsn );
277
+ DtmEnsureConnection ();
278
+ DtmGlobalSetTransStatus (DtmConn ,DtmNodeId ,xid ,status , false);
279
+ XTM_INFO ("Abort transaction %d\n" ,xid );
280
+ return ;
281
+ }else {
282
+ DtmEnsureConnection ();
283
+ XTM_INFO ("Begin commit transaction %d\n" ,xid );
284
+ DtmCurrentXid = xid ;
285
+ LWLockAcquire (dtm -> lock ,LW_EXCLUSIVE );
286
+ hash_search (xid_in_doubt ,& DtmCurrentXid ,HASH_ENTER ,NULL );
287
+ LWLockRelease (dtm -> lock );
288
+ if (!DtmGlobalSetTransStatus (DtmConn ,DtmNodeId ,xid ,status , true)) {
289
+ elog (ERROR ,"DTMD failed to set transaction status" );
290
+ }
291
+ XTM_INFO ("Commit transaction %d\n" ,xid );
258
292
}
259
- XTM_INFO ("Commit transaction %d\n" ,xid );
260
293
}else {
261
- elog ( WARNING , "Set transaction %u status in local CLOG" ,xid );
294
+ XTM_INFO ( "Set transaction %u status in local CLOG" ,xid );
262
295
}
263
296
}else {
264
297
XidStatus gs ;
@@ -304,6 +337,7 @@ static void DtmInitialize()
304
337
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE );
305
338
306
339
RegisterXactCallback (DtmXactCallback ,NULL );
340
+ DtmInitSnapshot (& DtmLocalSnapshot );
307
341
308
342
TM = & DtmTM ;
309
343
}
@@ -390,22 +424,41 @@ static void dtm_shmem_startup(void)
390
424
PG_MODULE_MAGIC ;
391
425
392
426
PG_FUNCTION_INFO_V1 (dtm_begin_transaction );
427
+ PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmax );
428
+ PG_FUNCTION_INFO_V1 (dtm_get_current_snapshot_xmin );
393
429
394
430
Datum
395
431
dtm_begin_transaction (PG_FUNCTION_ARGS )
396
432
{
397
433
GlobalTransactionId gtid ;
398
434
ArrayType * nodes = PG_GETARG_ARRAYTYPE_P (0 );
399
- ArrayType * xids = PG_GETARG_ARRAYTYPE_P (1 );
435
+ ArrayType * xids = PG_GETARG_ARRAYTYPE_P (1 );
400
436
gtid .xids = (TransactionId * )ARR_DATA_PTR (xids );
401
437
gtid .nodes = (NodeId * )ARR_DATA_PTR (nodes );
402
438
gtid .nNodes = ArrayGetNItems (ARR_NDIM (nodes ),ARR_DIMS (nodes ));
403
439
DtmGlobalTransaction = true;
440
+ Assert (gtid .xids [DtmNodeId ]== GetCurrentTransactionId ());
404
441
XTM_INFO ("Start transaction {%d,%d} at node %d\n" ,gtid .xids [0 ],gtid .xids [1 ],DtmNodeId );
405
- XTM_TRACE ("XTM: dtm_begin_transaction \n" );
406
442
if (DtmNodeId == gtid .nodes [0 ]) {
407
443
DtmEnsureConnection ();
408
444
DtmGlobalStartTransaction (DtmConn ,& gtid );
409
445
}
446
+ DtmEnsureConnection ();
447
+ DtmGlobalGetSnapshot (DtmConn ,DtmNodeId ,gtid .xids [DtmNodeId ],& DtmSnapshot );
448
+ Assert (CurrentTransactionSnapshot != NULL );
449
+ DtmMergeSnapshots (CurrentTransactionSnapshot ,& DtmSnapshot );
450
+ DtmUpdateRecentXmin ();
410
451
PG_RETURN_VOID ();
411
452
}
453
+
454
+ Datum
455
+ dtm_get_current_snapshot_xmin (PG_FUNCTION_ARGS )
456
+ {
457
+ PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmin );
458
+ }
459
+
460
+ Datum
461
+ dtm_get_current_snapshot_xmax (PG_FUNCTION_ARGS )
462
+ {
463
+ PG_RETURN_INT64 (CurrentTransactionSnapshot -> xmax );
464
+ }