7676#define MAX_ROUTES 16
7777#define BUFFER_SIZE 1024
7878
79+ typedef enum
80+ {
81+ MSG_PREPARE ,
82+ MSG_COMMIT ,
83+ MSG_ABORT
84+ }MessageCode ;
85+
86+
7987typedef struct
8088{
89+ MessageCode code ;/* Message code: MSG_PREPARE, MSG_COMMIT, MSG_ABORT
90+ int node; /* Sender node ID */
8191TransactionId dxid ;/* Transaction ID at destination node */
8292TransactionId sxid ;/* Transaction IO at sender node */
83- int node ;/* Sender node ID */
8493csn_t csn ;/* local CSN in case of sending data from replica to master, global CSN master->replica */
8594}DtmCommitMessage ;
8695
@@ -100,15 +109,15 @@ static BackgroundWorker DtmSender = {
100109"mm-sender" ,
101110BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION ,/* do not need connection to the database */
102111BgWorkerStart_ConsistentState ,
103- 1 ,/*restrart in one second (is it possible torestort immediately?) */
112+ 1 ,/*restart in one second (is it possible torestart immediately?) */
104113DtmTransSender
105114};
106115
107116static BackgroundWorker DtmRecevier = {
108117"mm-receiver" ,
109118BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION ,/* do not need connection to the database */
110119BgWorkerStart_ConsistentState ,
111- 1 ,/*restrart in one second (is it possible torestort immediately?) */
120+ 1 ,/*restart in one second (is it possible torestart immediately?) */
112121DtmTransReceiver
113122};
114123
@@ -300,6 +309,25 @@ static int readSocket(int sd, void* buf, int buf_size)
300309return rc ;
301310}
302311
312+ static bool IsCoordinator (DtmTransState * ts )
313+ {
314+ return ts -> dsid .node == MMNodeId ;
315+ }
316+
317+ static void DtmAppendBuffer (MessageCode code ,DtmBuffer * txBuffer ,TransactionId xid ,int node ,DtmTransState * ts )
318+ {
319+ DtmBuffer * buf = & txBuffer [node ];
320+ if (buf -> used == BUFFER_SIZE ) {
321+ writeSocket (sockets [node ],buf -> data ,buf -> used * sizeof (DtmCommitMessage ));
322+ buf -> used = 0 ;
323+ }
324+ buf -> data [buf -> used ].code = code ;
325+ buf -> data [buf -> used ].dxid = xid ;
326+ buf -> data [buf -> used ].sxid = ts -> xid ;
327+ buf -> data [buf -> used ].csn = ts -> status == TRANSACTION_STATUS_ABORTED ?INVALID_CSN :ts -> csn ;
328+ buf -> data [buf -> used ].node = MMNodeId ;
329+ buf -> used += 1 ;
330+ }
303331
304332static void DtmTransSender (Datum arg )
305333{
@@ -327,38 +355,18 @@ static void DtmTransSender(Datum arg)
327355SpinLockRelease (& ds -> votingSpinlock );
328356
329357for (;ts != NULL ;ts = ts -> nextVoting ) {
330- if (ts -> gtid . node == MMNodeId ) {
331- /* Coordinator is broadcastingconfirmations to replicas */
358+ if (IsCoordinator ( ts )) {
359+ /* Coordinator is broadcastingPREPARE message to replicas */
332360for (i = 0 ;i < nNodes ;i ++ ) {
333361if (TransactionIdIsValid (ts -> xids [i ])) {
334- if (txBuffer [i ].used == BUFFER_SIZE ) {
335- writeSocket (sockets [i ],txBuffer [i ].data ,txBuffer [i ].used * sizeof (DtmCommitMessage ));
336- txBuffer [i ].used = 0 ;
337- }
338- DTM_TRACE ("Send notification %ld to replica %d from coordinator %d for transaction %d (local transaction %d)\n" ,
339- ts -> csn ,i + 1 ,MMNodeId ,ts -> xid ,ts -> xids [i ]);
340-
341- txBuffer [i ].data [txBuffer [i ].used ].dxid = ts -> xids [i ];
342- txBuffer [i ].data [txBuffer [i ].used ].sxid = ts -> xid ;
343- txBuffer [i ].data [txBuffer [i ].used ].csn = ts -> csn ;
344- txBuffer [i ].data [txBuffer [i ].used ].node = MMNodeId ;
345- txBuffer [i ].used += 1 ;
362+ DtmAppendBuffer (CMD_PREPARE ,txBuffer ,ts -> xids [i ],i ,ts );
346363}
347364}
348365}else {
349- /* Replica is notifying master */
350- i = ts -> gtid .node - 1 ;
351- if (txBuffer [i ].used == BUFFER_SIZE ) {
352- writeSocket (sockets [i ],txBuffer [i ].data ,txBuffer [i ].used * sizeof (DtmCommitMessage ));
353- txBuffer [i ].used = 0 ;
354- }
366+ /* Replica is notifying master that it is ready to PREPARE */
355367DTM_TRACE ("Send notification %ld to coordinator %d from node %d for transaction %d (local transaction %d)\n" ,
356368ts -> csn ,ts -> gtid .node ,MMNodeId ,ts -> gtid .xid ,ts -> xid );
357- txBuffer [i ].data [txBuffer [i ].used ].dxid = ts -> gtid .xid ;
358- txBuffer [i ].data [txBuffer [i ].used ].sxid = ts -> xid ;
359- txBuffer [i ].data [txBuffer [i ].used ].node = MMNodeId ;
360- txBuffer [i ].data [txBuffer [i ].used ].csn = ts -> csn ;
361- txBuffer [i ].used += 1 ;
369+ DtmAppendBuffer (CMD_PREPARE ,txBuffer ,ts -> gtid .xid ,ts -> gtid .node - 1 ,ts );
362370}
363371}
364372for (i = 0 ;i < nNodes ;i ++ ) {
@@ -431,9 +439,33 @@ static void DtmTransReceiver(Datum arg)
431439DtmCommitMessage * msg = & rxBuffer [i ].data [j ];
432440DtmTransState * ts = (DtmTransState * )hash_search (xid2state ,& msg -> dxid ,HASH_FIND ,NULL );
433441Assert (ts != NULL );
434- if (msg -> csn > ts -> csn ) {
435- ts -> csn = msg -> csn ;
436- }
442+ switch (msg -> code ) {
443+ case CMD_PREPARE :
444+ if (IsCoordinator (ts )) {
445+ switch (msg -> command ) {
446+ case CMD_PREPARE :
447+
448+ if (ts -> state == TRANSACTION_STATUS_IN_PROGRESS :
449+ /* transaction is in-prepared stage (in-doubt): calculate max CSN */
450+ if (msg -> csn > ts -> csn ) {
451+ ts -> csn = msg -> csn ;
452+ }
453+ Assert (ts -> nVotes < dtm -> nNodes );
454+ if (++ ts -> nVotes == dtm -> nNodes ) {/* receive responses from all nodes */
455+ ts -> status = TRANSACTION_STATUS_COMMIT ;
456+
457+ if (ts -> state == TRANSACTION_STATUS_UNKNOWN ) {
458+ /* All nodes are ready to prepare: switch transaction to in-doubt state */
459+ ts -> csn = dtm_get_csn ();
460+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
461+ /* and broadcast PREPARE message */
462+ MMSendNotificationMessage (ts );
463+ }else if (ts -> state == CMD_ABORT ) {
464+ ts -> status = TRANSACTION_STATUS_ABORTED ;
465+
466+ }else {
467+ Assert (ts -> state == TRANSACTION_STATUS_IN_PROGRESS );
468+
437469Assert ((unsigned )(msg -> node - 1 ) <= (unsigned )nNodes );
438470ts -> xids [msg -> node - 1 ]= msg -> sxid ;
439471DTM_TRACE ("Receive response %ld for transaction %d votes %d from node %d (transaction %d)\n" ,