2222#include "storage/latch.h"
2323#include "storage/proc.h"
2424#include "utils/hsearch.h"
25+ #include "utils/inval.h"
2526#include "utils/memutils.h"
2627#include "utils/syscache.h"
2728#include "utils/timestamp.h"
@@ -51,11 +52,15 @@ typedef struct ConnCacheEntry
5152{
5253ConnCacheKey key ;/* hash key (must be first) */
5354PGconn * conn ;/* connection to foreign server, or NULL */
55+ /* Remaining fields are invalid when conn is NULL: */
5456int xact_depth ;/* 0 = no xact open, 1 = main xact open, 2 =
5557 * one level of subxact open, etc */
5658bool have_prep_stmt ;/* have we prepared any stmts in this xact? */
5759bool have_error ;/* have any subxacts aborted in this xact? */
5860bool changing_xact_state ;/* xact state change in process */
61+ bool invalidated ;/* true if reconnect is pending */
62+ uint32 server_hashvalue ;/* hash value of foreign server OID */
63+ uint32 mapping_hashvalue ;/* hash value of user mapping OID */
5964}ConnCacheEntry ;
6065
6166/*
@@ -72,6 +77,7 @@ static bool xact_got_connection = false;
7277
7378/* prototypes of private functions */
7479static PGconn * connect_pg_server (ForeignServer * server ,UserMapping * user );
80+ static void disconnect_pg_server (ConnCacheEntry * entry );
7581static void check_conn_params (const char * * keywords ,const char * * values );
7682static void configure_remote_session (PGconn * conn );
7783static void do_sql_command (PGconn * conn ,const char * sql );
@@ -81,6 +87,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
8187SubTransactionId mySubid ,
8288SubTransactionId parentSubid ,
8389void * arg );
90+ static void pgfdw_inval_callback (Datum arg ,int cacheid ,uint32 hashvalue );
8491static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry * entry );
8592static bool pgfdw_cancel_query (PGconn * conn );
8693static bool pgfdw_exec_cleanup_query (PGconn * conn ,const char * query ,
@@ -98,13 +105,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
98105 * will_prep_stmt must be true if caller intends to create any prepared
99106 * statements. Since those don't go away automatically at transaction end
100107 * (not even on error), we need this flag to cue manual cleanup.
101- *
102- * XXX Note that caching connections theoretically requires a mechanism to
103- * detect change of FDW objects to invalidate already established connections.
104- * We could manage that by watching for invalidation events on the relevant
105- * syscaches. For the moment, though, it's not clear that this would really
106- * be useful and not mere pedantry. We could not flush any active connections
107- * mid-transaction anyway.
108108 */
109109PGconn *
110110GetConnection (ForeignServer * server ,UserMapping * user ,
@@ -135,6 +135,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
135135 */
136136RegisterXactCallback (pgfdw_xact_callback ,NULL );
137137RegisterSubXactCallback (pgfdw_subxact_callback ,NULL );
138+ CacheRegisterSyscacheCallback (FOREIGNSERVEROID ,
139+ pgfdw_inval_callback , (Datum )0 );
140+ CacheRegisterSyscacheCallback (USERMAPPINGOID ,
141+ pgfdw_inval_callback , (Datum )0 );
138142}
139143
140144/* Set flag that we did GetConnection during the current transaction */
@@ -150,17 +154,27 @@ GetConnection(ForeignServer *server, UserMapping *user,
150154entry = hash_search (ConnectionHash ,& key ,HASH_ENTER ,& found );
151155if (!found )
152156{
153- /* initialize new hashtable entry (key is already filled in) */
157+ /*
158+ * We need only clear "conn" here; remaining fields will be filled
159+ * later when "conn" is set.
160+ */
154161entry -> conn = NULL ;
155- entry -> xact_depth = 0 ;
156- entry -> have_prep_stmt = false;
157- entry -> have_error = false;
158- entry -> changing_xact_state = false;
159162}
160163
161164/* Reject further use of connections which failed abort cleanup. */
162165pgfdw_reject_incomplete_xact_state_change (entry );
163166
167+ /*
168+ * If the connection needs to be remade due to invalidation, disconnect as
169+ * soon as we're out of all transactions.
170+ */
171+ if (entry -> conn != NULL && entry -> invalidated && entry -> xact_depth == 0 )
172+ {
173+ elog (DEBUG3 ,"closing connection %p for option changes to take effect" ,
174+ entry -> conn );
175+ disconnect_pg_server (entry );
176+ }
177+
164178/*
165179 * We don't check the health of cached connection here, because it would
166180 * require some overhead. Broken connection will be detected when the
@@ -170,13 +184,36 @@ GetConnection(ForeignServer *server, UserMapping *user,
170184/*
171185 * If cache entry doesn't have a connection, we have to establish a new
172186 * connection. (If connect_pg_server throws an error, the cache entry
173- * willbe left in a valid empty state.)
187+ * willremain in a valid empty state, ie conn == NULL .)
174188 */
175189if (entry -> conn == NULL )
176190{
177- entry -> xact_depth = 0 ;/* just to be sure */
191+ Oid umoid ;
192+
193+ /* Reset all transient state fields, to be sure all are clean */
194+ entry -> xact_depth = 0 ;
178195entry -> have_prep_stmt = false;
179196entry -> have_error = false;
197+ entry -> changing_xact_state = false;
198+ entry -> invalidated = false;
199+ entry -> server_hashvalue =
200+ GetSysCacheHashValue1 (FOREIGNSERVEROID ,
201+ ObjectIdGetDatum (server -> serverid ));
202+ /* Pre-9.6, UserMapping doesn't store its OID, so look it up again */
203+ umoid = GetSysCacheOid2 (USERMAPPINGUSERSERVER ,
204+ ObjectIdGetDatum (user -> userid ),
205+ ObjectIdGetDatum (user -> serverid ));
206+ if (!OidIsValid (umoid ))
207+ {
208+ /* Not found for the specific user -- try PUBLIC */
209+ umoid = GetSysCacheOid2 (USERMAPPINGUSERSERVER ,
210+ ObjectIdGetDatum (InvalidOid ),
211+ ObjectIdGetDatum (user -> serverid ));
212+ }
213+ entry -> mapping_hashvalue =
214+ GetSysCacheHashValue1 (USERMAPPINGOID ,ObjectIdGetDatum (umoid ));
215+
216+ /* Now try to make the connection */
180217entry -> conn = connect_pg_server (server ,user );
181218elog (DEBUG3 ,"new postgres_fdw connection %p for server \"%s\"" ,
182219entry -> conn ,server -> servername );
@@ -289,6 +326,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
289326return conn ;
290327}
291328
329+ /*
330+ * Disconnect any open connection for a connection cache entry.
331+ */
332+ static void
333+ disconnect_pg_server (ConnCacheEntry * entry )
334+ {
335+ if (entry -> conn != NULL )
336+ {
337+ PQfinish (entry -> conn );
338+ entry -> conn = NULL ;
339+ }
340+ }
341+
292342/*
293343 * For non-superusers, insist that the connstr specify a password. This
294344 * prevents a password from being picked up from .pgpass, a service file,
@@ -787,9 +837,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
787837entry -> changing_xact_state )
788838{
789839elog (DEBUG3 ,"discarding connection %p" ,entry -> conn );
790- PQfinish (entry -> conn );
791- entry -> conn = NULL ;
792- entry -> changing_xact_state = false;
840+ disconnect_pg_server (entry );
793841}
794842}
795843
@@ -906,6 +954,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
906954}
907955}
908956
957+ /*
958+ * Connection invalidation callback function
959+ *
960+ * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
961+ * mark connections depending on that entry as needing to be remade.
962+ * We can't immediately destroy them, since they might be in the midst of
963+ * a transaction, but we'll remake them at the next opportunity.
964+ *
965+ * Although most cache invalidation callbacks blow away all the related stuff
966+ * regardless of the given hashvalue, connections are expensive enough that
967+ * it's worth trying to avoid that.
968+ *
969+ * NB: We could avoid unnecessary disconnection more strictly by examining
970+ * individual option values, but it seems too much effort for the gain.
971+ */
972+ static void
973+ pgfdw_inval_callback (Datum arg ,int cacheid ,uint32 hashvalue )
974+ {
975+ HASH_SEQ_STATUS scan ;
976+ ConnCacheEntry * entry ;
977+
978+ Assert (cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID );
979+
980+ /* ConnectionHash must exist already, if we're registered */
981+ hash_seq_init (& scan ,ConnectionHash );
982+ while ((entry = (ConnCacheEntry * )hash_seq_search (& scan )))
983+ {
984+ /* Ignore invalid entries */
985+ if (entry -> conn == NULL )
986+ continue ;
987+
988+ /* hashvalue == 0 means a cache reset, must clear all state */
989+ if (hashvalue == 0 ||
990+ (cacheid == FOREIGNSERVEROID &&
991+ entry -> server_hashvalue == hashvalue )||
992+ (cacheid == USERMAPPINGOID &&
993+ entry -> mapping_hashvalue == hashvalue ))
994+ entry -> invalidated = true;
995+ }
996+ }
997+
909998/*
910999 * Raise an error if the given connection cache entry is marked as being
9111000 * in the middle of an xact state change. This should be called at which no
@@ -921,10 +1010,16 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
9211010{
9221011ForeignServer * server ;
9231012
924- if (!entry -> changing_xact_state )
1013+ /* nothing to do for inactive entries and entries of sane state */
1014+ if (entry -> conn == NULL || !entry -> changing_xact_state )
9251015return ;
9261016
1017+ /* make sure this entry is inactive */
1018+ disconnect_pg_server (entry );
1019+
1020+ /* find server name to be shown in the message below */
9271021server = GetForeignServer (entry -> key .serverid );
1022+
9281023ereport (ERROR ,
9291024(errcode (ERRCODE_CONNECTION_EXCEPTION ),
9301025errmsg ("connection to server \"%s\" was lost" ,