Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commiteb21867

Browse files
committed
Wrap standalone statements with transaction block
2 parents7da0e33 +02ed44b commiteb21867

File tree

9 files changed

+49
-30
lines changed

9 files changed

+49
-30
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,30 +243,34 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
243243
if (!MtmResolveHostByName(host,addrs,&n_addrs)) {
244244
elog(ERROR,"Arbiter failed to resolve host '%s' by name",host);
245245
}
246-
Retry:
247-
sd=socket(AF_INET,SOCK_STREAM,0);
248-
if (sd<0) {
249-
elog(ERROR,"Arbiter failed to create socket: %d",errno);
250-
}
246+
247+
Retry:
248+
251249
while (1) {
252250
intrc=-1;
251+
252+
sd=socket(AF_INET,SOCK_STREAM,0);
253+
if (sd<0) {
254+
elog(ERROR,"Arbiter failed to create socket: %d",errno);
255+
}
253256
for (i=0;i<n_addrs;++i) {
254257
memcpy(&sock_inet.sin_addr,&addrs[i],sizeofsock_inet.sin_addr);
255258
do {
256259
rc=connect(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet));
257260
}while (rc<0&&errno==EINTR);
258-
261+
259262
if (rc >=0||errno==EINPROGRESS) {
260263
break;
261264
}
262265
}
263266
if (rc<0) {
264267
if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) {
265-
elog(WARNING,"Arbiter failed to connect to %s:%d: %d",host,port,errno);
268+
elog(WARNING,"Arbiter failed to connect to %s:%d:error=%d",host,port,errno);
266269
return-1;
267270
}else {
268271
max_attempts-=1;
269-
MtmSleep(MtmConnectTimeout);
272+
elog(WARNING,"Arbiter trying to connect to %s:%d: error=%d",host,port,errno);
273+
MtmSleep(5*MtmConnectTimeout);
270274
}
271275
continue;
272276
}else {
@@ -601,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
601605
}while (n<0&&errno==EINTR);
602606
}while (n<0&&MtmRecovery());
603607

604-
if (rc<0) {
608+
if (n<0) {
605609
elog(ERROR,"Arbiter failed to select sockets: %d",errno);
606610
}
607611
for (i=0;i<nNodes;i++) {

‎contrib/mmts/multimaster.c‎

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ typedef struct {
6464
boolisReplicated;/* transaction on replica */
6565
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666
boolisPrepared;/* transaction is perpared at first stage of 2PC */
67+
boolisTransactionBlock;/* is transaction block */
6768
boolcontainsDML;/* transaction contains DML statements */
6869
XidStatusstatus;/* transaction status */
6970
csn_tsnapshot;/* transaction snaphsot */
@@ -590,7 +591,7 @@ MtmXactCallback(XactEvent event, void *arg)
590591
MtmEndTransaction(&MtmTx, false);
591592
break;
592593
caseXACT_EVENT_COMMIT_COMMAND:
593-
if (!IsTransactionBlock()) {
594+
if (!MtmTx.isTransactionBlock) {
594595
MtmTwoPhaseCommit(&MtmTx);
595596
}
596597
break;
@@ -629,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
629630
x->isReplicated= false;
630631
x->isDistributed=MtmIsUserTransaction();
631632
x->isPrepared= false;
633+
x->isTransactionBlock=IsTransactionBlock();
632634
if (x->isDistributed&&Mtm->status!=MTM_ONLINE) {
633635
/* reject all user's transactions at offline cluster */
634636
MtmUnlock();
@@ -1930,19 +1932,20 @@ MtmGenerateGid(char* gid)
19301932

19311933
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x)
19321934
{
1933-
if (x->isDistributed&&x->containsDML) {
1935+
if (!x->isReplicated&& (x->isDistributed&&x->containsDML)) {
19341936
MtmGenerateGid(x->gid);
1935-
if (!IsTransactionBlock()) {
1936-
elog(WARNING,"Start transaction block for %d",x->xid);
1937+
if (!x->isTransactionBlock) {
1938+
elog(WARNING,"Start transaction block for %s",x->gid);
19371939
BeginTransactionBlock();
1940+
x->isTransactionBlock= true;
19381941
CommitTransactionCommand();
19391942
StartTransactionCommand();
19401943
}
19411944
if (!PrepareTransactionBlock(x->gid))
19421945
{
19431946
elog(WARNING,"Failed to prepare transaction %s",x->gid);
19441947
/* ??? Should we do explicit rollback */
1945-
}else {
1948+
}else {
19461949
CommitTransactionCommand();
19471950
StartTransactionCommand();
19481951
if (MtmGetCurrentTransactionStatus()==TRANSACTION_STATUS_ABORTED) {
@@ -1970,8 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19701973
TransactionStmt*stmt= (TransactionStmt*)parsetree;
19711974
switch (stmt->kind)
19721975
{
1976+
caseTRANS_STMT_BEGIN:
1977+
MtmTx.isTransactionBlock= true;
1978+
break;
19731979
caseTRANS_STMT_COMMIT:
1974-
if (MtmTwoPhaseCommit(&MtmTx)) {
1980+
if (MtmTwoPhaseCommit(&MtmTx)) {
19751981
return;
19761982
}
19771983
break;
@@ -2036,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20362042
}
20372043
}
20382044
}
2039-
if (MtmTx.isDistributed&&MtmTx.containsDML&& !IsTransactionBlock()) {
2040-
MtmTwoPhaseCommit(&MtmTx);
2041-
}
20422045
}
20432046
if (PreviousExecutorFinishHook!=NULL)
20442047
{

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,13 +480,15 @@ MtmBeginSession(void)
480480
}
481481

482482
staticvoid
483-
MtmEndSession(void)
483+
MtmEndSession(boolunlock)
484484
{
485485
if (replorigin_session_origin!=InvalidRepOriginId) {
486486
MTM_TRACE("%d: Begin reset replorigin session: %d\n",MyProcPid,replorigin_session_origin);
487487
replorigin_session_origin=InvalidRepOriginId;
488488
replorigin_session_reset();
489-
MtmUnlockNode(MtmReplicationNode);
489+
if (unlock) {
490+
MtmUnlockNode(MtmReplicationNode);
491+
}
490492
MTM_TRACE("%d: End reset replorigin session: %d\n",MyProcPid,replorigin_session_origin);
491493
}
492494
}
@@ -568,7 +570,7 @@ process_remote_commit(StringInfo in)
568570
default:
569571
Assert(false);
570572
}
571-
MtmEndSession();
573+
MtmEndSession(true);
572574
}
573575

574576
staticvoid
@@ -935,7 +937,7 @@ void MtmExecutor(int id, void* work, size_t size)
935937
EmitErrorReport();
936938
FlushErrorState();
937939
MTM_TRACE("%d: REMOTE begin abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
938-
MtmEndSession();
940+
MtmEndSession(false);
939941
AbortCurrentTransaction();
940942
MTM_TRACE("%d: REMOTE end abort transaction %d\n",MyProcPid,MtmGetCurrentTransactionId());
941943
}

‎contrib/mmts/tests/dtmbench.cpp‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
//r = txn.exec("select mtm_get_snapshot()");
132+
r = txn.exec("select mtm_get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

‎contrib/mmts/tests/pg_hba.conf‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,7 @@ host all all ::1/128 trust
9090
# replication privilege.
9191
local replication knizhnik trust
9292
host replication knizhnik 127.0.0.1/32 trust
93+
local replication stas trust
94+
host replication stas ::1/128 trust
95+
host replication stas 127.0.0.1/32 trust
9396
#host replication knizhnik ::1/128 trust

‎contrib/mmts/tests/postgresql.conf.mm‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,4 +625,4 @@
625625
# Add settingsfor extensions here
626626

627627
multimaster.workers=8
628-
multimaster.queue_size=1073741824
628+
multimaster.queue_size=104857600 # 100mb

‎contrib/mmts/tests/reinit-mm.sh‎

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ export PATH=~/code/postgres_cluster/install/bin/:$PATH
33
ulimit -c unlimited
44
pkill -9 postgres
55
pkill -9 arbiter
6+
7+
cd~/code/postgres_cluster/contrib/mmts/
8+
make install
9+
cd~/code/postgres_cluster/contrib/mmts/tests
10+
11+
612
rm -fr node?*.log dtm
713
mkdir dtm
814
conn_str=""
@@ -28,9 +34,10 @@ do
2834
echo"multimaster.conn_strings = '$conn_str'">> node$i/postgresql.conf
2935
echo"multimaster.node_id =$i">> node$i/postgresql.conf
3036
cp pg_hba.conf node$i
31-
pg_ctl -D node$i -l node$i.log start
37+
pg_ctl -w -D node$i -l node$i.log start
3238
done
3339

34-
sleep 5
40+
# sleep 5
41+
# psql -c "create extension multimaster;" postgres
3542

3643
echo Done

‎src/backend/access/transam/twophase.c‎

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,9 +1248,9 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12481248

12491249
hdr= (TwoPhaseFileHeader*)xlrec;
12501250
bufptr=xlrec+MAXALIGN(sizeof(TwoPhaseFileHeader));
1251-
bufptr+=MAXALIGN(hdr->gidlen);
12521251

12531252
strncpy(parsed->twophase_gid,bufptr,hdr->gidlen);
1253+
bufptr+=MAXALIGN(hdr->gidlen);
12541254

12551255
parsed->twophase_xid=hdr->xid;
12561256
parsed->dbId=hdr->database;
@@ -1269,8 +1269,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12691269

12701270
parsed->msgs= (SharedInvalidationMessage*)bufptr;
12711271
bufptr+=MAXALIGN(hdr->ninvalmsgs*sizeof(SharedInvalidationMessage));
1272-
1273-
// strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
12741272
}
12751273

12761274

‎src/backend/utils/cache/inval.c‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,10 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
603603
elseif (msg->rm.dbId==MyDatabaseId)
604604
InvalidateCatalogSnapshot();
605605
}
606-
else
606+
else {
607+
*(int*)0=0;
607608
elog(FATAL,"unrecognized SI message ID: %d",msg->id);
609+
}
608610
}
609611

610612
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp