Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8a7dfb1

Browse files
committed
Use hash table for checking in-doubt transactions
1 parent0a03a1b commit8a7dfb1

File tree

3 files changed

+118
-15
lines changed

3 files changed

+118
-15
lines changed

‎contrib/pg_xtm/libdtm.c‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,12 @@ DTMConn DtmConnect(char *host, int port) {
9191

9292
for (a=addrs;a!=NULL;a=a->ai_next) {
9393
DTMConndtm;
94+
intone=1;
9495
intsock=socket(a->ai_family,a->ai_socktype,a->ai_protocol);
9596
if (sock==-1) {
9697
perror("failed to create a socket");
9798
continue;
9899
}
99-
100-
intone=1;
101100
setsockopt(sock,IPPROTO_TCP,TCP_NODELAY,&one,sizeof(one));
102101

103102
if (connect(sock,a->ai_addr,a->ai_addrlen)==-1) {

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,14 @@
3333

3434
#include"libdtm.h"
3535

36-
#defineMIN_DELAY 10000
37-
#defineMAX_DELAY 100000
36+
typedefstruct
37+
{
38+
LWLockIdlock;/* protect access to hash table */
39+
}DtmState;
40+
41+
42+
#defineDTM_SHMEM_SIZE (1024*1024)
43+
#defineDTM_HASH_SIZE 1003
3844

3945
void_PG_init(void);
4046
void_PG_fini(void);
@@ -45,9 +51,19 @@ static void DtmCopySnapshot(Snapshot dst, Snapshot src);
4551
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
4652
staticvoidDtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn);
4753
staticvoidDtmUpdateRecentXmin(void);
54+
staticvoidDtmInitialize();
55+
staticvoidDtmXactCallback(XactEventevent,void*arg);
56+
4857
staticboolTransactionIdIsInDtmSnapshot(Snapshots,TransactionIdxid);
4958
staticboolTransactionIdIsInDoubt(Snapshots,TransactionIdxid);
5059

60+
staticvoiddtm_shmem_startup(void);
61+
62+
staticshmem_startup_hook_typeprev_shmem_startup_hook;
63+
staticHTAB*xid_in_doubt;
64+
staticDtmState*dtm;
65+
staticTransactionIdDtmCurrentXid=InvalidTransactionId;
66+
5167
staticNodeIdDtmNodeId;
5268
staticDTMConnDtmConn;
5369
staticSnapshotDataDtmSnapshot= {HeapTupleSatisfiesMVCC };
@@ -105,10 +121,13 @@ static bool TransactionIdIsInDtmSnapshot(Snapshot s, TransactionId xid)
105121

106122
staticboolTransactionIdIsInDoubt(Snapshots,TransactionIdxid)
107123
{
124+
boolinDoubt;
125+
108126
if (!TransactionIdIsInDtmSnapshot(s,xid)) {
109-
XLogRecPtrlsn;
110-
XidStatusstatus=CLOGTransactionIdGetStatus(xid,&lsn);
111-
if (status!=TRANSACTION_STATUS_IN_PROGRESS) {
127+
LWLockAcquire(dtm->lock,LW_SHARED);
128+
inDoubt=hash_search(xid_in_doubt,&xid,HASH_FIND,NULL)!=NULL;
129+
LWLockRelease(dtm->lock);
130+
if (inDoubt) {
112131
XTM_INFO("Wait for transaction %d to complete\n",xid);
113132
XactLockTableWait(xid,NULL,NULL,XLTW_None);
114133
return true;
@@ -191,7 +210,7 @@ static void DtmUpdateRecentXmin(void)
191210
staticSnapshotDtmGetSnapshot(Snapshotsnapshot)
192211
{
193212
XTM_TRACE("XTM: DtmGetSnapshot \n");
194-
if (DtmGlobalTransaction&& !DtmHasSnapshot) {
213+
if (DtmGlobalTransaction/* && !DtmHasSnapshot*/) {
195214
DtmHasSnapshot= true;
196215
DtmEnsureConnection();
197216
DtmGlobalGetSnapshot(DtmConn,DtmNodeId,GetCurrentTransactionId(),&DtmSnapshot);
@@ -224,7 +243,12 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
224243
DtmGlobalTransaction= false;
225244
DtmEnsureConnection();
226245
XTM_INFO("Begin commit transaction %d\n",xid);
227-
CLOGTransactionIdSetTreeStatus(xid,nsubxids,subxids,TRANSACTION_STATUS_COMMITTED,lsn);
246+
247+
DtmCurrentXid=xid;
248+
LWLockAcquire(dtm->lock,LW_EXCLUSIVE);
249+
hash_search(xid_in_doubt,&DtmCurrentXid,HASH_ENTER,NULL);
250+
LWLockRelease(dtm->lock);
251+
228252
if (!DtmGlobalSetTransStatus(DtmConn,DtmNodeId,xid,status, true)&&status!=TRANSACTION_STATUS_ABORTED) {
229253
elog(ERROR,"DTMD failed to set transaction status");
230254
}
@@ -243,15 +267,80 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
243267
CLOGTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
244268
}
245269

270+
staticuint32dtm_xid_hash_fn(constvoid*key,Sizekeysize)
271+
{
272+
return (uint32)*(TransactionId*)key;
273+
}
274+
275+
staticintdtm_xid_match_fn(constvoid*key1,constvoid*key2,Sizekeysize)
276+
{
277+
return*(TransactionId*)key1-*(TransactionId*)key2;
278+
}
279+
280+
281+
staticvoidDtmInitialize()
282+
{
283+
boolfound;
284+
staticHASHCTLinfo;
285+
286+
LWLockAcquire(AddinShmemInitLock,LW_EXCLUSIVE);
287+
dtm=ShmemInitStruct("dtm",sizeof(DtmState),&found);
288+
if (!found)
289+
{
290+
dtm->lock=LWLockAssign();
291+
}
292+
LWLockRelease(AddinShmemInitLock);
293+
294+
info.keysize=sizeof(TransactionId);
295+
info.entrysize=sizeof(TransactionId);
296+
info.hash=dtm_xid_hash_fn;
297+
info.match=dtm_xid_match_fn;
298+
xid_in_doubt=ShmemInitHash("xid_in_doubt",DTM_HASH_SIZE,DTM_HASH_SIZE,
299+
&info,
300+
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE);
301+
302+
RegisterXactCallback(DtmXactCallback,NULL);
303+
304+
TM=&DtmTM;
305+
}
306+
307+
staticvoid
308+
DtmXactCallback(XactEventevent,void*arg)
309+
{
310+
if (event==XACT_EVENT_COMMIT&&DtmCurrentXid!=InvalidTransactionId) {
311+
LWLockAcquire(dtm->lock,LW_EXCLUSIVE);
312+
hash_search(xid_in_doubt,&DtmCurrentXid,HASH_REMOVE,NULL);
313+
LWLockRelease(dtm->lock);
314+
}
315+
}
316+
317+
246318
/*
247319
* ***************************************************************************
248320
*/
249321

250322
void
251323
_PG_init(void)
252324
{
253-
TM=&DtmTM;
254-
325+
/*
326+
* In order to create our shared memory area, we have to be loaded via
327+
* shared_preload_libraries. If not, fall out without hooking into any of
328+
* the main system. (We don't throw error here because it seems useful to
329+
* allow the cs_* functions to be created even when the
330+
* module isn't active. The functions must protect themselves against
331+
* being called then, however.)
332+
*/
333+
if (!process_shared_preload_libraries_in_progress)
334+
return;
335+
336+
/*
337+
* Request additional shared resources. (These are no-ops if we're not in
338+
* the postmaster process.) We'll allocate or attach to the shared
339+
* resources in imcs_shmem_startup().
340+
*/
341+
RequestAddinShmemSpace(DTM_SHMEM_SIZE);
342+
RequestAddinLWLocks(1);
343+
255344
DefineCustomIntVariable("dtm.node_id",
256345
"Identifier of node in distributed cluster for DTM",
257346
NULL,
@@ -264,6 +353,12 @@ _PG_init(void)
264353
NULL,
265354
NULL,
266355
NULL);
356+
357+
/*
358+
* Install hooks.
359+
*/
360+
prev_shmem_startup_hook=shmem_startup_hook;
361+
shmem_startup_hook=dtm_shmem_startup;
267362
}
268363

269364
/*
@@ -272,6 +367,16 @@ _PG_init(void)
272367
void
273368
_PG_fini(void)
274369
{
370+
shmem_startup_hook=prev_shmem_startup_hook;
371+
}
372+
373+
374+
staticvoiddtm_shmem_startup(void)
375+
{
376+
if (prev_shmem_startup_hook) {
377+
prev_shmem_startup_hook();
378+
}
379+
DtmInitialize();
275380
}
276381

277382
/*

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
// "math/rand"
6+
"math/rand"
77
"github.com/jackc/pgx"
88
)
99

@@ -93,7 +93,6 @@ func max(a, b int64) int64 {
9393

9494
functransfer(idint,wg*sync.WaitGroup) {
9595
varerrerror
96-
// var sum1, sum2, sum int32
9796
varxids []int32=make([]int32,2)
9897

9998
conn1,err:=pgx.Connect(cfg1)
@@ -107,8 +106,8 @@ func transfer(id int, wg *sync.WaitGroup) {
107106
fori:=0;i<N_ITERATIONS;i++ {
108107
//amount := 2*rand.Intn(2) - 1
109108
amount:=1
110-
account1:=id//rand.Intn(N_ACCOUNTS)
111-
account2:=id//rand.Intn(N_ACCOUNTS)
109+
account1:=rand.Intn(N_ACCOUNTS)
110+
account2:=rand.Intn(N_ACCOUNTS)
112111

113112
// strt transaction
114113
exec(conn1,"begin")

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp