Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1501d53

Browse files
committed
Fix bug in pg_tsdtm
1 parente9e0bdf commit1501d53

File tree

6 files changed

+98
-97
lines changed

6 files changed

+98
-97
lines changed

‎contrib/pg_shard/bench/reinit-pg_shard.sh‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ cp postgresql.conf.pg_shard master/postgresql.conf
2828
echo"shared_preload_libraries = 'pg_shard'">> master/postgresql.conf
2929
pg_ctl -D master -l master.log start
3030

31+
sleep 5
32+
3133
for((i=1;i<=n_shards;i++))
3234
do
3335
port=$((5432+i))

‎contrib/pg_shard/src/pg_shard.c‎

Lines changed: 79 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ bool UseCitusDBSelectLogic = false;
9494
/* informs pg_shard to use the distributed transaction manager */
9595
boolUseDtmTransactions= false;
9696

97+
/* Use two phase commit for update transactions */
98+
boolDtmTwoPhaseCommit= false;
99+
97100
/* logs each statement used in a distributed plan */
98101
boolLogDistributedStatements= false;
99102

@@ -178,7 +181,7 @@ static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
178181
/* XTM stuff */
179182
staticList*connectionsWithDtmTransactions=NIL;
180183
staticcsn_tcurrentGlobalTransactionId=0;
181-
staticintcurrentLocalTransactionId=0;
184+
staticintcurrentLocalTransactionId=0;
182185
staticboolcommitCallbackSet= false;
183186

184187
#defineTRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -642,7 +645,7 @@ ErrorIfQueryNotSupported(Query *queryTree)
642645
/*
643646
* if (!IsA(targetEntry->expr, Const))
644647
* {
645-
*hasNonConstTargetEntryExprs = true;
648+
*hasNonConstTargetEntryExprs = true;
646649
* }
647650
*/
648651

@@ -656,7 +659,7 @@ ErrorIfQueryNotSupported(Query *queryTree)
656659
* joinTree = queryTree->jointree;
657660
* if (joinTree != NULL && contain_mutable_functions(joinTree->quals))
658661
* {
659-
*hasNonConstQualExprs = true;
662+
*hasNonConstQualExprs = true;
660663
* }
661664
*/
662665
}
@@ -669,9 +672,9 @@ ErrorIfQueryNotSupported(Query *queryTree)
669672
/*
670673
* if (hasNonConstTargetEntryExprs || hasNonConstQualExprs)
671674
* {
672-
*ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
673-
*errmsg("cannot plan sharded modification containing values "
674-
* "which are not constants or constant expressions")));
675+
*ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
676+
*errmsg("cannot plan sharded modification containing values "
677+
* "which are not constants or constant expressions")));
675678
* }
676679
*
677680
*/
@@ -1280,8 +1283,8 @@ PgShardExecutorStart(QueryDesc *queryDesc, int eflags)
12801283
/*
12811284
* if (!UseDtmTransactions)
12821285
* {
1283-
*PreventTransactionChain(topLevel, "distributed commands");
1284-
*eflags |= EXEC_FLAG_SKIP_TRIGGERS;
1286+
*PreventTransactionChain(topLevel, "distributed commands");
1287+
*eflags |= EXEC_FLAG_SKIP_TRIGGERS;
12851288
* }
12861289
*
12871290
*/
@@ -1492,6 +1495,8 @@ ExecuteMultipleShardSelect(DistributedPlan *distributedPlan,
14921495

14931496
ListCell*taskCell=NULL;
14941497

1498+
DtmTwoPhaseCommit=IsTransactionBlock();
1499+
14951500
foreach(taskCell,taskList)
14961501
{
14971502
Task*task= (Task*)lfirst(taskCell);
@@ -1519,6 +1524,7 @@ ExecuteMultipleShardSelect(DistributedPlan *distributedPlan,
15191524

15201525
tuplestore_end(tupleStore);
15211526
}
1527+
15221528
}
15231529

15241530

@@ -1893,6 +1899,7 @@ ExecuteDistributedModify(DistributedPlan *plan)
18931899

18941900
if (UseDtmTransactions)
18951901
{
1902+
DtmTwoPhaseCommit= true;
18961903
PrepareDtmTransaction(task);
18971904
}
18981905

@@ -2000,7 +2007,12 @@ PrepareDtmTransaction(Task *task)
20002007
/* already started a transaction */
20012008
continue;
20022009
}
2003-
2010+
if (!SendCommand(connection,"BEGIN"))
2011+
{
2012+
PurgeConnection(connection);
2013+
abortTransaction= true;
2014+
continue;
2015+
}
20042016
if (!currentGlobalTransactionId)
20052017
{
20062018
/* Send dtm_begin_transaction to the first node */
@@ -2013,7 +2025,7 @@ PrepareDtmTransaction(Task *task)
20132025
continue;
20142026
}
20152027
TRACE("shard_xtm: conn#%p: Sent dtm_begin() to %s:%u -> %llu\n",
2016-
connection,nodeName,nodePort,currentGlobalTransactionId);
2028+
connection,nodeName,nodePort,currentGlobalTransactionId);
20172029
}
20182030
else
20192031
{
@@ -2024,19 +2036,19 @@ PrepareDtmTransaction(Task *task)
20242036
continue;
20252037
}
20262038
TRACE("shard_xtm: conn#%p: Sent dtm_access(%llu) to %s:%u\n",
2027-
connection,currentGlobalTransactionId,nodeName,nodePort);
2039+
connection,currentGlobalTransactionId,nodeName,nodePort);
20282040
}
20292041

20302042
newTransactions=lappend(newTransactions,connection);
20312043
}
20322044

20332045
if (abortTransaction)
20342046
{
2035-
/* make sure we abort all pending transactions */
2036-
connectionsWithDtmTransactions=newTransactions;
2037-
20382047
MemoryContextSwitchTo(oldContext);
20392048

2049+
/* make sure we abort all pending transactions */
2050+
connectionsWithDtmTransactions=newTransactions;
2051+
20402052
/*
20412053
* Since pg_shard reuses connections across transactions on the master,
20422054
* we need to abort pending transactions on the workers.
@@ -2045,40 +2057,8 @@ PrepareDtmTransaction(Task *task)
20452057
ereport(ERROR, (errmsg("aborting distributed transaction due to failures")));
20462058
}
20472059

2048-
foreach(taskPlacementCell,task->taskPlacementList)
2049-
{
2050-
ShardPlacement*taskPlacement= (ShardPlacement*)lfirst(taskPlacementCell);
2051-
char*nodeName=taskPlacement->nodeName;
2052-
int32nodePort=taskPlacement->nodePort;
2053-
PGconn*connection=NULL;
2054-
2055-
connection=GetConnection(nodeName,nodePort, false);
2056-
if (connection==NULL)
2057-
{
2058-
ereport(WARNING, (errmsg("failed to connect to %s:%d",
2059-
nodeName,nodePort)));
2060-
abortTransaction= true;
2061-
continue;
2062-
}
2063-
2064-
if (list_member_ptr(connectionsWithDtmTransactions,connection))
2065-
{
2066-
/* already started a transaction */
2067-
continue;
2068-
}
2069-
2070-
if (!SendCommand(connection,"BEGIN"))
2071-
{
2072-
PurgeConnection(connection);
2073-
abortTransaction= true;
2074-
continue;
2075-
}
2076-
TRACE("shard_xtm: conn#%p: Sent BEGIN to %s:%u\n",connection,nodeName,nodePort);
2077-
2078-
}
2079-
20802060
connectionsWithDtmTransactions=list_union(connectionsWithDtmTransactions,
2081-
newTransactions);
2061+
newTransactions);
20822062

20832063
MemoryContextSwitchTo(oldContext);
20842064

@@ -2107,8 +2087,10 @@ SendDtmBeginTransaction(PGconn *connection)
21072087
char*resp=NULL;
21082088
csn_tremoteTransactionId;
21092089

2110-
2111-
result=PQexec(connection,psprintf("SELECT dtm_extend('%d.%d')",MyProcPid,++currentLocalTransactionId));
2090+
2091+
result=DtmTwoPhaseCommit
2092+
?PQexec(connection,psprintf("SELECT dtm_extend('%d.%d')",MyProcPid,++currentLocalTransactionId))
2093+
:PQexec(connection,"SELECT dtm_extend()");
21122094
if (PQresultStatus(result)!=PGRES_TUPLES_OK)
21132095
{
21142096
ReportRemoteError(connection,result);
@@ -2135,7 +2117,9 @@ SendDtmJoinTransaction(PGconn *connection, csn_t TransactionId)
21352117
PGresult*result=NULL;
21362118
boolresultOK= true;
21372119

2138-
result=PQexec(connection,psprintf("SELECT dtm_access(%llu, '%d.%d')",TransactionId,MyProcPid,currentLocalTransactionId));
2120+
result=PQexec(connection,DtmTwoPhaseCommit
2121+
?psprintf("SELECT dtm_access(%llu, '%d.%d')",TransactionId,MyProcPid,currentLocalTransactionId)
2122+
:psprintf("SELECT dtm_access(%llu)",TransactionId));
21392123
if (PQresultStatus(result)!=PGRES_TUPLES_OK)
21402124
{
21412125
ReportRemoteError(connection,result);
@@ -2170,23 +2154,28 @@ typedef bool (*DtmCommandResultHandler)(PGresult *result, void* arg);
21702154

21712155
staticboolRunDtmStatement(charconst*sql,unsignedexpectedStatus,DtmCommandResultHandlerhandler,void*arg)
21722156
{
2173-
ListCell*connectionCell=NULL;
21742157
intquerySent=0;
21752158
PGresult*result=NULL;
21762159
PGconn*connection=NULL;
21772160
boolallOk= true;
2161+
ListCell*connectionCell=NULL;
2162+
ListCell*nextCell=list_head(connectionsWithDtmTransactions);
2163+
ListCell*prevCell=NULL;
21782164

2179-
foreach(connectionCell,connectionsWithDtmTransactions)
2165+
while ((connectionCell=nextCell)!=NULL)
21802166
{
2167+
nextCell=lnext(connectionCell);
21812168
connection= (PGconn*)lfirst(connectionCell);
21822169
querySent=PQsendQuery(connection,sql);
21832170
if (!querySent)
21842171
{
21852172
ReportRemoteError(connection,NULL);
2173+
list_delete_cell(connectionsWithDtmTransactions,connectionCell,prevCell);
21862174
PurgeConnection(connection);
21872175
allOk= false;
21882176
continue;
21892177
}
2178+
prevCell=connectionCell;
21902179
TRACE("shard_xtm: conn#%p: Sent %s to %s:%s\n",connection,sql,PQhost(connection),PQport(connection));
21912180
}
21922181
foreach(connectionCell,connectionsWithDtmTransactions)
@@ -2199,7 +2188,7 @@ static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommand
21992188
allOk= false;
22002189
}
22012190
PQclear(result);
2202-
PQresultStatus(result);/* consume NULL result */
2191+
PQgetResult(connection);/* consume NULL result */
22032192
}
22042193
returnallOk;
22052194
}
@@ -2234,37 +2223,46 @@ static bool DtmMaxCSN(PGresult *result, void* arg)
22342223

22352224

22362225
staticvoid
2237-
FinishDtmTransaction(XactEventevent,void*arg)
2226+
FinishDtmTransaction(XactEventevent,void*arg)
22382227
{
2239-
if (!(event==XACT_EVENT_COMMIT||event==XACT_EVENT_ABORT)
2240-
|| !connectionsWithDtmTransactions)
2241-
{
2242-
return;
2243-
}
2244-
if (event==XACT_EVENT_COMMIT)
2245-
{
2246-
csn_tmaxCSN=0;
2247-
2248-
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",MyProcPid,currentLocalTransactionId))||
2249-
!RunDtmFunction(psprintf("SELECT dtm_begin_prepare('%d.%d')",MyProcPid,currentLocalTransactionId))||
2250-
!RunDtmStatement(psprintf("SELECT dtm_prepare('%d.%d',0)",MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)||
2251-
!RunDtmFunction(psprintf("SELECT dtm_end_prepare('%d.%d',%lld)",MyProcPid,currentLocalTransactionId,maxCSN))||
2252-
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",MyProcPid,currentLocalTransactionId)))
2253-
{
2254-
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",MyProcPid,currentLocalTransactionId));
2228+
if ((event==XACT_EVENT_COMMIT||event==XACT_EVENT_ABORT)&&connectionsWithDtmTransactions)
2229+
{
2230+
if (DtmTwoPhaseCommit)
2231+
{
2232+
if (event==XACT_EVENT_COMMIT)
2233+
{
2234+
csn_tmaxCSN=0;
2235+
2236+
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
2237+
MyProcPid,currentLocalTransactionId))||
2238+
!RunDtmFunction(psprintf("SELECT dtm_begin_prepare('%d.%d')",
2239+
MyProcPid,currentLocalTransactionId))||
2240+
!RunDtmStatement(psprintf("SELECT dtm_prepare('%d.%d',0)",
2241+
MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)||
2242+
!RunDtmFunction(psprintf("SELECT dtm_end_prepare('%d.%d',%lld)",
2243+
MyProcPid,currentLocalTransactionId,maxCSN))||
2244+
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
2245+
MyProcPid,currentLocalTransactionId)))
2246+
{
2247+
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
2248+
MyProcPid,currentLocalTransactionId));
2249+
}
2250+
}else {
2251+
RunDtmCommand("ROLLBACK");
2252+
}
2253+
}else {
2254+
RunDtmCommand("COMMIT");
22552255
}
2256-
}else {
2257-
RunDtmCommand("ROLLBACK");
2256+
/*
2257+
* Calling unregister inside callback itself leads to segfault when
2258+
* there are several callbacks on the same event.
2259+
*/
2260+
/*
2261+
* UnregisterXactCallback(FinishDtmTransaction, NULL);
2262+
*/
2263+
connectionsWithDtmTransactions=NIL;
2264+
currentGlobalTransactionId=0;
22582265
}
2259-
/*
2260-
* Calling unregister inside callback itself leads to segfault when
2261-
* there are several callbacks on the same event.
2262-
*/
2263-
/*
2264-
* UnregisterXactCallback(FinishDtmTransaction, NULL);
2265-
*/
2266-
connectionsWithDtmTransactions=NIL;
2267-
currentGlobalTransactionId=0;
22682266
}
22692267

22702268

‎contrib/pg_tsdtm/pg_dtm.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ cid_t DtmLocalAccess(DtmCurrentTrans* x, GlobalTransactionId gtid, cid_t global_
637637
id->subxids=0;
638638
}
639639
local_cid=dtm_sync(global_cid);
640-
x->snapshot=local_cid;
640+
x->snapshot=global_cid;
641641
x->is_global= true;
642642
}
643643
SpinLockRelease(&local->lock);

‎contrib/pg_tsdtm/tests/dtmbench.cpp‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ struct config
7979
diapason =100000;
8080
deadlockFree =false;
8181
makeSavepoints =false;
82+
maxSnapshot =false;
8283
}
8384
};
8485

@@ -156,7 +157,7 @@ void* reader(void* arg)
156157
sum +=execQuery(*txns[i],"select sum(v) from t");
157158
}
158159
if (sum != prevSum) {
159-
printf("Total=%ld snapshot=%ldm delta=%ld usec\n", sum, snapshot,getCurrentTime()-snapshot);
160+
printf("Total=%ld snapshot=%ld, delta=%ld usec\n", sum, snapshot,getCurrentTime()-snapshot);
160161
prevSum = sum;
161162
}
162163
t.proceeded +=1;

‎contrib/pg_tsdtm/tests/play.sh‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
ansible-playbook -i deploy/hosts deploy/cluster.yml
2+
ansible-playbook -i deploy/hosts perf.yml -e nnodes=2
3+
ansible-playbook -i deploy/hosts deploy/cluster.yml
4+
ansible-playbook -i deploy/hosts perf.yml -e nnodes=3
5+
ansible-playbook -i deploy/hosts deploy/cluster.yml
6+
ansible-playbook -i deploy/hosts perf.yml -e nnodes=4
7+
ansible-playbook -i deploy/hosts deploy/cluster.yml
8+
ansible-playbook -i deploy/hosts perf.yml -e nnodes=5
9+

‎contrib/pg_tsdtm/tests/run.sh‎

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,6 @@
1-
# ./dtmbench \
2-
# -c "dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \
3-
# -c "dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \
4-
# -c "dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \
5-
# -n 1000 -a 1000 -w 10 -r 1 $*
6-
7-
ansible-playbook -i deploy/hosts deploy/cluster.yml
8-
ansible-playbook -i deploy/hosts perf.yml -e nnodes=2
9-
ansible-playbook -i deploy/hosts deploy/cluster.yml
10-
ansible-playbook -i deploy/hosts perf.yml -e nnodes=3
11-
ansible-playbook -i deploy/hosts deploy/cluster.yml
12-
ansible-playbook -i deploy/hosts perf.yml -e nnodes=4
13-
ansible-playbook -i deploy/hosts deploy/cluster.yml
14-
ansible-playbook -i deploy/hosts perf.yml -e nnodes=5
1+
./dtmbench \
2+
-c"dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \
3+
-c"dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \
4+
-c"dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \
5+
-n 1000 -a 10000 -d 10000 -w 10 -r 1$*
156

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp