6060
6161typedef struct remoteConn
6262{
63- PGconn * conn ;/* Hold the remote connection */
64- int autoXactCursors ; /*Indicates the number of open cursors,
65- * non-zero means we opened the xact ourselves */
63+ PGconn * conn ;/* Hold the remote connection */
64+ int openCursorCount ; /*The number of open cursors */
65+ bool newXactForCursor ; /* Opened a transaction for a cursor */
6666}remoteConn ;
6767
6868/*
@@ -84,10 +84,8 @@ static Oidget_relid_from_relname(text *relname_text);
8484static char * generate_relation_name (Oid relid );
8585
8686/* Global */
87- List * res_id = NIL ;
88- int res_id_index = 0 ;
89- PGconn * persistent_conn = NULL ;
90- static HTAB * remoteConnHash = NULL ;
87+ static remoteConn * pconn = NULL ;
88+ static HTAB * remoteConnHash = NULL ;
9189
9290/*
9391 *Following is list that holds multiple remote connections.
@@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt
184182} \
185183} while (0)
186184
185+ #define DBLINK_INIT \
186+ do { \
187+ if (!pconn) \
188+ { \
189+ pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
190+ pconn->conn = NULL; \
191+ pconn->openCursorCount = 0; \
192+ pconn->newXactForCursor = FALSE; \
193+ } \
194+ } while (0)
187195
188196/*
189197 * Create a persistent connection to another database
@@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS)
199207PGconn * conn = NULL ;
200208remoteConn * rconn = NULL ;
201209
210+ DBLINK_INIT ;
211+
202212if (PG_NARGS ()== 2 )
203213{
204214connstr = GET_STR (PG_GETARG_TEXT_P (1 ));
@@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS)
234244createNewConnection (connname ,rconn );
235245}
236246else
237- persistent_conn = conn ;
247+ pconn -> conn = conn ;
238248
239249PG_RETURN_TEXT_P (GET_TEXT ("OK" ));
240250}
@@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS)
250260remoteConn * rconn = NULL ;
251261PGconn * conn = NULL ;
252262
263+ DBLINK_INIT ;
264+
253265if (PG_NARGS ()== 1 )
254266{
255267conname = GET_STR (PG_GETARG_TEXT_P (0 ));
@@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
258270conn = rconn -> conn ;
259271}
260272else
261- conn = persistent_conn ;
273+ conn = pconn -> conn ;
262274
263275if (!conn )
264276DBLINK_CONN_NOT_AVAIL ;
@@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
270282pfree (rconn );
271283}
272284else
273- persistent_conn = NULL ;
285+ pconn -> conn = NULL ;
274286
275287PG_RETURN_TEXT_P (GET_TEXT ("OK" ));
276288}
@@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS)
292304remoteConn * rconn = NULL ;
293305bool fail = true;/* default to backward compatible behavior */
294306
307+ DBLINK_INIT ;
308+
295309if (PG_NARGS ()== 2 )
296310{
297311/* text,text */
298312curname = GET_STR (PG_GETARG_TEXT_P (0 ));
299313sql = GET_STR (PG_GETARG_TEXT_P (1 ));
300- conn = persistent_conn ;
314+ rconn = pconn ;
301315}
302316else if (PG_NARGS ()== 3 )
303317{
@@ -307,16 +321,14 @@ dblink_open(PG_FUNCTION_ARGS)
307321curname = GET_STR (PG_GETARG_TEXT_P (0 ));
308322sql = GET_STR (PG_GETARG_TEXT_P (1 ));
309323fail = PG_GETARG_BOOL (2 );
310- conn = persistent_conn ;
324+ rconn = pconn ;
311325}
312326else
313327{
314328conname = GET_STR (PG_GETARG_TEXT_P (0 ));
315329curname = GET_STR (PG_GETARG_TEXT_P (1 ));
316330sql = GET_STR (PG_GETARG_TEXT_P (2 ));
317331rconn = getConnectionByName (conname );
318- if (rconn )
319- conn = rconn -> conn ;
320332}
321333}
322334else if (PG_NARGS ()== 4 )
@@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS)
327339sql = GET_STR (PG_GETARG_TEXT_P (2 ));
328340fail = PG_GETARG_BOOL (3 );
329341rconn = getConnectionByName (conname );
330- if (rconn )
331- conn = rconn -> conn ;
332342}
333343
334- if (!conn )
344+ if (!rconn || ! rconn -> conn )
335345DBLINK_CONN_NOT_AVAIL ;
346+ else
347+ conn = rconn -> conn ;
336348
337- res = PQexec (conn ,"BEGIN" );
338- if (PQresultStatus (res )!= PGRES_COMMAND_OK )
339- DBLINK_RES_INTERNALERROR ("begin error" );
349+ /*If we are not in a transaction, start one */
350+ if (PQtransactionStatus (conn )== PQTRANS_IDLE )
351+ {
352+ res = PQexec (conn ,"BEGIN" );
353+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
354+ DBLINK_RES_INTERNALERROR ("begin error" );
355+ PQclear (res );
356+ rconn -> newXactForCursor = TRUE;
357+ }
340358
341- PQclear (res );
359+ /* if we started a transaction, increment cursor count */
360+ if (rconn -> newXactForCursor )
361+ (rconn -> openCursorCount )++ ;
342362
343363appendStringInfo (str ,"DECLARE %s CURSOR FOR %s" ,curname ,sql );
344364res = PQexec (conn ,str -> data );
@@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS)
373393remoteConn * rconn = NULL ;
374394bool fail = true;/* default to backward compatible behavior */
375395
396+ DBLINK_INIT ;
397+
376398if (PG_NARGS ()== 1 )
377399{
378400/* text */
379401curname = GET_STR (PG_GETARG_TEXT_P (0 ));
380- conn = persistent_conn ;
402+ rconn = pconn ;
381403}
382404else if (PG_NARGS ()== 2 )
383405{
@@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS)
386408{
387409curname = GET_STR (PG_GETARG_TEXT_P (0 ));
388410fail = PG_GETARG_BOOL (1 );
389- conn = persistent_conn ;
411+ rconn = pconn ;
390412}
391413else
392414{
393415conname = GET_STR (PG_GETARG_TEXT_P (0 ));
394416curname = GET_STR (PG_GETARG_TEXT_P (1 ));
395417rconn = getConnectionByName (conname );
396- if (rconn )
397- conn = rconn -> conn ;
398418}
399419}
400420if (PG_NARGS ()== 3 )
@@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS)
404424curname = GET_STR (PG_GETARG_TEXT_P (1 ));
405425fail = PG_GETARG_BOOL (2 );
406426rconn = getConnectionByName (conname );
407- if (rconn )
408- conn = rconn -> conn ;
409427}
410428
411- if (!conn )
429+ if (!rconn || ! rconn -> conn )
412430DBLINK_CONN_NOT_AVAIL ;
431+ else
432+ conn = rconn -> conn ;
413433
414434appendStringInfo (str ,"CLOSE %s" ,curname );
415435
@@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS)
428448
429449PQclear (res );
430450
431- /*commit the transaction */
432- res = PQexec ( conn , "COMMIT" );
433- if ( PQresultStatus ( res ) != PGRES_COMMAND_OK )
434- DBLINK_RES_INTERNALERROR ( "commit error" ) ;
451+ /*if we started a transaction, decrement cursor count */
452+ if ( rconn -> newXactForCursor )
453+ {
454+ ( rconn -> openCursorCount ) -- ;
435455
436- PQclear (res );
456+ /* if count is zero, commit the transaction */
457+ if (rconn -> openCursorCount == 0 )
458+ {
459+ rconn -> newXactForCursor = FALSE;
460+
461+ res = PQexec (conn ,"COMMIT" );
462+ if (PQresultStatus (res )!= PGRES_COMMAND_OK )
463+ DBLINK_RES_INTERNALERROR ("commit error" );
464+ PQclear (res );
465+ }
466+ }
437467
438468PG_RETURN_TEXT_P (GET_TEXT ("OK" ));
439469}
@@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
456486char * conname = NULL ;
457487remoteConn * rconn = NULL ;
458488
489+ DBLINK_INIT ;
490+
459491/* stuff done only on the first call of the function */
460492if (SRF_IS_FIRSTCALL ())
461493{
@@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
485517curname = GET_STR (PG_GETARG_TEXT_P (0 ));
486518howmany = PG_GETARG_INT32 (1 );
487519fail = PG_GETARG_BOOL (2 );
488- conn = persistent_conn ;
520+ conn = pconn -> conn ;
489521}
490522else
491523{
@@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
503535/* text,int */
504536curname = GET_STR (PG_GETARG_TEXT_P (0 ));
505537howmany = PG_GETARG_INT32 (1 );
506- conn = persistent_conn ;
538+ conn = pconn -> conn ;
507539}
508540
509541if (!conn )
@@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS)
648680MemoryContext oldcontext ;
649681bool freeconn = false;
650682
683+ DBLINK_INIT ;
684+
651685/* stuff done only on the first call of the function */
652686if (SRF_IS_FIRSTCALL ())
653687{
@@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS)
678712/* text,text or text,bool */
679713if (get_fn_expr_argtype (fcinfo -> flinfo ,1 )== BOOLOID )
680714{
681- conn = persistent_conn ;
715+ conn = pconn -> conn ;
682716sql = GET_STR (PG_GETARG_TEXT_P (0 ));
683717fail = PG_GETARG_BOOL (1 );
684718}
@@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS)
691725else if (PG_NARGS ()== 1 )
692726{
693727/* text */
694- conn = persistent_conn ;
728+ conn = pconn -> conn ;
695729sql = GET_STR (PG_GETARG_TEXT_P (0 ));
696730}
697731else
@@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS)
857891bool freeconn = false;
858892bool fail = true;/* default to backward compatible behavior */
859893
894+ DBLINK_INIT ;
895+
860896if (PG_NARGS ()== 3 )
861897{
862898/* must be text,text,bool */
@@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS)
869905/* might be text,text or text,bool */
870906if (get_fn_expr_argtype (fcinfo -> flinfo ,1 )== BOOLOID )
871907{
872- conn = persistent_conn ;
908+ conn = pconn -> conn ;
873909sql = GET_STR (PG_GETARG_TEXT_P (0 ));
874910fail = PG_GETARG_BOOL (1 );
875911}
@@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS)
882918else if (PG_NARGS ()== 1 )
883919{
884920/* must be single text argument */
885- conn = persistent_conn ;
921+ conn = pconn -> conn ;
886922sql = GET_STR (PG_GETARG_TEXT_P (0 ));
887923}
888924else