Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdc43a58

Browse files
committed
FDW bug fixes
1 parentc7d2d41 commitdc43a58

File tree

3 files changed

+119
-143
lines changed

3 files changed

+119
-143
lines changed

‎contrib/pg_shard/bench/dtmbench.cpp‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ void initializeDatabase()
158158
exec(txn,"CREATE EXTENSION pg_shard");
159159
exec(txn,"create table t(u int primary key, v int)");
160160
exec(txn,"SELECT master_create_distributed_table(table_name := 't', partition_column := 'u')");
161-
exec(txn,"SELECT master_create_worker_shards(table_name := 't', shard_count :=100, replication_factor := 1)");
161+
exec(txn,"SELECT master_create_worker_shards(table_name := 't', shard_count :=9, replication_factor := 1)");
162162
for (int i =0; i < cfg.nAccounts; i++) {
163163
exec(txn,"insert into t values (%d,0)", i);
164164
}

‎contrib/postgres_fdw/connection.c‎

Lines changed: 116 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -650,8 +650,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
650650
{
651651
switch (event)
652652
{
653-
caseXACT_EVENT_PARALLEL_COMMIT:
654-
caseXACT_EVENT_COMMIT:
653+
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
654+
caseXACT_EVENT_PRE_COMMIT:
655655
{
656656
csn_tmaxCSN=0;
657657

@@ -668,160 +668,134 @@ pgfdw_xact_callback(XactEvent event, void *arg)
668668
{
669669
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
670670
MyProcPid,currentLocalTransactionId));
671-
}
672-
break;
671+
ereport(ERROR,
672+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
673+
errmsg("transaction was aborted at one of the shards")));
674+
break;
675+
}
676+
return;
673677
}
674-
caseXACT_EVENT_ABORT:
675-
RunDtmCommand("ROLLBACK");
676-
break;
677-
caseXACT_EVENT_PRE_PREPARE:
678-
ereport(ERROR,
679-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
680-
errmsg("cannot prepare a transaction that modified remote tables")));
681-
break;
682678
default:
683-
break;
679+
break;
684680
}
685-
686-
currentGlobalTransactionId=0;
687-
688-
hash_seq_init(&scan,ConnectionHash);
689-
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
690-
{
691-
/* Ignore cache entry if no open connection right now */
692-
if (entry->conn==NULL)
693-
continue;
694-
/* Reset state to show we're out of a transaction */
695-
entry->xact_depth=0;
696-
697-
/*
698-
* If the connection isn't in a good idle state, discard it to
699-
* recover. Next GetConnection will open a new connection.
700-
*/
701-
if (PQstatus(entry->conn)!=CONNECTION_OK||
702-
PQtransactionStatus(entry->conn)!=PQTRANS_IDLE)
703-
{
704-
elog(DEBUG3,"discarding connection %p, conn status=%d, trans status=%d",entry->conn,PQstatus(entry->conn),PQtransactionStatus(entry->conn));
705-
PQfinish(entry->conn);
706-
entry->conn=NULL;
707-
}
708-
}
709-
}else {
710-
/*
711-
* Scan all connection cache entries to find open remote transactions, and
712-
* close them.
713-
*/
714-
hash_seq_init(&scan,ConnectionHash);
715-
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
716-
{
717-
PGresult*res;
681+
}
682+
/*
683+
* Scan all connection cache entries to find open remote transactions, and
684+
* close them.
685+
*/
686+
hash_seq_init(&scan,ConnectionHash);
687+
while ((entry= (ConnCacheEntry*)hash_seq_search(&scan)))
688+
{
689+
PGresult*res;
718690

719-
/* Ignore cache entry if no open connection right now */
720-
if (entry->conn==NULL)
721-
continue;
691+
/* Ignore cache entry if no open connection right now */
692+
if (entry->conn==NULL)
693+
continue;
722694

723-
/* If it has an open remote transaction, try to close it */
724-
if (entry->xact_depth>0)
725-
{
726-
elog(DEBUG3,"closing remote transaction on connection %p event %d",
727-
entry->conn,event);
695+
/* If it has an open remote transaction, try to close it */
696+
if (entry->xact_depth>0)
697+
{
698+
elog(DEBUG3,"closing remote transaction on connection %p event %d",
699+
entry->conn,event);
728700

729-
switch (event)
730-
{
731-
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
732-
caseXACT_EVENT_PRE_COMMIT:
733-
/* Commit all remote transactions during pre-commit */
734-
do_sql_send_command(entry->conn,"COMMIT TRANSACTION");
735-
continue;
701+
switch (event)
702+
{
703+
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
704+
caseXACT_EVENT_PRE_COMMIT:
705+
/* Commit all remote transactions during pre-commit */
706+
do_sql_send_command(entry->conn,"COMMIT TRANSACTION");
707+
continue;
736708

737-
caseXACT_EVENT_PRE_PREPARE:
738-
/*
739-
* We disallow remote transactions that modified anything,
740-
* since it's not very reasonable to hold them open until
741-
* the prepared transaction is committed. For the moment,
742-
* throw error unconditionally; later we might allow
743-
* read-only cases. Note that the error will cause us to
744-
* come right back here with event == XACT_EVENT_ABORT, so
745-
* we'll clean up the connection state at that point.
746-
*/
747-
ereport(ERROR,
748-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
749-
errmsg("cannot prepare a transaction that modified remote tables")));
750-
break;
751-
752-
caseXACT_EVENT_PARALLEL_COMMIT:
753-
caseXACT_EVENT_COMMIT:
754-
caseXACT_EVENT_PREPARE:
755-
do_sql_wait_command(entry->conn,"COMMIT TRANSACTION");
756-
/*
757-
* If there were any errors in subtransactions, and we
758-
* made prepared statements, do a DEALLOCATE ALL to make
759-
* sure we get rid of all prepared statements. This is
760-
* annoying and not terribly bulletproof, but it's
761-
* probably not worth trying harder.
762-
*
763-
* DEALLOCATE ALL only exists in 8.3 and later, so this
764-
* constrains how old a server postgres_fdw can
765-
* communicate with. We intentionally ignore errors in
766-
* the DEALLOCATE, so that we can hobble along to some
767-
* extent with older servers (leaking prepared statements
768-
* as we go; but we don't really support update operations
769-
* pre-8.3 anyway).
770-
*/
709+
caseXACT_EVENT_PRE_PREPARE:
710+
/*
711+
* We disallow remote transactions that modified anything,
712+
* since it's not very reasonable to hold them open until
713+
* the prepared transaction is committed. For the moment,
714+
* throw error unconditionally; later we might allow
715+
* read-only cases. Note that the error will cause us to
716+
* come right back here with event == XACT_EVENT_ABORT, so
717+
* we'll clean up the connection state at that point.
718+
*/
719+
ereport(ERROR,
720+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
721+
errmsg("cannot prepare a transaction that modified remote tables")));
722+
break;
723+
724+
caseXACT_EVENT_PARALLEL_COMMIT:
725+
caseXACT_EVENT_COMMIT:
726+
caseXACT_EVENT_PREPARE:
727+
if (!currentGlobalTransactionId)
728+
{
729+
do_sql_wait_command(entry->conn,"COMMIT TRANSACTION");
730+
}
731+
/*
732+
* If there were any errors in subtransactions, and we
733+
* made prepared statements, do a DEALLOCATE ALL to make
734+
* sure we get rid of all prepared statements. This is
735+
* annoying and not terribly bulletproof, but it's
736+
* probably not worth trying harder.
737+
*
738+
* DEALLOCATE ALL only exists in 8.3 and later, so this
739+
* constrains how old a server postgres_fdw can
740+
* communicate with. We intentionally ignore errors in
741+
* the DEALLOCATE, so that we can hobble along to some
742+
* extent with older servers (leaking prepared statements
743+
* as we go; but we don't really support update operations
744+
* pre-8.3 anyway).
745+
*/
746+
if (entry->have_prep_stmt&&entry->have_error)
747+
{
748+
res=PQexec(entry->conn,"DEALLOCATE ALL");
749+
PQclear(res);
750+
}
751+
entry->have_prep_stmt= false;
752+
entry->have_error= false;
753+
break;
754+
755+
caseXACT_EVENT_PARALLEL_ABORT:
756+
caseXACT_EVENT_ABORT:
757+
/* Assume we might have lost track of prepared statements */
758+
entry->have_error= true;
759+
/* If we're aborting, abort all remote transactions too */
760+
res=PQexec(entry->conn,"ABORT TRANSACTION");
761+
/* Note: can't throw ERROR, it would be infinite loop */
762+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
763+
pgfdw_report_error(WARNING,res,entry->conn, true,
764+
"ABORT TRANSACTION");
765+
else
766+
{
767+
PQclear(res);
768+
/* As above, make sure to clear any prepared stmts */
771769
if (entry->have_prep_stmt&&entry->have_error)
772770
{
773771
res=PQexec(entry->conn,"DEALLOCATE ALL");
774772
PQclear(res);
775773
}
776774
entry->have_prep_stmt= false;
777775
entry->have_error= false;
778-
break;
779-
780-
caseXACT_EVENT_PARALLEL_ABORT:
781-
caseXACT_EVENT_ABORT:
782-
/* Assume we might have lost track of prepared statements */
783-
entry->have_error= true;
784-
/* If we're aborting, abort all remote transactions too */
785-
res=PQexec(entry->conn,"ABORT TRANSACTION");
786-
/* Note: can't throw ERROR, it would be infinite loop */
787-
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
788-
pgfdw_report_error(WARNING,res,entry->conn, true,
789-
"ABORT TRANSACTION");
790-
else
791-
{
792-
PQclear(res);
793-
/* As above, make sure to clear any prepared stmts */
794-
if (entry->have_prep_stmt&&entry->have_error)
795-
{
796-
res=PQexec(entry->conn,"DEALLOCATE ALL");
797-
PQclear(res);
798-
}
799-
entry->have_prep_stmt= false;
800-
entry->have_error= false;
801-
}
802-
break;
803-
804-
caseXACT_EVENT_START:
805-
caseXACT_EVENT_ABORT_PREPARED:
806-
caseXACT_EVENT_COMMIT_PREPARED:
807-
break;
808-
}
809-
}
810-
/* Reset state to show we're out of a transaction */
811-
entry->xact_depth=0;
812-
813-
/*
814-
* If the connection isn't in a good idle state, discard it to
815-
* recover. Next GetConnection will open a new connection.
816-
*/
817-
if (PQstatus(entry->conn)!=CONNECTION_OK||
818-
PQtransactionStatus(entry->conn)!=PQTRANS_IDLE)
819-
{
820-
elog(DEBUG3,"discarding connection %p, conn status=%d, trans status=%d",entry->conn,PQstatus(entry->conn),PQtransactionStatus(entry->conn));
821-
PQfinish(entry->conn);
822-
entry->conn=NULL;
776+
}
777+
break;
778+
779+
caseXACT_EVENT_START:
780+
caseXACT_EVENT_ABORT_PREPARED:
781+
caseXACT_EVENT_COMMIT_PREPARED:
782+
break;
823783
}
824784
}
785+
/* Reset state to show we're out of a transaction */
786+
entry->xact_depth=0;
787+
788+
/*
789+
* If the connection isn't in a good idle state, discard it to
790+
* recover. Next GetConnection will open a new connection.
791+
*/
792+
if (PQstatus(entry->conn)!=CONNECTION_OK||
793+
PQtransactionStatus(entry->conn)!=PQTRANS_IDLE)
794+
{
795+
elog(WARNING,"discarding connection %p, conn status=%d, trans status=%d",entry->conn,PQstatus(entry->conn),PQtransactionStatus(entry->conn));
796+
PQfinish(entry->conn);
797+
entry->conn=NULL;
798+
}
825799
}
826800
if (event!=XACT_EVENT_PARALLEL_PRE_COMMIT&&event!=XACT_EVENT_PRE_COMMIT) {
827801
/*
@@ -833,6 +807,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
833807

834808
/* Also reset cursor numbering for next transaction */
835809
cursor_number=0;
810+
811+
currentGlobalTransactionId=0;
836812
}
837813
}
838814

‎contrib/postgres_fdw/tests/dtmbench.cpp‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ void initializeDatabase()
160160
int accountsPerShard = (cfg.nAccounts + cfg.nShards -1)/cfg.nShards;
161161
for (int i =0; i < cfg.nShards; i++)
162162
{
163-
exec(txn,"alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard-1);
164-
exec(txn,"insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard-1,0);
163+
exec(txn,"alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
164+
exec(txn,"insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1,0);
165165
}
166166
txn.commit();
167167
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp