|
21 | 21 | #include"access/transam.h"
|
22 | 22 | #include"access/xlog.h"
|
23 | 23 | #include"libpq-int.h"
|
| 24 | +#include"access/xlog.h" |
24 | 25 | #include"mb/pg_wchar.h"
|
25 | 26 | #include"miscadmin.h"
|
26 | 27 | #include"pgstat.h"
|
|
30 | 31 | #include"utils/memutils.h"
|
31 | 32 | #include"utils/syscache.h"
|
32 | 33 |
|
33 |
| - |
34 | 34 | /*
|
35 | 35 | * Connection cache hash table entry
|
36 | 36 | *
|
@@ -103,6 +103,55 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
|
103 | 103 | staticboolpgfdw_get_cleanup_result(PGconn*conn,TimestampTzendtime,
|
104 | 104 | PGresult**result);
|
105 | 105 |
|
| 106 | +staticintDistributedTransactionCount; |
| 107 | +staticintDistributedTransactionParticipantsCount; |
| 108 | +staticchar*DistributedTransactionGid; |
| 109 | + |
| 110 | +/* Parallel send of sql statement to all paritcipants nodes |
| 111 | + * and wait status |
| 112 | + */ |
| 113 | +staticbool |
| 114 | +BroadcastStatement(charconst*sql,unsignedexpectedStatus) |
| 115 | +{ |
| 116 | +HASH_SEQ_STATUSscan; |
| 117 | +ConnCacheEntry*entry; |
| 118 | +boolallOk= true; |
| 119 | + |
| 120 | +hash_seq_init(&scan,ConnectionHash); |
| 121 | +while ((entry= (ConnCacheEntry*)hash_seq_search(&scan))) |
| 122 | +{ |
| 123 | +if (entry->xact_depth>0) |
| 124 | +{ |
| 125 | +do_sql_send_command(entry->conn,sql); |
| 126 | +} |
| 127 | +} |
| 128 | + |
| 129 | +hash_seq_init(&scan,ConnectionHash); |
| 130 | +while ((entry= (ConnCacheEntry*)hash_seq_search(&scan))) |
| 131 | +{ |
| 132 | +if (entry->xact_depth>0) |
| 133 | +{ |
| 134 | +PGresult*result=PQgetResult(entry->conn); |
| 135 | + |
| 136 | +if (PQresultStatus(result)!=expectedStatus) |
| 137 | +{ |
| 138 | +elog(WARNING,"Failed command %s: status=%d, expected status=%d",sql,PQresultStatus(result),expectedStatus); |
| 139 | +pgfdw_report_error(ERROR,result,entry->conn, true,sql); |
| 140 | +allOk= false; |
| 141 | +} |
| 142 | +PQclear(result); |
| 143 | +PQgetResult(entry->conn);/* consume NULL result */ |
| 144 | +} |
| 145 | +} |
| 146 | +returnallOk; |
| 147 | +} |
| 148 | + |
| 149 | +staticbool |
| 150 | +BroadcastCommand(charconst*sql) |
| 151 | +{ |
| 152 | +returnBroadcastStatement(sql,PGRES_COMMAND_OK); |
| 153 | +} |
| 154 | + |
106 | 155 |
|
107 | 156 | /*
|
108 | 157 | * Get a PGconn which can be used to execute queries on the remote PostgreSQL
|
@@ -457,31 +506,23 @@ begin_remote_xact(ConnCacheEntry *entry)
|
457 | 506 | /* Start main transaction if we haven't yet */
|
458 | 507 | if (entry->xact_depth <=0)
|
459 | 508 | {
|
460 |
| -TransactionIdgxid=GetTransactionManager()->GetGlobalTransactionId(); |
461 | 509 | constchar*sql;
|
462 | 510 |
|
463 | 511 | elog(DEBUG3,"starting remote transaction on connection %p",
|
464 | 512 | entry->conn);
|
465 | 513 |
|
466 |
| -// XXXX? |
467 |
| -// |
468 |
| -// if (UseTsDtmTransactions && TransactionIdIsValid(gxid)) |
469 |
| -// { |
470 |
| -// charstmt[64]; |
471 |
| -// snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid); |
472 |
| -// res = PQexec(entry->conn, stmt); |
473 |
| -// PQclear(res); |
474 |
| -// } |
475 |
| - |
476 | 514 | if (IsolationIsSerializable())
|
477 | 515 | sql="START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
|
478 |
| -else |
| 516 | +elseif (UseRepeatableRead) |
479 | 517 | sql="START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
|
| 518 | +else |
| 519 | +sql="START TRANSACTION"; |
480 | 520 | entry->changing_xact_state= true;
|
481 | 521 | do_sql_command(entry->conn,sql);
|
482 | 522 | entry->xact_depth=1;
|
483 | 523 | entry->changing_xact_state= false;
|
484 | 524 |
|
| 525 | + |
485 | 526 | if (UseTsDtmTransactions)
|
486 | 527 | {
|
487 | 528 | if (currentConnection==NULL)
|
@@ -516,6 +557,8 @@ begin_remote_xact(ConnCacheEntry *entry)
|
516 | 557 | PQclear(res);
|
517 | 558 | }
|
518 | 559 | }
|
| 560 | + |
| 561 | +DistributedTransactionParticipantsCount+=1; |
519 | 562 | }
|
520 | 563 |
|
521 | 564 | /*
|
@@ -805,55 +848,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
805 | 848 | {
|
806 | 849 | HASH_SEQ_STATUSscan;
|
807 | 850 | ConnCacheEntry*entry;
|
808 |
| - |
809 |
| -// /* Do nothing for this events */ |
810 |
| -// switch (event) |
811 |
| -// { |
812 |
| -// case XACT_EVENT_START: |
813 |
| -// case XACT_EVENT_COMMIT_PREPARED: |
814 |
| -// case XACT_EVENT_ABORT_PREPARED: |
815 |
| -// return; |
816 |
| -// default: |
817 |
| -// break; |
818 |
| -// } |
| 851 | +booltwo_phase_commit; |
819 | 852 |
|
820 | 853 | /* Quick exit if no connections were touched in this transaction. */
|
821 | 854 | if (!xact_got_connection)
|
822 | 855 | return;
|
823 | 856 |
|
824 |
| -if (currentGlobalTransactionId!=0) |
825 |
| -{ |
826 |
| -switch (event) |
827 |
| -{ |
828 |
| -caseXACT_EVENT_PARALLEL_PRE_COMMIT: |
829 |
| -caseXACT_EVENT_PRE_COMMIT: |
830 |
| -{ |
831 |
| -csn_tmaxCSN=0; |
832 |
| - |
833 |
| -if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'", |
834 |
| -MyProcPid,currentLocalTransactionId))|| |
835 |
| -!RunDtmFunction(psprintf("SELECT pg_global_snaphot_begin_prepare('%d.%d')", |
836 |
| -MyProcPid,currentLocalTransactionId))|| |
837 |
| -!RunDtmStatement(psprintf("SELECT pg_global_snaphot_prepare('%d.%d',0)", |
838 |
| -MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)|| |
839 |
| -!RunDtmFunction(psprintf("SELECT pg_global_snaphot_end_prepare('%d.%d',%lld)", |
840 |
| -MyProcPid,currentLocalTransactionId,maxCSN))|| |
841 |
| -!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'", |
842 |
| -MyProcPid,currentLocalTransactionId))) |
843 |
| -{ |
844 |
| -RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'", |
845 |
| -MyProcPid,currentLocalTransactionId)); |
846 |
| -ereport(ERROR, |
847 |
| -(errcode(ERRCODE_TRANSACTION_ROLLBACK), |
848 |
| -errmsg("transaction was aborted at one of the shards"))); |
849 |
| -break; |
850 |
| -} |
851 |
| -return; |
852 |
| -} |
853 |
| -default: |
854 |
| -break; |
855 |
| -} |
856 |
| -} |
| 857 | +/***********************************************************************************************/ |
| 858 | + |
| 859 | +/* Check if we need to perform 2PC commit: number of paritcipants should be greater than 1 */ |
| 860 | +two_phase_commit=Use2PC |
| 861 | +&& (TransactionIdIsValid(GetCurrentTransactionIdIfAny())+DistributedTransactionParticipantsCount)>1; |
| 862 | + |
| 863 | +/***********************************************************************************************/ |
857 | 864 |
|
858 | 865 | /*
|
859 | 866 | * Scan all connection cache entries to find open remote transactions, and
|
@@ -891,10 +898,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
891 | 898 | * we can't issue any more commands against it.
|
892 | 899 | */
|
893 | 900 | pgfdw_reject_incomplete_xact_state_change(entry);
|
894 |
| - |
895 |
| -/* Commit all remote transactions during pre-commit */ |
896 |
| -if (!currentGlobalTransactionId) |
| 901 | +if (!two_phase_commit&& !currentGlobalTransactionId) |
897 | 902 | {
|
| 903 | +/* Commit all remote transactions during pre-commit */ |
898 | 904 | entry->changing_xact_state= true;
|
899 | 905 | do_sql_command(entry->conn,"COMMIT TRANSACTION");
|
900 | 906 | entry->changing_xact_state= false;
|
@@ -922,6 +928,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
922 | 928 | }
|
923 | 929 | entry->have_prep_stmt= false;
|
924 | 930 | entry->have_error= false;
|
| 931 | + |
| 932 | +if (two_phase_commit) |
| 933 | +{ |
| 934 | +/* Do not reset xact_depth and break connection: we still need them for second phase |
| 935 | + */ |
| 936 | +continue; |
| 937 | +} |
925 | 938 | break;
|
926 | 939 | caseXACT_EVENT_PRE_PREPARE:
|
927 | 940 |
|
@@ -1025,21 +1038,77 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
1025 | 1038 | disconnect_pg_server(entry);
|
1026 | 1039 | }
|
1027 | 1040 | }
|
1028 |
| -// if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) |
1029 |
| -// { |
1030 |
| -/* |
1031 |
| - * Regardless of the event type, we can now mark ourselves as out of the |
1032 |
| - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, |
1033 |
| - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) |
1034 |
| - */ |
1035 |
| -xact_got_connection= false; |
1036 | 1041 |
|
1037 |
| -/* Also reset cursor numbering for next transaction */ |
1038 |
| -cursor_number=0; |
| 1042 | +/***********************************************************************************************/ |
| 1043 | + |
| 1044 | +/* |
| 1045 | + * In case of 2PC broadcast PREPARE TRANSACTION statement. |
| 1046 | + * We are using BroadcasrCommand instead of sending them in the connection |
| 1047 | + * iterator above because we want to process them in parallel |
| 1048 | + */ |
| 1049 | + |
| 1050 | +if (currentGlobalTransactionId!=0&& |
| 1051 | +(event==XACT_EVENT_PARALLEL_PRE_COMMIT||event==XACT_EVENT_PRE_COMMIT)) |
| 1052 | +{ |
| 1053 | +csn_tmaxCSN=0; |
| 1054 | + |
| 1055 | +if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'", |
| 1056 | +MyProcPid,currentLocalTransactionId))|| |
| 1057 | +!RunDtmFunction(psprintf("SELECT pg_global_snaphot_begin_prepare('%d.%d')", |
| 1058 | +MyProcPid,currentLocalTransactionId))|| |
| 1059 | +!RunDtmStatement(psprintf("SELECT pg_global_snaphot_prepare('%d.%d',0)", |
| 1060 | +MyProcPid,currentLocalTransactionId),PGRES_TUPLES_OK,DtmMaxCSN,&maxCSN)|| |
| 1061 | +!RunDtmFunction(psprintf("SELECT pg_global_snaphot_end_prepare('%d.%d',%lld)", |
| 1062 | +MyProcPid,currentLocalTransactionId,maxCSN))|| |
| 1063 | +!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'", |
| 1064 | +MyProcPid,currentLocalTransactionId))) |
| 1065 | +{ |
| 1066 | +RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'", |
| 1067 | +MyProcPid,currentLocalTransactionId)); |
| 1068 | +ereport(ERROR, |
| 1069 | +(errcode(ERRCODE_TRANSACTION_ROLLBACK), |
| 1070 | +errmsg("transaction was aborted at one of the shards"))); |
| 1071 | +} |
| 1072 | +return; |
| 1073 | +} |
| 1074 | + |
| 1075 | +elseif (two_phase_commit&& |
| 1076 | +(event==XACT_EVENT_PARALLEL_PRE_COMMIT||event==XACT_EVENT_PRE_COMMIT)) |
| 1077 | +{ |
| 1078 | +DistributedTransactionGid=psprintf("%d:%d:%lld:%lld:%d", |
| 1079 | +MyProcPid, |
| 1080 | +++DistributedTransactionCount, |
| 1081 | + (long long)GetSystemIdentifier(), |
| 1082 | + (long long)GetCurrentTransactionId(), |
| 1083 | +DistributedTransactionParticipantsCount); |
| 1084 | +if (!BroadcastCommand(psprintf("PREPARE TRANSACTION '%s'",DistributedTransactionGid))|| |
| 1085 | +!BroadcastCommand(psprintf("COMMIT PREPARED '%s'",DistributedTransactionGid))) |
| 1086 | +{ |
| 1087 | +BroadcastCommand(psprintf("ROLLBACK PREPARED '%s'",DistributedTransactionGid)); |
| 1088 | +ereport(ERROR, |
| 1089 | +(errcode(ERRCODE_TRANSACTION_ROLLBACK), |
| 1090 | +errmsg("Transaction %s was aborted at one of participants",DistributedTransactionGid))); |
| 1091 | +} |
| 1092 | +return; |
| 1093 | +} |
| 1094 | + |
| 1095 | + |
| 1096 | +/***********************************************************************************************/ |
| 1097 | + |
| 1098 | +/* |
| 1099 | + * Regardless of the event type, we can now mark ourselves as out of the |
| 1100 | + * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, |
| 1101 | + * this saves a useless scan of the hashtable during COMMIT or PREPARE.) |
| 1102 | + */ |
| 1103 | +xact_got_connection= false; |
| 1104 | + |
| 1105 | +/* Also reset cursor numbering for next transaction */ |
| 1106 | +cursor_number=0; |
1039 | 1107 |
|
1040 |
| -currentGlobalTransactionId=0; |
1041 |
| -currentConnection=NULL; |
1042 |
| -// } |
| 1108 | +DistributedTransactionParticipantsCount=0; |
| 1109 | +DistributedTransactionGid=NULL; |
| 1110 | +currentGlobalTransactionId=0; |
| 1111 | +currentConnection=NULL; |
1043 | 1112 | }
|
1044 | 1113 |
|
1045 | 1114 | /*
|
|