Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0f1b88d

Browse files
committed
FDW is workings
1 parenta09fc09 commit0f1b88d

File tree

6 files changed

+40
-85
lines changed

6 files changed

+40
-85
lines changed

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionSt
9696

9797

9898
#defineXTM_TRACE(fmt, ...)
99-
#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
100-
//#define XTM_INFO(fmt, ...)
99+
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
100+
#defineXTM_INFO(fmt, ...)
101101

102102
staticvoidDumpSnapshot(Snapshots,char*name)
103103
{

‎contrib/pg_xtm/tests/transfers-fdw.go‎

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
const (
13-
TRANSFER_CONNECTIONS=2
13+
TRANSFER_CONNECTIONS=8
1414
INIT_AMOUNT=10000
1515
N_ITERATIONS=10000
1616
N_ACCOUNTS=TRANSFER_CONNECTIONS//100000
@@ -35,8 +35,6 @@ var cfg2 = pgx.ConnConfig{
3535
varrunning=false
3636

3737
funcprepare_db() {
38-
// var xid int32
39-
4038
conn1,err:=pgx.Connect(cfg1)
4139
checkErr(err)
4240
deferconn1.Close()
@@ -66,7 +64,7 @@ func prepare_db() {
6664

6765
fori:=0;i<N_ACCOUNTS;i++ {
6866
exec(conn1,"insert into t values($1, $2)",i,INIT_AMOUNT)
69-
exec(conn2,"insert into t values($1, $2)",-i,INIT_AMOUNT)
67+
exec(conn2,"insert into t values($1, $2)",^i,INIT_AMOUNT)
7068
}
7169

7270
exec(conn1,"commit")
@@ -106,7 +104,7 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
106104
formyCommits<N_ITERATIONS {
107105
amount:=1
108106
account1:=rand.Intn(N_ACCOUNTS)
109-
account2:=-rand.Intn(N_ACCOUNTS)
107+
account2:=^rand.Intn(N_ACCOUNTS)
110108

111109
exec(conn,"begin")
112110
xid=execQuery(conn,"select dtm_begin_transaction(2)")
@@ -150,6 +148,7 @@ func inspect(wg *sync.WaitGroup) {
150148
checkErr(err)
151149

152150
forrunning {
151+
153152
exec(conn,"begin")
154153
xid=execQuery(conn,"select dtm_begin_transaction(2)")
155154
exec(conn,"select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction("+strconv.Itoa(int(xid))+")')")

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 22 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@ import (
1111
const (
1212
TRANSFER_CONNECTIONS=8
1313
INIT_AMOUNT=10000
14-
N_ITERATIONS=100000
14+
N_ITERATIONS=10000
1515
N_ACCOUNTS=TRANSFER_CONNECTIONS//100000
16-
//ISOLATION_LEVEL = "repeatable read"
17-
ISOLATION_LEVEL="read committed"
18-
GLOBAL_UPDATES=true
19-
LOCAL_UPDATES=false
16+
ISOLATION_LEVEL="repeatable read"
17+
//ISOLATION_LEVEL = "read committed"
2018
)
2119

2220

@@ -50,8 +48,6 @@ func commit(conn1, conn2 *pgx.Conn) {
5048
}
5149

5250
funcprepare_db() {
53-
// var xid int32
54-
5551
conn1,err:=pgx.Connect(cfg1)
5652
checkErr(err)
5753
deferconn1.Close()
@@ -70,9 +66,6 @@ func prepare_db() {
7066
exec(conn2,"drop table if exists t")
7167
exec(conn2,"create table t(u int primary key, v int)")
7268

73-
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
74-
// exec(conn2, "select dtm_join_transaction($1)", xid)
75-
7669
// strt transaction
7770
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
7871
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
@@ -134,63 +127,27 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
134127
account1:=rand.Intn(N_ACCOUNTS)
135128
account2:=rand.Intn(N_ACCOUNTS)
136129

137-
if (account1>=account2) {
138-
continue
139-
}
130+
src:=conn[0]
131+
dst:=conn[1]
140132

141-
srci:=rand.Intn(2)
142-
dsti:=rand.Intn(2)
143-
if (srci>dsti) {
144-
continue
145-
}
133+
xid=execQuery(src,"select dtm_begin_transaction(2)")
134+
exec(dst,"select dtm_join_transaction($1)",xid)
146135

147-
src:=conn[srci]
148-
dst:=conn[dsti]
136+
// start transaction
137+
exec(src,"begin transaction isolation level "+ISOLATION_LEVEL)
138+
exec(dst,"begin transaction isolation level "+ISOLATION_LEVEL)
149139

150-
ifsrc==dst {
151-
// local update
152-
if!LOCAL_UPDATES {
153-
// which we do not want
154-
continue
155-
}
140+
ok1:=execUpdate(src,"update t set v = v - $1 where u=$2",amount,account1)
141+
ok2:=execUpdate(dst,"update t set v = v + $1 where u=$2",amount,account2)
156142

157-
exec(src,"begin transaction isolation level "+ISOLATION_LEVEL)
158-
ok1:=execUpdate(src,"update t set v = v - $1 where u=$2",amount,account1)
159-
ok2:=execUpdate(src,"update t set v = v + $1 where u=$2",amount,account2)
160-
if!ok1||!ok2 {
161-
exec(src,"rollback")
162-
nAborts+=1
163-
}else {
164-
exec(src,"commit")
165-
nCommits+=1
166-
myCommits+=1
167-
}
143+
if!ok1||!ok2 {
144+
exec(src,"rollback")
145+
exec(dst,"rollback")
146+
nAborts+=1
168147
}else {
169-
// global update
170-
if!GLOBAL_UPDATES {
171-
// which we do not want
172-
continue
173-
}
174-
175-
xid=execQuery(src,"select dtm_begin_transaction(2)")
176-
exec(dst,"select dtm_join_transaction($1)",xid)
177-
178-
// start transaction
179-
exec(src,"begin transaction isolation level "+ISOLATION_LEVEL)
180-
exec(dst,"begin transaction isolation level "+ISOLATION_LEVEL)
181-
182-
ok1:=execUpdate(src,"update t set v = v - $1 where u=$2",amount,account1)
183-
ok2:=execUpdate(dst,"update t set v = v + $1 where u=$2",amount,account2)
184-
185-
if!ok1||!ok2 {
186-
exec(src,"rollback")
187-
exec(dst,"rollback")
188-
nAborts+=1
189-
}else {
190-
commit(src,dst)
191-
nCommits+=1
192-
myCommits+=1
193-
}
148+
commit(src,dst)
149+
nCommits+=1
150+
myCommits+=1
194151
}
195152

196153
iftime.Since(start).Seconds()>1 {
@@ -285,9 +242,9 @@ func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
285242
varerrerror
286243
// fmt.Println(stmt)
287244
_,err=conn.Exec(stmt,arguments... )
288-
iferr!=nil {
289-
fmt.Println(err)
290-
}
245+
//if err != nil {
246+
// fmt.Println(err)
247+
//}
291248
returnerr==nil
292249
}
293250

‎src/backend/access/heap/heapam.c‎

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3632,20 +3632,15 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
36323632

36333633
#if0
36343634
{
3635-
charbuf[256];
3636-
sprintf(buf,"backend-%d.trace",getpid());
3637-
FILE*f=fopen(buf,"a");
36383635
Snapshots=GetTransactionSnapshot();
3639-
fprintf(f,"xid=%d: old.ctid=[%x-%x,%x], old.xmin=%d, old.xmax=%d, old.mask=%x, new.xmin=%d, new.xmax=%d, new.flags=%x, snap.xmin=%d, snap.xmax=%d, xcnt=%d, xip={%d,%d,%d,%d,%d}\n",
3640-
xid,
3636+
fprintf(stderr,"pid=%d transaction %d update tuple: old.ctid=[%x-%x,%x], old.xmin=%d, old.xmax=%d, old.mask=%x, new.xmin=%d, new.xmax=%d, new.flags=%x, snap.xmin=%d, snap.xmax=%d\n",
3637+
getpid(),xid,
36413638
oldtup.t_data->t_ctid.ip_blkid.bi_hi,
36423639
oldtup.t_data->t_ctid.ip_blkid.bi_lo,
36433640
oldtup.t_data->t_ctid.ip_posid,
36443641
HeapTupleHeaderGetRawXmin(oldtup.t_data),HeapTupleHeaderGetRawXmax(oldtup.t_data),oldtup.t_data->t_infomask,
36453642
xid,xmax_new_tuple,infomask_new_tuple,
3646-
s->xmin,s->xmax,s->xcnt,s->xip[0],s->xip[1],s->xip[2],s->xip[3],s->xip[4]
3647-
);
3648-
fclose(f);
3643+
s->xmin,s->xmax);
36493644
}
36503645
Assert(xmax_new_tuple!=xid|| (newtup->t_data->t_infomask&HEAP_XMAX_LOCK_ONLY)!=0);
36513646
#endif

‎src/backend/access/heap/visibilitymap.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
254254
Pagepage;
255255
char*map;
256256

257-
#if1
257+
#if0
258258
fprintf(stderr,"Visibilitymap cutoff %d, RecentLocalDataXmin=%d\n",cutoff_xid,RecentGlobalDataXmin);
259259
// return;
260260
#endif

‎src/backend/utils/time/tqual.c‎

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
970970
* and more contention on the PGXACT array.
971971
*/
972972
bool
973-
_HeapTupleSatisfiesMVCC(HeapTuplehtup,Snapshotsnapshot,
973+
HeapTupleSatisfiesMVCC(HeapTuplehtup,Snapshotsnapshot,
974974
Bufferbuffer)
975975
{
976976
HeapTupleHeadertuple=htup->t_data;
@@ -1156,7 +1156,7 @@ _HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11561156

11571157
return false;
11581158
}
1159-
#if1
1159+
#if0
11601160
bool
11611161
HeapTupleSatisfiesMVCC(HeapTuplehtup,Snapshotsnapshot,
11621162
Bufferbuffer)
@@ -1166,8 +1166,12 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11661166
HeapTupleHeadertuple=htup->t_data;
11671167
TransactionIdcurxid=GetCurrentTransactionId();
11681168
if (TransactionIdIsNormal(curxid)) {
1169-
fprintf(stderr,"pid=%d Transaction %d, [%d,%d) visibility check for tuple {%d,%d} %x = %d\n",
1170-
getpid(),curxid,snapshot->xmin,snapshot->xmax,HeapTupleHeaderGetRawXmin(tuple),HeapTupleHeaderGetRawXmax(tuple),tuple->t_infomask,result);
1169+
fprintf(stderr,"pid=%d Transaction %d, [%d,%d) visibility check for tuple [%x-%x,%x] {%d,%d} %x = %d\n",
1170+
getpid(),curxid,snapshot->xmin,snapshot->xmax,
1171+
tuple->t_ctid.ip_blkid.bi_hi,
1172+
tuple->t_ctid.ip_blkid.bi_lo,
1173+
tuple->t_ctid.ip_posid,
1174+
HeapTupleHeaderGetRawXmin(tuple),HeapTupleHeaderGetRawXmax(tuple),tuple->t_infomask,result);
11711175
}
11721176

11731177
returnresult;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp