4242#include <sys/stat.h>
4343#include <unistd.h>
4444
45+ #include "access/xact.h"
4546#include "libpq/be-fsstubs.h"
4647#include "libpq/libpq-fs.h"
4748#include "miscadmin.h"
5051#include "utils/acl.h"
5152#include "utils/builtins.h"
5253#include "utils/memutils.h"
54+ #include "utils/snapmgr.h"
5355
5456/* define this to enable debug logging */
5557/* #define FSDB 1 */
6769static LargeObjectDesc * * cookies = NULL ;
6870static int cookies_size = 0 ;
6971
72+ static bool lo_cleanup_needed = false;
7073static MemoryContext fscxt = NULL ;
7174
72- #define CreateFSContext () \
73- do { \
74- if (fscxt == NULL) \
75- fscxt = AllocSetContextCreate(TopMemoryContext, \
76- "Filesystem", \
77- ALLOCSET_DEFAULT_SIZES); \
78- } while (0)
79-
80-
81- static int newLOfd (LargeObjectDesc * lobjCookie );
82- static void deleteLOfd (int fd );
75+ static int newLOfd (void );
76+ static void closeLOfd (int fd );
8377static Oid lo_import_internal (text * filename ,Oid lobjOid );
8478
8579
@@ -99,11 +93,26 @@ be_lo_open(PG_FUNCTION_ARGS)
9993elog (DEBUG4 ,"lo_open(%u,%d)" ,lobjId ,mode );
10094#endif
10195
102- CreateFSContext ();
96+ /*
97+ * Allocate a large object descriptor first. This will also create
98+ * 'fscxt' if this is the first LO opened in this transaction.
99+ */
100+ fd = newLOfd ();
103101
104102lobjDesc = inv_open (lobjId ,mode ,fscxt );
103+ lobjDesc -> subid = GetCurrentSubTransactionId ();
104+
105+ /*
106+ * We must register the snapshot in TopTransaction's resowner so that it
107+ * stays alive until the LO is closed rather than until the current portal
108+ * shuts down.
109+ */
110+ if (lobjDesc -> snapshot )
111+ lobjDesc -> snapshot = RegisterSnapshotOnOwner (lobjDesc -> snapshot ,
112+ TopTransactionResourceOwner );
105113
106- fd = newLOfd (lobjDesc );
114+ Assert (cookies [fd ]== NULL );
115+ cookies [fd ]= lobjDesc ;
107116
108117PG_RETURN_INT32 (fd );
109118}
@@ -122,9 +131,7 @@ be_lo_close(PG_FUNCTION_ARGS)
122131elog (DEBUG4 ,"lo_close(%d)" ,fd );
123132#endif
124133
125- inv_close (cookies [fd ]);
126-
127- deleteLOfd (fd );
134+ closeLOfd (fd );
128135
129136PG_RETURN_INT32 (0 );
130137}
@@ -238,12 +245,7 @@ be_lo_creat(PG_FUNCTION_ARGS)
238245{
239246Oid lobjId ;
240247
241- /*
242- * We don't actually need to store into fscxt, but create it anyway to
243- * ensure that AtEOXact_LargeObject knows there is state to clean up
244- */
245- CreateFSContext ();
246-
248+ lo_cleanup_needed = true;
247249lobjId = inv_create (InvalidOid );
248250
249251PG_RETURN_OID (lobjId );
@@ -254,12 +256,7 @@ be_lo_create(PG_FUNCTION_ARGS)
254256{
255257Oid lobjId = PG_GETARG_OID (0 );
256258
257- /*
258- * We don't actually need to store into fscxt, but create it anyway to
259- * ensure that AtEOXact_LargeObject knows there is state to clean up
260- */
261- CreateFSContext ();
262-
259+ lo_cleanup_needed = true;
263260lobjId = inv_create (lobjId );
264261
265262PG_RETURN_OID (lobjId );
@@ -330,16 +327,13 @@ be_lo_unlink(PG_FUNCTION_ARGS)
330327for (i = 0 ;i < cookies_size ;i ++ )
331328{
332329if (cookies [i ]!= NULL && cookies [i ]-> id == lobjId )
333- {
334- inv_close (cookies [i ]);
335- deleteLOfd (i );
336- }
330+ closeLOfd (i );
337331}
338332}
339333
340334/*
341335 * inv_drop does not create a need for end-of-transaction cleanup and
342- * hence we don't need tohave created fscxt .
336+ * hence we don't need toset lo_cleanup_needed .
343337 */
344338PG_RETURN_INT32 (inv_drop (lobjId ));
345339}
@@ -419,8 +413,6 @@ lo_import_internal(text *filename, Oid lobjOid)
419413LargeObjectDesc * lobj ;
420414Oid oid ;
421415
422- CreateFSContext ();
423-
424416/*
425417 * open the file to be read in
426418 */
@@ -435,12 +427,13 @@ lo_import_internal(text *filename, Oid lobjOid)
435427/*
436428 * create an inversion object
437429 */
430+ lo_cleanup_needed = true;
438431oid = inv_create (lobjOid );
439432
440433/*
441434 * read in from the filesystem and write to the inversion object
442435 */
443- lobj = inv_open (oid ,INV_WRITE ,fscxt );
436+ lobj = inv_open (oid ,INV_WRITE ,CurrentMemoryContext );
444437
445438while ((nbytes = read (fd ,buf ,BUFSIZE ))> 0 )
446439{
@@ -482,12 +475,11 @@ be_lo_export(PG_FUNCTION_ARGS)
482475LargeObjectDesc * lobj ;
483476mode_t oumask ;
484477
485- CreateFSContext ();
486-
487478/*
488479 * open the inversion object (no need to test for failure)
489480 */
490- lobj = inv_open (lobjId ,INV_READ ,fscxt );
481+ lo_cleanup_needed = true;
482+ lobj = inv_open (lobjId ,INV_READ ,CurrentMemoryContext );
491483
492484/*
493485 * open the file to be written to
@@ -592,20 +584,22 @@ AtEOXact_LargeObject(bool isCommit)
592584{
593585int i ;
594586
595- if (fscxt == NULL )
587+ if (! lo_cleanup_needed )
596588return ;/* no LO operations in this xact */
597589
598590/*
599591 * Close LO fds and clear cookies array so that LO fds are no longer good.
600- * On abort we skip the close step.
592+ * The memory context and resource owner holding them are going away at
593+ * the end-of-transaction anyway, but on commit, we need to close them to
594+ * avoid warnings about leaked resources at commit. On abort we can skip
595+ * this step.
601596 */
602- for ( i = 0 ; i < cookies_size ; i ++ )
597+ if ( isCommit )
603598{
604- if ( cookies [ i ] != NULL )
599+ for ( i = 0 ; i < cookies_size ; i ++ )
605600{
606- if (isCommit )
607- inv_close (cookies [i ]);
608- deleteLOfd (i );
601+ if (cookies [i ]!= NULL )
602+ closeLOfd (i );
609603}
610604}
611605
@@ -614,11 +608,14 @@ AtEOXact_LargeObject(bool isCommit)
614608cookies_size = 0 ;
615609
616610/* Release the LO memory context to prevent permanent memory leaks. */
617- MemoryContextDelete (fscxt );
611+ if (fscxt )
612+ MemoryContextDelete (fscxt );
618613fscxt = NULL ;
619614
620615/* Give inv_api.c a chance to clean up, too */
621616close_lo_relation (isCommit );
617+
618+ lo_cleanup_needed = false;
622619}
623620
624621/*
@@ -646,14 +643,7 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
646643if (isCommit )
647644lo -> subid = parentSubid ;
648645else
649- {
650- /*
651- * Make sure we do not call inv_close twice if it errors out
652- * for some reason. Better a leak than a crash.
653- */
654- deleteLOfd (i );
655- inv_close (lo );
656- }
646+ closeLOfd (i );
657647}
658648}
659649}
@@ -663,19 +653,22 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
663653 *****************************************************************************/
664654
665655static int
666- newLOfd (LargeObjectDesc * lobjCookie )
656+ newLOfd (void )
667657{
668658int i ,
669659newsize ;
670660
661+ lo_cleanup_needed = true;
662+ if (fscxt == NULL )
663+ fscxt = AllocSetContextCreate (TopMemoryContext ,
664+ "Filesystem" ,
665+ ALLOCSET_DEFAULT_SIZES );
666+
671667/* Try to find a free slot */
672668for (i = 0 ;i < cookies_size ;i ++ )
673669{
674670if (cookies [i ]== NULL )
675- {
676- cookies [i ]= lobjCookie ;
677671return i ;
678- }
679672}
680673
681674/* No free slot, so make the array bigger */
@@ -700,15 +693,25 @@ newLOfd(LargeObjectDesc *lobjCookie)
700693cookies_size = newsize ;
701694}
702695
703- Assert (cookies [i ]== NULL );
704- cookies [i ]= lobjCookie ;
705696return i ;
706697}
707698
708699static void
709- deleteLOfd (int fd )
700+ closeLOfd (int fd )
710701{
702+ LargeObjectDesc * lobj ;
703+
704+ /*
705+ * Make sure we do not try to free twice if this errors out for some
706+ * reason. Better a leak than a crash.
707+ */
708+ lobj = cookies [fd ];
711709cookies [fd ]= NULL ;
710+
711+ if (lobj -> snapshot )
712+ UnregisterSnapshotFromOwner (lobj -> snapshot ,
713+ TopTransactionResourceOwner );
714+ inv_close (lobj );
712715}
713716
714717/*****************************************************************************
@@ -727,13 +730,8 @@ lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
727730int total_read PG_USED_FOR_ASSERTS_ONLY ;
728731bytea * result = NULL ;
729732
730- /*
731- * We don't actually need to store into fscxt, but create it anyway to
732- * ensure that AtEOXact_LargeObject knows there is state to clean up
733- */
734- CreateFSContext ();
735-
736- loDesc = inv_open (loOid ,INV_READ ,fscxt );
733+ lo_cleanup_needed = true;
734+ loDesc = inv_open (loOid ,INV_READ ,CurrentMemoryContext );
737735
738736/*
739737 * Compute number of bytes we'll actually read, accommodating nbytes == -1
@@ -817,10 +815,9 @@ be_lo_from_bytea(PG_FUNCTION_ARGS)
817815LargeObjectDesc * loDesc ;
818816int written PG_USED_FOR_ASSERTS_ONLY ;
819817
820- CreateFSContext ();
821-
818+ lo_cleanup_needed = true;
822819loOid = inv_create (loOid );
823- loDesc = inv_open (loOid ,INV_WRITE ,fscxt );
820+ loDesc = inv_open (loOid ,INV_WRITE ,CurrentMemoryContext );
824821written = inv_write (loDesc ,VARDATA_ANY (str ),VARSIZE_ANY_EXHDR (str ));
825822Assert (written == VARSIZE_ANY_EXHDR (str ));
826823inv_close (loDesc );
@@ -840,9 +837,8 @@ be_lo_put(PG_FUNCTION_ARGS)
840837LargeObjectDesc * loDesc ;
841838int written PG_USED_FOR_ASSERTS_ONLY ;
842839
843- CreateFSContext ();
844-
845- loDesc = inv_open (loOid ,INV_WRITE ,fscxt );
840+ lo_cleanup_needed = true;
841+ loDesc = inv_open (loOid ,INV_WRITE ,CurrentMemoryContext );
846842
847843/* Permission check */
848844if (!lo_compat_privileges &&