4444#include "storage/lmgr.h"
4545
4646#include "utils/builtins.h"
47+ #include "utils/guc.h"
4748#include "utils/lsyscache.h"
4849#include "utils/memutils.h"
4950#include "utils/syscache.h"
@@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
6061static void
6162parse_subscription_options (List * options ,bool * connect ,bool * enabled_given ,
6263bool * enabled ,bool * create_slot ,char * * slot_name ,
63- bool * copy_data )
64+ bool * copy_data , char * * synchronous_commit )
6465{
6566ListCell * lc ;
6667bool connect_given = false;
@@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
8081* slot_name = NULL ;
8182if (copy_data )
8283* copy_data = true;
84+ if (synchronous_commit )
85+ * synchronous_commit = NULL ;
8386
8487/* Parse options */
8588foreach (lc ,options )
@@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
165168copy_data_given = true;
166169* copy_data = !defGetBoolean (defel );
167170}
171+ else if (strcmp (defel -> defname ,"synchronous_commit" )== 0 &&
172+ synchronous_commit )
173+ {
174+ if (* synchronous_commit )
175+ ereport (ERROR ,
176+ (errcode (ERRCODE_SYNTAX_ERROR ),
177+ errmsg ("conflicting or redundant options" )));
178+
179+ * synchronous_commit = defGetString (defel );
180+
181+ /* Test if the given value is valid for synchronous_commit GUC. */
182+ (void )set_config_option ("synchronous_commit" ,* synchronous_commit ,
183+ PGC_BACKEND ,PGC_S_TEST ,GUC_ACTION_SET ,
184+ false,0 , false);
185+ }
168186else
169187elog (ERROR ,"unrecognized option: %s" ,defel -> defname );
170188}
@@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
269287bool enabled_given ;
270288bool enabled ;
271289bool copy_data ;
290+ char * synchronous_commit ;
272291char * conninfo ;
273292char * slotname ;
274293char originname [NAMEDATALEN ];
@@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
280299 * Connection and publication should not be specified here.
281300 */
282301parse_subscription_options (stmt -> options ,& connect ,& enabled_given ,
283- & enabled ,& create_slot ,& slotname ,& copy_data );
302+ & enabled ,& create_slot ,& slotname ,& copy_data ,
303+ & synchronous_commit );
284304
285305/*
286306 * Since creating a replication slot is not transactional, rolling back
@@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
311331
312332if (slotname == NULL )
313333slotname = stmt -> subname ;
334+ /* The default for synchronous_commit of subscriptions is off. */
335+ if (synchronous_commit == NULL )
336+ synchronous_commit = "off" ;
314337
315338conninfo = stmt -> conninfo ;
316339publications = stmt -> publication ;
@@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
334357CStringGetTextDatum (conninfo );
335358values [Anum_pg_subscription_subslotname - 1 ]=
336359DirectFunctionCall1 (namein ,CStringGetDatum (slotname ));
360+ values [Anum_pg_subscription_subsynccommit - 1 ]=
361+ CStringGetTextDatum (synchronous_commit );
337362values [Anum_pg_subscription_subpublications - 1 ]=
338363publicationListToArray (publications );
339364
@@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
582607case ALTER_SUBSCRIPTION_OPTIONS :
583608{
584609char * slot_name ;
610+ char * synchronous_commit ;
585611
586612parse_subscription_options (stmt -> options ,NULL ,NULL ,NULL ,
587- NULL ,& slot_name ,NULL );
613+ NULL ,& slot_name ,NULL ,
614+ & synchronous_commit );
588615
589- values [Anum_pg_subscription_subslotname - 1 ]=
590- DirectFunctionCall1 (namein ,CStringGetDatum (slot_name ));
591- replaces [Anum_pg_subscription_subslotname - 1 ]= true;
616+ if (slot_name )
617+ {
618+ values [Anum_pg_subscription_subslotname - 1 ]=
619+ DirectFunctionCall1 (namein ,CStringGetDatum (slot_name ));
620+ replaces [Anum_pg_subscription_subslotname - 1 ]= true;
621+ }
622+ if (synchronous_commit )
623+ {
624+ values [Anum_pg_subscription_subsynccommit - 1 ]=
625+ CStringGetTextDatum (synchronous_commit );
626+ replaces [Anum_pg_subscription_subsynccommit - 1 ]= true;
627+ }
592628
593629update_tuple = true;
594630break ;
@@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
601637
602638parse_subscription_options (stmt -> options ,NULL ,
603639& enabled_given ,& enabled ,NULL ,
604- NULL ,NULL );
640+ NULL ,NULL , NULL );
605641Assert (enabled_given );
606642
607643values [Anum_pg_subscription_subenabled - 1 ]=
@@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
626662Subscription * sub = GetSubscription (subid , false);
627663
628664parse_subscription_options (stmt -> options ,NULL ,NULL ,NULL ,
629- NULL ,NULL ,& copy_data );
665+ NULL ,NULL ,& copy_data , NULL );
630666
631667values [Anum_pg_subscription_subpublications - 1 ]=
632668publicationListToArray (stmt -> publication );
@@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
652688Subscription * sub = GetSubscription (subid , false);
653689
654690parse_subscription_options (stmt -> options ,NULL ,NULL ,NULL ,
655- NULL ,NULL ,& copy_data );
691+ NULL ,NULL ,& copy_data , NULL );
656692
657693AlterSubscription_refresh (sub ,copy_data );
658694