Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcae87f3

Browse files
committed
Better integration of DTM in FDW
1 parentc9f37d6 commitcae87f3

File tree

12 files changed

+63
-40
lines changed

12 files changed

+63
-40
lines changed

‎contrib/pg_xtm/dtmd/include/transaction.h‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ typedef struct Transaction {
1515
xid_txid;
1616

1717
intsize;// number of paritcipants
18-
intmax_size;// maximal number of participants
1918

2019
// for + against ≤ size
2120
intvotes_for;

‎contrib/pg_xtm/dtmd/src/main.c‎

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,11 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
257257
);
258258

259259
CHECK(
260-
cmd->argc==1,
260+
cmd->argc==0,
261261
clientdata,
262262
"BEGIN: wrong number of arguments"
263263
);
264264

265-
intsize=cmd->argv[0];
266-
CHECK(
267-
size <=MAX_NODES,
268-
clientdata,
269-
"BEGIN: 'size' > MAX_NODES"
270-
);
271-
272265
CHECK(
273266
CLIENT_XID(clientdata)==INVALID_XID,
274267
clientdata,
@@ -280,7 +273,6 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
280273

281274
prev_gxid=t->xid=next_gxid++;
282275
t->snapshots_count=0;
283-
t->max_size=size;
284276
t->size=1;
285277

286278
CLIENT_SNAPSENT(clientdata)=0;
@@ -438,11 +430,6 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
438430
CLIENT_SNAPSENT(clientdata)=0;
439431
CLIENT_XID(clientdata)=t->xid;
440432
t->size+=1;
441-
CHECK(
442-
t->size <=t->max_size,
443-
clientdata,
444-
"SNAPSHOT: too many participants"
445-
);
446433
}
447434

448435
CHECK(

‎contrib/pg_xtm/dtmd/src/transaction.c‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ void transaction_clear(Transaction *t) {
2828

2929
t->xid=INVALID_XID;
3030
t->size=0;
31-
t->max_size=0;
3231
t->votes_for=0;
3332
t->votes_against=0;
3433
t->snapshots_count=0;

‎contrib/pg_xtm/libdtm.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,15 @@ void DtmInitSnapshot(Snapshot snapshot)
281281
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
282282
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
283283
// otherwise.
284-
TransactionIdDtmGlobalStartTransaction(intnParticipants,Snapshotsnapshot,TransactionId*gxmin)
284+
TransactionIdDtmGlobalStartTransaction(Snapshotsnapshot,TransactionId*gxmin)
285285
{
286286
boolok;
287287
xid_txid;
288288
xid_tnumber;
289289
DTMConndtm=GetConnection();
290290

291291
// query
292-
if (!dtm_query(dtm,'b',1,nParticipants)) gotofailure;
292+
if (!dtm_query(dtm,'b',0)) gotofailure;
293293

294294
// response
295295
if (!dtm_read_bool(dtm,&ok)) gotofailure;

‎contrib/pg_xtm/libdtm.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111
voidDtmInitSnapshot(Snapshotsnapshot);
1212

13-
// Starts a new global transaction of nParticipants size. Returns the
13+
// Starts a new global transaction. Returns the
1414
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
1515
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
1616
// otherwise.
17-
TransactionIdDtmGlobalStartTransaction(intnParticipants,Snapshotsnapshot,TransactionId*gxmin);
17+
TransactionIdDtmGlobalStartTransaction(Snapshotsnapshot,TransactionId*gxmin);
1818

1919
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
2020
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.

‎contrib/pg_xtm/pg_dtm--1.0.sql‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use"CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATEFUNCTIONdtm_begin_transaction(n_participantsinteger) RETURNSinteger
4+
CREATEFUNCTIONdtm_begin_transaction() RETURNSinteger
55
AS'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ static bool DtmTransactionIdIsInProgress(TransactionId xid);
7474
staticTransactionIdDtmGetNextXid(void);
7575
staticTransactionIdDtmGetNewTransactionId(boolisSubXact);
7676
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum);
77+
staticTransactionIdDtmGetGlobalTransactionId(void);
7778

7879
staticboolTransactionIdIsInSnapshot(TransactionIdxid,Snapshotsnapshot);
7980
staticboolTransactionIdIsInDoubt(TransactionIdxid);
@@ -92,7 +93,7 @@ static bool DtmGlobalXidAssigned;
9293
staticintDtmLocalXidReserve;
9394
staticintDtmCurcid;
9495
staticSnapshotDtmLastSnapshot;
95-
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,DtmTransactionIdIsInProgress };
96+
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,DtmTransactionIdIsInProgress,DtmGetGlobalTransactionId };
9697

9798

9899
#defineXTM_TRACE(fmt, ...)
@@ -323,6 +324,12 @@ static TransactionId DtmGetNextXid()
323324
returnxid;
324325
}
325326

327+
TransactionId
328+
DtmGetGlobalTransactionId()
329+
{
330+
returnDtmNextXid;
331+
}
332+
326333
TransactionId
327334
DtmGetNewTransactionId(boolisSubXact)
328335
{
@@ -667,8 +674,8 @@ static void DtmInitialize()
667674
staticvoid
668675
DtmXactCallback(XactEventevent,void*arg)
669676
{
677+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmGlobalXidAssigned,DtmNextXid);
670678
if (event==XACT_EVENT_COMMIT||event==XACT_EVENT_ABORT) {
671-
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmGlobalXidAssigned,DtmNextXid);
672679
if (DtmGlobalXidAssigned) {
673680
DtmGlobalXidAssigned= false;
674681
}elseif (TransactionIdIsValid(DtmNextXid)) {
@@ -780,10 +787,9 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
780787
Datum
781788
dtm_begin_transaction(PG_FUNCTION_ARGS)
782789
{
783-
intnParticipants=PG_GETARG_INT32(0);
784790
Assert(!TransactionIdIsValid(DtmNextXid));
785791

786-
DtmNextXid=DtmGlobalStartTransaction(nParticipants,&DtmSnapshot,&dtm->minXid);
792+
DtmNextXid=DtmGlobalStartTransaction(&DtmSnapshot,&dtm->minXid);
787793
Assert(TransactionIdIsValid(DtmNextXid));
788794
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n",getpid(),DtmNextXid,dtm->minXid);
789795

‎contrib/pg_xtm/tests/transfers-fdw.go‎

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
"strconv"
76
"math/rand"
87
"time"
98
"github.com/jackc/pgx"
@@ -91,7 +90,6 @@ func progress(total int, cCommits chan int, cAborts chan int) {
9190

9291
functransfer(idint,cCommitschanint,cAbortschanint,wg*sync.WaitGroup) {
9392
varerrerror
94-
varxidint32
9593
varnAborts=0
9694
varnCommits=0
9795
varmyCommits=0
@@ -106,10 +104,7 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
106104
account1:=rand.Intn(N_ACCOUNTS)
107105
account2:=^rand.Intn(N_ACCOUNTS)
108106

109-
exec(conn,"begin")
110-
xid=execQuery(conn,"select dtm_begin_transaction(2)")
111-
exec(conn,"select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction("+strconv.Itoa(int(xid))+")')")
112-
exec(conn,"commit")
107+
exec(conn,"select dtm_begin_transaction()")
113108

114109
exec(conn,"begin transaction isolation level "+ISOLATION_LEVEL)
115110

@@ -141,25 +136,21 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
141136
funcinspect(wg*sync.WaitGroup) {
142137
varsumint64
143138
varprevSumint64=0
144-
varxidint32
145139

146140
{
147141
conn,err:=pgx.Connect(cfg1)
148142
checkErr(err)
149143

150144
forrunning {
151145

152-
exec(conn,"begin")
153-
xid=execQuery(conn,"select dtm_begin_transaction(2)")
154-
exec(conn,"select postgres_fdw_exec('t_fdw'::regclass::oid, 'select public.dtm_join_transaction("+strconv.Itoa(int(xid))+")')")
155-
exec(conn,"commit")
146+
exec(conn,"select dtm_begin_transaction()")
156147

157148
exec(conn,"begin transaction isolation level "+ISOLATION_LEVEL)
158149

159150
sum=execQuery64(conn,"select sum(v) from t")
160151

161152
if (sum!=prevSum) {
162-
fmt.Printf("Total=%d xid=%d\n",sum,xid)
153+
fmt.Printf("Total=%d\n",sum)
163154
prevSum=sum
164155
}
165156

‎contrib/pg_xtm/tests/transfers.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
130130
src:=conn[0]
131131
dst:=conn[1]
132132

133-
xid=execQuery(src,"select dtm_begin_transaction(2)")
133+
xid=execQuery(src,"select dtm_begin_transaction()")
134134
exec(dst,"select dtm_join_transaction($1)",xid)
135135

136136
// start transaction
@@ -176,7 +176,7 @@ func inspect(wg *sync.WaitGroup) {
176176
checkErr(err)
177177

178178
forrunning {
179-
xid=execQuery(conn1,"select dtm_begin_transaction(2)")
179+
xid=execQuery(conn1,"select dtm_begin_transaction()")
180180
exec(conn2,"select dtm_join_transaction($1)",xid)
181181

182182
exec(conn1,"begin transaction isolation level "+ISOLATION_LEVEL)

‎contrib/postgres_fdw/connection.c‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include"postgres_fdw.h"
1616

1717
#include"access/xact.h"
18+
#include"access/xtm.h"
19+
#include"access/transam.h"
1820
#include"mb/pg_wchar.h"
1921
#include"miscadmin.h"
2022
#include"utils/hsearch.h"
@@ -401,11 +403,21 @@ begin_remote_xact(ConnCacheEntry *entry)
401403
/* Start main transaction if we haven't yet */
402404
if (entry->xact_depth <=0)
403405
{
406+
TransactionIdgxid=GetTransactionManager()->GetGlobalTransactionId();
404407
constchar*sql;
405408

406409
elog(DEBUG3,"starting remote transaction on connection %p",
407410
entry->conn);
408411

412+
if (TransactionIdIsValid(gxid)) {
413+
charstmt[64];
414+
PGresult*res;
415+
416+
snprintf(stmt,sizeof(stmt),"select public.dtm_join_transaction(%d)",gxid);
417+
res=PQexec(entry->conn,stmt);
418+
PQclear(res);
419+
}
420+
409421
if (IsolationIsSerializable())
410422
sql="START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
411423
else

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp