@@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
247247 * The action to be taken for the changes in the transaction.
248248 *
249249 * TRANS_LEADER_APPLY:
250- * This action means that we are in the leader apply worker and changes of the
251- * transaction are applied directly by the worker.
250+ * This action means that we are in the leader apply worker or table sync
251+ * worker. The changes of the transaction are either directly applied or
252+ * are read from temporary files (for streaming transactions) and then
253+ * applied by the worker.
252254 *
253255 * TRANS_LEADER_SERIALIZE:
254256 * This action means that we are in the leader apply worker or table sync
@@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
10041006{
10051007LogicalRepBeginData begin_data ;
10061008
1009+ /* There must not be an active streaming transaction. */
1010+ Assert (!TransactionIdIsValid (stream_xid ));
1011+
10071012logicalrep_read_begin (s ,& begin_data );
10081013set_apply_error_context_xact (begin_data .xid ,begin_data .final_lsn );
10091014
@@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
10581063(errcode (ERRCODE_PROTOCOL_VIOLATION ),
10591064errmsg_internal ("tablesync worker received a BEGIN PREPARE message" )));
10601065
1066+ /* There must not be an active streaming transaction. */
1067+ Assert (!TransactionIdIsValid (stream_xid ));
1068+
10611069logicalrep_read_begin_prepare (s ,& begin_data );
10621070set_apply_error_context_xact (begin_data .xid ,begin_data .prepare_lsn );
10631071
@@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
13011309
13021310switch (apply_action )
13031311{
1304- case TRANS_LEADER_SERIALIZE :
1312+ case TRANS_LEADER_APPLY :
13051313
13061314/*
13071315 * The transaction has been serialized to file, so replay all the
@@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s)
13841392break ;
13851393
13861394default :
1387- Assert (false );
1395+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
13881396break ;
13891397}
13901398
@@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
14841492(errcode (ERRCODE_PROTOCOL_VIOLATION ),
14851493errmsg_internal ("duplicate STREAM START message" )));
14861494
1495+ /* There must not be an active streaming transaction. */
1496+ Assert (!TransactionIdIsValid (stream_xid ));
1497+
14871498/* notify handle methods we're processing a remote transaction */
14881499in_streamed_transaction = true;
14891500
@@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s)
15891600break ;
15901601
15911602default :
1592- Assert (false );
1603+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
15931604break ;
15941605}
15951606
@@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s)
17051716break ;
17061717
17071718default :
1708- Assert (false );
1719+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
17091720break ;
17101721}
17111722
17121723in_streamed_transaction = false;
1724+ stream_xid = InvalidTransactionId ;
17131725
17141726/*
17151727 * The parallel apply worker could be in a transaction in which case we
@@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
18421854
18431855switch (apply_action )
18441856{
1845- case TRANS_LEADER_SERIALIZE :
1857+ case TRANS_LEADER_APPLY :
18461858
18471859/*
18481860 * We are in the leader apply worker and the transaction has been
@@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s)
19571969break ;
19581970
19591971default :
1960- Assert (false );
1972+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
19611973break ;
19621974}
19631975
@@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
21542166
21552167switch (apply_action )
21562168{
2157- case TRANS_LEADER_SERIALIZE :
2169+ case TRANS_LEADER_APPLY :
21582170
21592171/*
21602172 * The transaction has been serialized to file, so replay all the
@@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s)
22262238break ;
22272239
22282240default :
2229- Assert (false );
2241+ elog ( ERROR , "unexpected apply action: %d" , ( int ) apply_action );
22302242break ;
22312243}
22322244
@@ -4204,7 +4216,6 @@ stream_close_file(void)
42044216
42054217BufFileClose (stream_fd );
42064218
4207- stream_xid = InvalidTransactionId ;
42084219stream_fd = NULL ;
42094220}
42104221
@@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
49774988}
49784989
49794990/*
4980- * Return the action to be taken for the given transaction. *winfo is
4981- * assigned to the destination parallel worker info when the leader apply
4982- * worker has to pass all the transaction's changes to the parallel apply
4983- * worker.
4991+ * Return the action to be taken for the given transaction. See
4992+ * TransApplyAction for information on each of the actions.
4993+ *
4994+ * *winfo is assigned to the destination parallel worker info when the leader
4995+ * apply worker has to pass all the transaction's changes to the parallel
4996+ * apply worker.
49844997 */
49854998static TransApplyAction
49864999get_transaction_apply_action (TransactionId xid ,ParallelApplyWorkerInfo * * winfo )
@@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
49915004{
49925005return TRANS_PARALLEL_APPLY ;
49935006}
4994- else if (in_remote_transaction )
4995- {
4996- return TRANS_LEADER_APPLY ;
4997- }
49985007
49995008/*
5000- * Check if we are processing this transaction using a parallel apply
5001- * worker.
5009+ * If we are processing this transaction using a parallel apply worker then
5010+ * either we send the changes to the parallel worker or if the worker is busy
5011+ * then serialize the changes to the file which will later be processed by
5012+ * the parallel worker.
50025013 */
50035014* winfo = pa_find_worker (xid );
50045015
5005- if (! * winfo )
5016+ if (* winfo && ( * winfo ) -> serialize_changes )
50065017{
5007- return TRANS_LEADER_SERIALIZE ;
5018+ return TRANS_LEADER_PARTIAL_SERIALIZE ;
50085019}
5009- else if (( * winfo ) -> serialize_changes )
5020+ else if (* winfo )
50105021{
5011- return TRANS_LEADER_PARTIAL_SERIALIZE ;
5022+ return TRANS_LEADER_SEND_TO_PARALLEL ;
5023+ }
5024+
5025+ /*
5026+ * If there is no parallel worker involved to process this transaction then
5027+ * we either directly apply the change or serialize it to a file which will
5028+ * later be applied when the transaction finish message is processed.
5029+ */
5030+ else if (in_streamed_transaction )
5031+ {
5032+ return TRANS_LEADER_SERIALIZE ;
50125033}
50135034else
50145035{
5015- return TRANS_LEADER_SEND_TO_PARALLEL ;
5036+ return TRANS_LEADER_APPLY ;
50165037}
50175038}