Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit660f187

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parentsb1cd83c +78dd16d commit660f187

File tree

4 files changed

+107
-53
lines changed

4 files changed

+107
-53
lines changed

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ static TransactionId DtmMinXid;
8686
staticboolDtmHasGlobalSnapshot;
8787
staticboolDtmIsGlobalTransaction;
8888
staticintDtmLocalXidReserve;
89+
staticintDtmCurcid;
90+
staticSnapshotDtmLastSnapshot;
8991
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin };
9092

9193

9294
#defineXTM_TRACE(fmt, ...)
93-
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
94-
#defineXTM_INFO(fmt, ...)
95+
#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
96+
//#define XTM_INFO(fmt, ...)
9597

9698
staticvoidDumpSnapshot(Snapshots,char*name)
9799
{
@@ -100,8 +102,8 @@ static void DumpSnapshot(Snapshot s, char *name)
100102
char*cursor=buf;
101103
cursor+=sprintf(
102104
cursor,
103-
"snapshot %s for transaction %d: xmin=%d, xmax=%d, active=[",
104-
name,GetCurrentTransactionId(),s->xmin,s->xmax
105+
"snapshot %s(%p) for transaction %d: xmin=%d, xmax=%d, active=[",
106+
name,s,GetCurrentTransactionId(),s->xmin,s->xmax
105107
);
106108
for (i=0;i<s->xcnt;i++) {
107109
if (i==0) {
@@ -480,10 +482,12 @@ DtmGetNewTransactionId(bool isSubXact)
480482

481483
staticSnapshotDtmGetSnapshot(Snapshotsnapshot)
482484
{
483-
if (TransactionIdIsValid(DtmNextXid)) {
484-
if (!DtmHasGlobalSnapshot) {
485+
if (TransactionIdIsValid(DtmNextXid)&&IsMVCCSnapshot(snapshot)&&snapshot!=&CatalogSnapshotData) {
486+
if (!DtmHasGlobalSnapshot&& (snapshot!=DtmLastSnapshot||DtmCurcid!=snapshot->curcid)) {
485487
DtmGlobalGetSnapshot(DtmNextXid,&DtmSnapshot,&DtmMinXid);
486488
}
489+
DtmCurcid=snapshot->curcid;
490+
DtmLastSnapshot=snapshot;
487491
DtmMergeSnapshots(snapshot,&DtmSnapshot);
488492
if (!IsolationUsesXactSnapshot()) {
489493
DtmHasGlobalSnapshot= false;
@@ -595,6 +599,7 @@ DtmXactCallback(XactEvent event, void *arg)
595599
LWLockRelease(dtm->hashLock);
596600
}
597601
DtmNextXid=InvalidTransactionId;
602+
DtmLastSnapshot=NULL;
598603
}
599604
}
600605
}
@@ -701,6 +706,7 @@ dtm_begin_transaction(PG_FUNCTION_ARGS)
701706

702707
DtmHasGlobalSnapshot= true;
703708
DtmIsGlobalTransaction= true;
709+
DtmLastSnapshot=NULL;
704710

705711
PG_RETURN_INT32(DtmNextXid);
706712
}
@@ -716,6 +722,7 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
716722

717723
DtmHasGlobalSnapshot= true;
718724
DtmIsGlobalTransaction= true;
725+
DtmLastSnapshot=NULL;
719726

720727
PG_RETURN_VOID();
721728
}

‎contrib/pg_xtm/tests/daemons.go‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9191
"-D",datadir,
9292
"-p",strconv.Itoa(port),
9393
"-c","dtm.node_id="+strconv.Itoa(nodeid),
94+
"-c","autovacuum=off",
95+
"-c","fsync=off",
96+
"-c","synchronous_commit=off",
9497
}
9598
name:="postgres "+datadir
9699
c:=make(chanstring)

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 89 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"sync"
66
"math/rand"
7+
"time"
78
"github.com/jackc/pgx"
89
)
910

@@ -34,10 +35,10 @@ var running = false
3435
varnodes []int32= []int32{0,1}
3536

3637
funcasyncCommit(conn*pgx.Conn,wg*sync.WaitGroup) {
37-
exec(conn,"commit")
38+
exec(conn,"commit")
3839
wg.Done()
3940
}
40-
41+
4142
funccommit(conn1,conn2*pgx.Conn) {
4243
varwg sync.WaitGroup
4344
wg.Add(2)
@@ -66,33 +67,53 @@ func prepare_db() {
6667
exec(conn2,"create extension pg_dtm")
6768
exec(conn2,"drop table if exists t")
6869
exec(conn2,"create table t(u int primary key, v int)")
69-
70+
7071
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
7172
// exec(conn2, "select dtm_join_transaction($1)", xid)
7273

7374
// strt transaction
7475
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
7576
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
76-
77+
7778
fori:=0;i<N_ACCOUNTS;i++ {
7879
exec(conn1,"insert into t values($1, $2)",i,INIT_AMOUNT)
7980
exec(conn2,"insert into t values($1, $2)",i,INIT_AMOUNT)
8081
}
81-
82+
8283
commit(conn1,conn2)
8384
}
8485

8586
funcmax(a,bint64)int64 {
8687
ifa>=b {
8788
returna
88-
}
89+
}
8990
returnb
9091
}
9192

92-
functransfer(idint,wg*sync.WaitGroup) {
93+
funcprogress(totalint,cCommitschanint,cAbortschanint) {
94+
commits:=0
95+
aborts:=0
96+
start:=time.Now()
97+
fornewcommits:=rangecCommits {
98+
newaborts:=<-cAborts
99+
commits+=newcommits
100+
aborts+=newaborts
101+
iftime.Since(start).Seconds()>1 {
102+
fmt.Printf(
103+
"progress %0.2f%%: %d commits, %d aborts\n",
104+
float32(commits)*100.0/float32(total),commits,aborts,
105+
)
106+
start=time.Now()
107+
}
108+
}
109+
}
110+
111+
functransfer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) {
93112
varerrerror
94113
varxidint32
95-
varnConflicts=0
114+
varnAborts=0
115+
varnCommits=0
116+
varmyCommits=0
96117

97118
conn1,err:=pgx.Connect(cfg1)
98119
checkErr(err)
@@ -102,10 +123,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102123
checkErr(err)
103124
deferconn2.Close()
104125

105-
fori:=0;i<N_ITERATIONS;i++ {
106-
//amount := 2*rand.Intn(2) - 1
107-
amount:=1
108-
account1:=rand.Intn(N_ACCOUNTS)
126+
start:=time.Now()
127+
formyCommits<N_ITERATIONS {
128+
amount:=2*rand.Intn(2000)-1
129+
//amount := 1
130+
account1:=rand.Intn(N_ACCOUNTS)
109131
account2:=rand.Intn(N_ACCOUNTS)
110132

111133
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
@@ -114,25 +136,35 @@ func transfer(id int, wg *sync.WaitGroup) {
114136
// start transaction
115137
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
116138
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
117-
118-
ok1:=execUpdate(conn1,"update t set v = v + $1 where u=$2",amount,account1)
119-
ok2:=execUpdate(conn2,"update t set v = v - $1 where u=$2",amount,account2)
120-
if!ok1||!ok2 {
139+
140+
ok1:=execUpdate(conn1,"update t set v = v + $1 where u=$2",amount,account1)
141+
ok2:=execUpdate(conn2,"update t set v = v - $1 where u=$2",amount,account2)
142+
if!ok1||!ok2 {
121143
exec(conn1,"rollback")
122144
exec(conn2,"rollback")
123-
nConflicts+=1
124-
i-=1
125-
}else {
145+
nAborts+=1
146+
}else {
126147
commit(conn1,conn2)
127-
}
148+
nCommits+=1
149+
myCommits+=1
150+
}
151+
152+
iftime.Since(start).Seconds()>1 {
153+
cCommits<-nCommits
154+
cAborts<-nAborts
155+
nCommits=0
156+
nAborts=0
157+
start=time.Now()
158+
}
128159
}
129-
fmt.Println("Test completed with ",nConflicts," conflicts")
160+
cCommits<-nCommits
161+
cAborts<-nAborts
130162
wg.Done()
131163
}
132164

133165
funcinspect(wg*sync.WaitGroup) {
134166
varsum1,sum2,sumint64
135-
varprevSumint64=0
167+
varprevSumint64=0
136168
varxidint32
137169

138170
{
@@ -142,28 +174,33 @@ func inspect(wg *sync.WaitGroup) {
142174
conn2,err:=pgx.Connect(cfg2)
143175
checkErr(err)
144176

145-
forrunning {
177+
forrunning {
178+
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
179+
exec(conn2,"select dtm_join_transaction($1)",xid)
180+
181+
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
182+
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
183+
184+
sum1=execQuery64(conn1,"select sum(v) from t")
185+
sum2=execQuery64(conn2,"select sum(v) from t")
186+
187+
sum=sum1+sum2
188+
if (sum!=prevSum) {
189+
xmin1:=execQuery(conn1,"select dtm_get_current_snapshot_xmin()")
190+
xmax1:=execQuery(conn1,"select dtm_get_current_snapshot_xmax()")
191+
xmin2:=execQuery(conn2,"select dtm_get_current_snapshot_xmin()")
192+
xmax2:=execQuery(conn2,"select dtm_get_current_snapshot_xmax()")
193+
fmt.Printf(
194+
"Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n",
195+
sum,xid,xmin1,xmax1,xmin2,xmax2,
196+
)
197+
prevSum=sum
198+
}
146199

147-
148-
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
149-
exec(conn2,"select dtm_join_transaction($1)",xid)
150-
151-
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)
152-
exec(conn2,"begin transaction isolation level "+ISOLATION_LEVEL)
153-
154-
sum1=execQuery64(conn1,"select sum(v) from t")
155-
sum2=execQuery64(conn2,"select sum(v) from t")
156-
157-
sum=sum1+sum2
158-
if (sum!=prevSum) {
159-
fmt.Println("Total = ",sum,"xid=",xid,"snap1={",execQuery(conn1,"select dtm_get_current_snapshot_xmin()"),execQuery(conn1,"select dtm_get_current_snapshot_xmax()"),"}, snap2={",execQuery(conn2,"select dtm_get_current_snapshot_xmin()"),execQuery(conn2,"select dtm_get_current_snapshot_xmax()"),"}")
160-
prevSum=sum
161-
}
162-
163-
commit(conn1,conn2)
164-
}
165-
conn1.Close()
166-
conn2.Close()
200+
commit(conn1,conn2)
201+
}
202+
conn1.Close()
203+
conn2.Close()
167204
}
168205
wg.Done()
169206
}
@@ -174,9 +211,13 @@ func main() {
174211

175212
prepare_db()
176213

214+
cCommits:=make(chanint)
215+
cAborts:=make(chanint)
216+
goprogress(TRANSFER_CONNECTIONS*N_ITERATIONS,cCommits,cAborts)
217+
177218
transferWg.Add(TRANSFER_CONNECTIONS)
178219
fori:=0;i<TRANSFER_CONNECTIONS;i++ {
179-
gotransfer(i,&transferWg)
220+
gotransfer(i,cCommits,cAborts,&transferWg)
180221
}
181222
running=true
182223
inspectWg.Add(1)
@@ -185,6 +226,8 @@ func main() {
185226
transferWg.Wait()
186227
running=false
187228
inspectWg.Wait()
229+
230+
fmt.Printf("done\n")
188231
}
189232

190233
funcexec(conn*pgx.Conn,stmtstring,arguments...interface{}) {
@@ -216,10 +259,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216259
checkErr(err)
217260
returnresult
218261
}
262+
219263
funccheckErr(errerror) {
220264
iferr!=nil {
221265
panic(err)
222266
}
223267
}
224268

225-
269+
// vim: expandtab ts=4 sts=4 sw=4

‎src/backend/utils/time/tqual.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
970970
* and more contention on the PGXACT array.
971971
*/
972972
bool
973-
HeapTupleSatisfiesMVCC(HeapTuplehtup,Snapshotsnapshot,
973+
_HeapTupleSatisfiesMVCC(HeapTuplehtup,Snapshotsnapshot,
974974
Bufferbuffer)
975975
{
976976
HeapTupleHeadertuple=htup->t_data;
@@ -1156,7 +1156,7 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11561156

11571157
return false;
11581158
}
1159-
#if0
1159+
#if1
11601160
bool
11611161
HeapTupleSatisfiesMVCC(HeapTuplehtup,Snapshotsnapshot,
11621162
Bufferbuffer)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp