Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit2c64afe

Browse files
committed
always add local node as participant
1 parent14e6f4e commit2c64afe

File tree

3 files changed

+40
-37
lines changed

3 files changed

+40
-37
lines changed

‎contrib/postgres_fdw/connection.c

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -485,35 +485,37 @@ begin_remote_xact(ConnCacheEntry *entry)
485485
MemoryContextSwitchTo(oldcxt);
486486

487487

488-
res=PQexec(entry->conn,psprintf("SELECT pg_global_snaphot_create('%s')",
489-
two_phase_xact_gid));
488+
// res = PQexec(entry->conn, psprintf("SELECT pg_global_snaphot_create('%s')",
489+
// two_phase_xact_gid));
490+
491+
// if (PQresultStatus(res) != PGRES_TUPLES_OK)
492+
// {
493+
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
494+
// }
495+
// resp = PQgetvalue(res, 0, 0);
496+
// if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%ld", &current_global_cid) != 1)
497+
// {
498+
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
499+
// }
500+
// PQclear(res);
501+
502+
503+
current_global_cid=DtmLocalExtend(two_phase_xact_gid);
504+
}
505+
// else
506+
// {
507+
Assert(two_phase_xact_gid);
508+
/* join the new participant */
509+
res=PQexec(entry->conn,
510+
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
511+
current_global_cid,two_phase_xact_gid));
490512

491513
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
492514
{
493515
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
494516
}
495-
resp=PQgetvalue(res,0,0);
496-
if (resp==NULL|| (*resp)=='\0'||sscanf(resp,"%ld",&current_global_cid)!=1)
497-
{
498-
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
499-
}
500517
PQclear(res);
501-
502-
503-
// current_global_cid = DtmLocalExtend(two_phase_xact_gid);
504-
}
505-
506-
Assert(two_phase_xact_gid);
507-
/* join the new participant */
508-
res=PQexec(entry->conn,
509-
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
510-
current_global_cid,two_phase_xact_gid));
511-
512-
if (PQresultStatus(res)!=PGRES_TUPLES_OK)
513-
{
514-
pgfdw_report_error(ERROR,res,entry->conn, true,sql);
515-
}
516-
PQclear(res);
518+
// }
517519
}
518520

519521
/* A new potential participant for 2PC */
@@ -1045,22 +1047,22 @@ finalize_dtm(void)
10451047
{
10461048
char*gid=two_phase_xact_gid;// != NULL? two_phase_xact_gid : "";
10471049
cid_tmaxCSN=0;
1048-
//cid_t localCSN = 0;
1050+
cid_tlocalCSN=0;
10491051

10501052
Assert(gid);
10511053

1052-
//DtmLocalBeginPrepare(gid);
1054+
DtmLocalBeginPrepare(gid);
10531055
BroadcastFunc(psprintf("SELECT pg_global_snaphot_begin_prepare('%s')",
10541056
gid));
10551057

10561058
/* Collect CSNs and choose max */
1057-
//localCSN = DtmLocalPrepare(gid, 0);
1059+
localCSN=DtmLocalPrepare(gid,0);
10581060
BroadcastStmt(psprintf("SELECT pg_global_snaphot_prepare('%s', 0)",
10591061
gid),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN);
1060-
//if (localCSN > maxCSN)
1061-
//maxCSN = localCSN;
1062+
if (localCSN>maxCSN)
1063+
maxCSN=localCSN;
10621064

1063-
//DtmLocalEndPrepare(gid, maxCSN);
1065+
DtmLocalEndPrepare(gid,maxCSN);
10641066
BroadcastFunc(psprintf("SELECT pg_global_snaphot_end_prepare('%s',"UINT64_FORMAT")",
10651067
gid,maxCSN));
10661068

‎src/backend/access/transam/global_snapshot.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ static HTAB *xid2status;
8888
staticHTAB*gtid2xid;
8989
staticDtmNodeState*local;
9090
staticuint64totalSleepInterrupts;
91-
staticintDtmVacuumDelay=2;/* sec */
91+
staticintDtmVacuumDelay=10;/* sec */
9292
staticboolDtmRecordCommits=0;
9393

9494
DtmCurrentTransdtm_tx;// XXXX: make static
@@ -489,19 +489,20 @@ DtmLocalBegin(DtmCurrentTrans * x)
489489
* Returns snapshot of current transaction.
490490
*/
491491
cid_t
492-
DtmLocalExtend(DtmCurrentTrans*x,GlobalTransactionIdgtid)
492+
DtmLocalExtend(GlobalTransactionIdgtid)
493493
{
494-
//DtmCurrentTrans *x = &dtm_tx;
494+
DtmCurrentTrans*x=&dtm_tx;
495495

496496
if (gtid!=NULL)
497497
{
498498
SpinLockAcquire(&local->lock);
499499
{
500500
DtmTransId*id= (DtmTransId*)hash_search(gtid2xid,gtid,HASH_ENTER,NULL);
501501

502-
id->xid=x->xid;
502+
id->xid=GetCurrentTransactionId();
503503
id->nSubxids=0;
504504
id->subxids=0;
505+
x->xid=id->xid;
505506
}
506507
strncpy(x->gtid,gtid,MAX_GTID_SIZE);
507508
SpinLockRelease(&local->lock);
@@ -681,7 +682,7 @@ DtmLocalCommit(DtmCurrentTrans * x)
681682

682683
ts= (DtmTransStatus*)hash_search(xid2status,&x->xid,HASH_ENTER,&found);
683684
ts->status=TRANSACTION_STATUS_COMMITTED;
684-
if (x->is_prepared)
685+
if (found)
685686
{
686687
inti;
687688
DtmTransStatus*sts=ts;
@@ -755,7 +756,7 @@ DtmLocalAbort(DtmCurrentTrans * x)
755756

756757
Assert(TransactionIdIsValid(x->xid));
757758
ts= (DtmTransStatus*)hash_search(xid2status,&x->xid,HASH_ENTER,&found);
758-
if (x->is_prepared)
759+
if (found)
759760
{
760761
Assert(found);
761762
Assert(x->is_global);
@@ -902,7 +903,7 @@ Datum
902903
pg_global_snaphot_create(PG_FUNCTION_ARGS)
903904
{
904905
GlobalTransactionIdgtid=text_to_cstring(PG_GETARG_TEXT_PP(0));
905-
cid_tcid=DtmLocalExtend(&dtm_tx,gtid);
906+
cid_tcid=DtmLocalExtend(gtid);
906907

907908
DTM_TRACE((stderr,"Backend %d extends transaction %u(%s) to global with cid=%lu\n",getpid(),dtm_tx.xid,gtid,cid));
908909
PG_RETURN_INT64(cid);

‎src/include/access/global_snapshot.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ voidDtmInitialize(void);
3030
voidDtmLocalBegin(DtmCurrentTrans*x);
3131

3232
/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
33-
cid_tDtmLocalExtend(DtmCurrentTrans*x,GlobalTransactionIdgtid);
33+
externcid_tDtmLocalExtend(GlobalTransactionIdgtid);
3434

3535
/* Function called at first access to any datanode except first one involved in distributed transaction */
3636
cid_tDtmLocalAccess(DtmCurrentTrans*x,GlobalTransactionIdgtid,cid_tsnapshot);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp