Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit17a908a

Browse files
knizhnikkelvich
authored andcommitted
2PC fixes
1 parent75e4922 commit17a908a

File tree

3 files changed

+26
-19
lines changed

3 files changed

+26
-19
lines changed

‎multimaster.c

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -643,33 +643,36 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
643643
{
644644
MTM_TRACE("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n",MyProcPid,x->xid,x->isPrepared,x->isDistributed,commit ?"commit" :"abort");
645645
if (x->isDistributed&& (x->isPrepared||x->isReplicated)) {
646-
MtmTransState*ts;
646+
MtmTransState*ts=NULL;
647647
MtmLock(LW_EXCLUSIVE);
648648
if (x->isPrepared) {
649649
ts=hash_search(xid2state,&x->xid,HASH_FIND,NULL);
650650
Assert(ts!=NULL);
651651
}else {
652652
MtmTransMap*hm= (MtmTransMap*)hash_search(gid2xid,x->gid,HASH_REMOVE,NULL);
653-
Assert(hm!=NULL);
654-
ts=hm->state;
653+
if (hm!=NULL) {
654+
ts=hm->state;
655+
}
655656
}
656-
if (commit) {
657-
ts->status=TRANSACTION_STATUS_COMMITTED;
658-
if (x->csn>ts->csn) {
659-
ts->csn=x->csn;
660-
MtmSyncClock(ts->csn);
657+
if (ts!=NULL) {
658+
if (commit) {
659+
ts->status=TRANSACTION_STATUS_COMMITTED;
660+
if (x->csn>ts->csn) {
661+
ts->csn=x->csn;
662+
MtmSyncClock(ts->csn);
663+
}
664+
}else {
665+
ts->status=TRANSACTION_STATUS_ABORTED;
666+
if (x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
667+
/*
668+
* Send notification only of ABORT happens during transaction processing at replicas,
669+
* do not send notification if ABORT is receiver from master
670+
*/
671+
MtmSendNotificationMessage(ts);/* send notification to coordinator */
672+
}
661673
}
662-
}else {
663-
ts->status=TRANSACTION_STATUS_ABORTED;
664-
if (x->isReplicated&&TransactionIdIsValid(x->gtid.xid)) {
665-
/*
666-
* Send notification only of ABORT happens during transaction processing at replicas,
667-
* do not send notification if ABORT is receiver from master
668-
*/
669-
MtmSendNotificationMessage(ts);/* send notification to coordinator */
670-
}
674+
MtmAdjustSubtransactions(ts);
671675
}
672-
MtmAdjustSubtransactions(ts);
673676
MtmUnlock();
674677
}
675678
x->snapshot=INVALID_CSN;
@@ -1691,6 +1694,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16911694
DestReceiver*dest,char*completionTag)
16921695
{
16931696
boolskipCommand;
1697+
MTM_TRACE("%d: Process utility statement %s\n",MyProcPid,queryString);
16941698
switch (nodeTag(parsetree))
16951699
{
16961700
caseT_TransactionStmt:
@@ -1702,8 +1706,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
17021706
if (dtmTx.isDistributed&&dtmTx.containsDML) {
17031707
chargid[MULTIMASTER_MAX_GID_SIZE];
17041708
MtmGenerateGid(gid);
1709+
MTM_TRACE("%d: Start 2PC with GID=%s for %s\n",MyProcPid,gid,queryString);
17051710
if (!IsTransactionBlock()) {
17061711
elog(WARNING,"Start transaction block for %d",dtmTx.xid);
1712+
BeginTransactionBlock();
17071713
CommitTransactionCommand();
17081714
StartTransactionCommand();
17091715
}

‎pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ process_remote_insert(StringInfo s, Relation rel)
623623
char*ddl=TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
624624
intrc;
625625
SPI_connect();
626+
MTM_TRACE("%d: Execute utility statement %s\n",MyProcPid,ddl);
626627
rc=SPI_execute(ddl, false,0);
627628
SPI_finish();
628629
if (rc!=SPI_OK_UTILITY) {

‎pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ pglogical_receiver_main(Datum main_arg)
274274
if (originId!=InvalidRepOriginId) {
275275
originStartPos=replorigin_get_progress(originId, false);
276276
elog(WARNING,"Restart logical receiver at position %lx from node %d",originStartPos,args->remote_node);
277-
}else {
277+
}else {
278278
elog(WARNING,"Start logical receiver from node %d",args->remote_node);
279279
}
280280
CommitTransactionCommand();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp