Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit267b83c

Browse files
committed
Merge changes
2 parentsb395714 +bc8d4bc commit267b83c

33 files changed

+575
-455
lines changed

‎contrib/pg_dtm/README‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ dtm_get_snapshot() RETURNS void
3636
libdtm api
3737
----------
3838

39+
// Sets up the host and port for DTM connection.
40+
// The defaults are "127.0.0.1" and 5431.
41+
void TuneToDtm(char *host, int port);
42+
3943
void DtmInitSnapshot(Snapshot snapshot);
4044

4145
// Starts a new global transaction of nParticipants size. Returns the

‎contrib/pg_dtm/libdtm.c‎

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ typedef struct DTMConnData
2424

2525
typedefunsigned long longxid_t;
2626

27+
intDtmdPort;
28+
char*DtmdHost;
29+
30+
2731
// Returns true if the write was successful.
2832
staticbooldtm_write_char(DTMConndtm,charc)
2933
{
@@ -231,13 +235,20 @@ static bool dtm_query(DTMConn dtm, char cmd, int argc, ...)
231235
return true;
232236
}
233237

238+
staticchar*dtmhost="127.0.0.1";
239+
staticintdtmport=5431;
240+
241+
voidTuneToDtm(char*host,intport) {
242+
dtmhost=host;
243+
dtmport=port;
244+
}
245+
234246
staticDTMConnGetConnection()
235247
{
236248
staticDTMConndtm=NULL;
237249
if (dtm==NULL)
238250
{
239-
// FIXME: add API for setting the host and port for dtm connection
240-
dtm=DtmConnect("127.0.0.1",5431);
251+
dtm=DtmConnect(dtmhost,dtmport);
241252
if (dtm==NULL) {
242253
elog(ERROR,"Failed to connect to DTMD");
243254
}

‎contrib/pg_dtm/libdtm.h‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
#defineINVALID_XID 0
1010

11+
// Sets up the host and port for DTM connection.
12+
// The defaults are "127.0.0.1" and 5431.
13+
voidTuneToDtm(char*host,intport);
14+
1115
voidDtmInitSnapshot(Snapshotsnapshot);
1216

1317
// Starts a new global transaction. Returns the

‎contrib/pg_dtm/pg_dtm--1.0.sql‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ LANGUAGE C;
1616
CREATEFUNCTIONdtm_get_current_snapshot_xmax() RETURNSinteger
1717
AS'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
1818
LANGUAGE C;
19+
20+
CREATEFUNCTIONdtm_get_current_snapshot_xcnt() RETURNSinteger
21+
AS'MODULE_PATHNAME','dtm_get_current_snapshot_xcnt'
22+
LANGUAGE C;

‎contrib/pg_dtm/pg_dtm.c‎

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ static bool DtmGlobalXidAssigned;
8787
staticintDtmLocalXidReserve;
8888
staticintDtmCurcid;
8989
staticSnapshotDtmLastSnapshot;
90-
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,TransactionIdIsRunning,DtmGetGlobalTransactionId };
90+
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,PgTransactionIdIsInProgress,DtmGetGlobalTransactionId,PgXidInMVCCSnapshot };
91+
92+
staticchar*DtmHost;
93+
staticintDtmPort;
9194

9295

9396
#defineXTM_TRACE(fmt, ...)
@@ -169,7 +172,7 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
169172
* Check that global and local snapshots are consistent: transactions marked as completed in global snapohsot
170173
* should be completed locally
171174
*/
172-
dst=GetLocalSnapshotData(dst);
175+
dst=PgGetSnapshotData(dst);
173176
for (i=0;i<dst->xcnt;i++) {
174177
if (TransactionIdIsInDoubt(dst->xip[i])) {
175178
gotoGetLocalSnapshot;
@@ -213,7 +216,7 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
213216
*/
214217
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum)
215218
{
216-
TransactionIdlocalXmin=GetOldestLocalXmin(rel,ignoreVacuum);
219+
TransactionIdlocalXmin=PgGetOldestXmin(rel,ignoreVacuum);
217220
TransactionIdglobalXmin=dtm->minXid;
218221
XTM_INFO("XTM: DtmGetOldestXmin localXmin=%d, globalXmin=%d\n",localXmin,globalXmin);
219222

@@ -526,7 +529,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
526529
* which PRECEDS actual transaction for which Xid is received.
527530
* This transaction doesn't need to take in accountn global snapshot
528531
*/
529-
returnGetLocalSnapshotData(snapshot);
532+
returnPgGetSnapshotData(snapshot);
530533
}
531534
if (TransactionIdIsValid(DtmNextXid)&&snapshot!=&CatalogSnapshotData) {
532535
if (!DtmHasGlobalSnapshot&& (snapshot!=DtmLastSnapshot||DtmCurcid!=snapshot->curcid)) {
@@ -543,7 +546,7 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
543546
}
544547
}else {
545548
/* For local transactions and catalog snapshots use default GetSnapshotData implementation */
546-
snapshot=GetLocalSnapshotData(snapshot);
549+
snapshot=PgGetSnapshotData(snapshot);
547550
}
548551
DtmUpdateRecentXmin(snapshot);
549552
CurrentTransactionSnapshot=snapshot;
@@ -557,7 +560,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
557560
*/
558561
XidStatusstatus=xid >=ShmemVariableCache->nextXid
559562
?TRANSACTION_STATUS_IN_PROGRESS
560-
:CLOGTransactionIdGetStatus(xid,lsn);
563+
:PgTransactionIdGetStatus(xid,lsn);
561564
XTM_TRACE("XTM: DtmGetTransactionStatus\n");
562565
returnstatus;
563566
}
@@ -569,7 +572,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
569572
if (!DtmGlobalXidAssigned&&TransactionIdIsValid(DtmNextXid)) {
570573
CurrentTransactionSnapshot=NULL;
571574
if (status==TRANSACTION_STATUS_ABORTED) {
572-
CLOGTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
575+
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
573576
DtmGlobalSetTransStatus(xid,status, false);
574577
XTM_INFO("Abort transaction %d\n",xid);
575578
return;
@@ -592,7 +595,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
592595
status=gs;
593596
}
594597
}
595-
CLOGTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
598+
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
596599
}
597600

598601
staticuint32dtm_xid_hash_fn(constvoid*key,Sizekeysize)
@@ -713,6 +716,36 @@ _PG_init(void)
713716
NULL
714717
);
715718

719+
DefineCustomStringVariable(
720+
"dtm.host",
721+
"The host where DTM daemon resides",
722+
NULL,
723+
&DtmHost,
724+
"127.0.0.1",
725+
PGC_BACKEND,// context
726+
0,// flags,
727+
NULL,// GucStringCheckHook check_hook,
728+
NULL,// GucStringAssignHook assign_hook,
729+
NULL// GucShowHook show_hook
730+
);
731+
732+
DefineCustomIntVariable(
733+
"dtm.port",
734+
"The port DTM daemon is listening",
735+
NULL,
736+
&DtmPort,
737+
5431,
738+
1,
739+
INT_MAX,
740+
PGC_BACKEND,
741+
0,
742+
NULL,
743+
NULL,
744+
NULL
745+
);
746+
747+
TuneToDtm(DtmHost,DtmPort);
748+
716749
/*
717750
* Install hooks.
718751
*/
@@ -748,6 +781,7 @@ PG_FUNCTION_INFO_V1(dtm_begin_transaction);
748781
PG_FUNCTION_INFO_V1(dtm_join_transaction);
749782
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
750783
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
784+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xcnt);
751785

752786
Datum
753787
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
@@ -761,11 +795,19 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
761795
PG_RETURN_INT32(CurrentTransactionSnapshot->xmax);
762796
}
763797

798+
Datum
799+
dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
800+
{
801+
PG_RETURN_INT32(CurrentTransactionSnapshot->xcnt);
802+
}
803+
764804
Datum
765805
dtm_begin_transaction(PG_FUNCTION_ARGS)
766806
{
767807
Assert(!TransactionIdIsValid(DtmNextXid));
768-
808+
if (dtm==NULL) {
809+
elog(ERROR,"DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf");
810+
}
769811
DtmNextXid=DtmGlobalStartTransaction(&DtmSnapshot,&dtm->minXid);
770812
Assert(TransactionIdIsValid(DtmNextXid));
771813
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n",getpid(),DtmNextXid,dtm->minXid);

‎contrib/pg_dtm/tests/daemons.go‎

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99
"os"
1010
"strconv"
11+
"strings"
1112
)
1213

1314
funcread_to_channel(r io.Reader,cchanstring,wg*sync.WaitGroup) {
@@ -91,9 +92,12 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9192
"-D",datadir,
9293
"-p",strconv.Itoa(port),
9394
"-c","dtm.node_id="+strconv.Itoa(nodeid),
95+
"-c","dtm.host=127.0.0.2",
96+
"-c","dtm.port="+strconv.Itoa(5431),
9497
"-c","autovacuum=off",
9598
"-c","fsync=off",
9699
"-c","synchronous_commit=off",
100+
"-c","shared_preload_libraries=pg_dtm",
97101
}
98102
name:="postgres "+datadir
99103
c:=make(chanstring)
@@ -118,14 +122,33 @@ func check_bin(bin *map[string]string) {
118122
}
119123
}
120124

125+
funcget_prefix(srcrootstring)string {
126+
makefile,err:=os.Open(srcroot+"/src/Makefile.global")
127+
iferr!=nil {
128+
return"."
129+
}
130+
131+
scanner:=bufio.NewScanner(makefile)
132+
forscanner.Scan() {
133+
s:=scanner.Text()
134+
ifstrings.HasPrefix(s,"prefix := ") {
135+
returnstrings.TrimPrefix(s,"prefix := ")
136+
}
137+
}
138+
return"."
139+
}
140+
121141
funcmain() {
142+
srcroot:="../../.."
143+
prefix:=get_prefix(srcroot)
144+
122145
bin:=map[string]string{
123-
"dtmd":"/home/kvap/postgrespro/contrib/pg_xtm/dtmd/bin/dtmd",
124-
"initdb":"/home/kvap/postgrespro-build/bin/initdb",
125-
"postgres":"/home/kvap/postgrespro-build/bin/postgres",
146+
"dtmd":srcroot+"/contrib/pg_dtm/dtmd/bin/dtmd",
147+
"initdb":prefix+"/bin/initdb",
148+
"postgres":prefix+"/bin/postgres",
126149
}
127150

128-
datadirs:= []string{"/tmp/data1","/tmp/data2"}
151+
datadirs:= []string{"/tmp/data1","/tmp/data2","/tmp/data3"}
129152

130153
check_bin(&bin);
131154

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp