42
42
#include <sys/stat.h>
43
43
#include <unistd.h>
44
44
45
+ #include "access/xact.h"
45
46
#include "libpq/be-fsstubs.h"
46
47
#include "libpq/libpq-fs.h"
47
48
#include "miscadmin.h"
50
51
#include "utils/acl.h"
51
52
#include "utils/builtins.h"
52
53
#include "utils/memutils.h"
54
+ #include "utils/snapmgr.h"
53
55
54
56
/*
55
57
* compatibility flag for permission checks
@@ -72,19 +74,11 @@ boollo_compat_privileges;
72
74
static LargeObjectDesc * * cookies = NULL ;
73
75
static int cookies_size = 0 ;
74
76
77
+ static bool lo_cleanup_needed = false;
75
78
static MemoryContext fscxt = NULL ;
76
79
77
- #define CreateFSContext () \
78
- do { \
79
- if (fscxt == NULL) \
80
- fscxt = AllocSetContextCreate(TopMemoryContext, \
81
- "Filesystem", \
82
- ALLOCSET_DEFAULT_SIZES); \
83
- } while (0)
84
-
85
-
86
- static int newLOfd (LargeObjectDesc * lobjCookie );
87
- static void deleteLOfd (int fd );
80
+ static int newLOfd (void );
81
+ static void closeLOfd (int fd );
88
82
static Oid lo_import_internal (text * filename ,Oid lobjOid );
89
83
90
84
@@ -104,19 +98,33 @@ lo_open(PG_FUNCTION_ARGS)
104
98
elog (DEBUG4 ,"lo_open(%u,%d)" ,lobjId ,mode );
105
99
#endif
106
100
107
- CreateFSContext ();
101
+ /*
102
+ * Allocate a large object descriptor first. This will also create
103
+ * 'fscxt' if this is the first LO opened in this transaction.
104
+ */
105
+ fd = newLOfd ();
108
106
109
107
lobjDesc = inv_open (lobjId ,mode ,fscxt );
110
-
111
108
if (lobjDesc == NULL )
112
109
{/* lookup failed */
113
110
#if FSDB
114
111
elog (DEBUG4 ,"could not open large object %u" ,lobjId );
115
112
#endif
116
113
PG_RETURN_INT32 (-1 );
117
114
}
115
+ lobjDesc -> subid = GetCurrentSubTransactionId ();
118
116
119
- fd = newLOfd (lobjDesc );
117
+ /*
118
+ * We must register the snapshot in TopTransaction's resowner so that it
119
+ * stays alive until the LO is closed rather than until the current portal
120
+ * shuts down.
121
+ */
122
+ if (lobjDesc -> snapshot )
123
+ lobjDesc -> snapshot = RegisterSnapshotOnOwner (lobjDesc -> snapshot ,
124
+ TopTransactionResourceOwner );
125
+
126
+ Assert (cookies [fd ]== NULL );
127
+ cookies [fd ]= lobjDesc ;
120
128
121
129
PG_RETURN_INT32 (fd );
122
130
}
@@ -135,9 +143,7 @@ lo_close(PG_FUNCTION_ARGS)
135
143
elog (DEBUG4 ,"lo_close(%d)" ,fd );
136
144
#endif
137
145
138
- inv_close (cookies [fd ]);
139
-
140
- deleteLOfd (fd );
146
+ closeLOfd (fd );
141
147
142
148
PG_RETURN_INT32 (0 );
143
149
}
@@ -271,12 +277,7 @@ lo_creat(PG_FUNCTION_ARGS)
271
277
{
272
278
Oid lobjId ;
273
279
274
- /*
275
- * We don't actually need to store into fscxt, but create it anyway to
276
- * ensure that AtEOXact_LargeObject knows there is state to clean up
277
- */
278
- CreateFSContext ();
279
-
280
+ lo_cleanup_needed = true;
280
281
lobjId = inv_create (InvalidOid );
281
282
282
283
PG_RETURN_OID (lobjId );
@@ -287,12 +288,7 @@ lo_create(PG_FUNCTION_ARGS)
287
288
{
288
289
Oid lobjId = PG_GETARG_OID (0 );
289
290
290
- /*
291
- * We don't actually need to store into fscxt, but create it anyway to
292
- * ensure that AtEOXact_LargeObject knows there is state to clean up
293
- */
294
- CreateFSContext ();
295
-
291
+ lo_cleanup_needed = true;
296
292
lobjId = inv_create (lobjId );
297
293
298
294
PG_RETURN_OID (lobjId );
@@ -359,16 +355,13 @@ lo_unlink(PG_FUNCTION_ARGS)
359
355
for (i = 0 ;i < cookies_size ;i ++ )
360
356
{
361
357
if (cookies [i ]!= NULL && cookies [i ]-> id == lobjId )
362
- {
363
- inv_close (cookies [i ]);
364
- deleteLOfd (i );
365
- }
358
+ closeLOfd (i );
366
359
}
367
360
}
368
361
369
362
/*
370
363
* inv_drop does not create a need for end-of-transaction cleanup and
371
- * hence we don't need tohave created fscxt .
364
+ * hence we don't need toset lo_cleanup_needed .
372
365
*/
373
366
PG_RETURN_INT32 (inv_drop (lobjId ));
374
367
}
@@ -456,8 +449,6 @@ lo_import_internal(text *filename, Oid lobjOid)
456
449
errhint ("Anyone can use the client-side lo_import() provided by libpq." )));
457
450
#endif
458
451
459
- CreateFSContext ();
460
-
461
452
/*
462
453
* open the file to be read in
463
454
*/
@@ -472,12 +463,13 @@ lo_import_internal(text *filename, Oid lobjOid)
472
463
/*
473
464
* create an inversion object
474
465
*/
466
+ lo_cleanup_needed = true;
475
467
oid = inv_create (lobjOid );
476
468
477
469
/*
478
470
* read in from the filesystem and write to the inversion object
479
471
*/
480
- lobj = inv_open (oid ,INV_WRITE ,fscxt );
472
+ lobj = inv_open (oid ,INV_WRITE ,CurrentMemoryContext );
481
473
482
474
while ((nbytes = read (fd ,buf ,BUFSIZE ))> 0 )
483
475
{
@@ -522,12 +514,11 @@ lo_export(PG_FUNCTION_ARGS)
522
514
errhint ("Anyone can use the client-side lo_export() provided by libpq." )));
523
515
#endif
524
516
525
- CreateFSContext ();
526
-
527
517
/*
528
518
* open the inversion object (no need to test for failure)
529
519
*/
530
- lobj = inv_open (lobjId ,INV_READ ,fscxt );
520
+ lo_cleanup_needed = true;
521
+ lobj = inv_open (lobjId ,INV_READ ,CurrentMemoryContext );
531
522
532
523
/*
533
524
* open the file to be written to
@@ -643,20 +634,22 @@ AtEOXact_LargeObject(bool isCommit)
643
634
{
644
635
int i ;
645
636
646
- if (fscxt == NULL )
637
+ if (! lo_cleanup_needed )
647
638
return ;/* no LO operations in this xact */
648
639
649
640
/*
650
641
* Close LO fds and clear cookies array so that LO fds are no longer good.
651
- * On abort we skip the close step.
642
+ * The memory context and resource owner holding them are going away at
643
+ * the end-of-transaction anyway, but on commit, we need to close them to
644
+ * avoid warnings about leaked resources at commit. On abort we can skip
645
+ * this step.
652
646
*/
653
- for ( i = 0 ; i < cookies_size ; i ++ )
647
+ if ( isCommit )
654
648
{
655
- if ( cookies [ i ] != NULL )
649
+ for ( i = 0 ; i < cookies_size ; i ++ )
656
650
{
657
- if (isCommit )
658
- inv_close (cookies [i ]);
659
- deleteLOfd (i );
651
+ if (cookies [i ]!= NULL )
652
+ closeLOfd (i );
660
653
}
661
654
}
662
655
@@ -665,11 +658,14 @@ AtEOXact_LargeObject(bool isCommit)
665
658
cookies_size = 0 ;
666
659
667
660
/* Release the LO memory context to prevent permanent memory leaks. */
668
- MemoryContextDelete (fscxt );
661
+ if (fscxt )
662
+ MemoryContextDelete (fscxt );
669
663
fscxt = NULL ;
670
664
671
665
/* Give inv_api.c a chance to clean up, too */
672
666
close_lo_relation (isCommit );
667
+
668
+ lo_cleanup_needed = false;
673
669
}
674
670
675
671
/*
@@ -697,14 +693,7 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
697
693
if (isCommit )
698
694
lo -> subid = parentSubid ;
699
695
else
700
- {
701
- /*
702
- * Make sure we do not call inv_close twice if it errors out
703
- * for some reason. Better a leak than a crash.
704
- */
705
- deleteLOfd (i );
706
- inv_close (lo );
707
- }
696
+ closeLOfd (i );
708
697
}
709
698
}
710
699
}
@@ -714,19 +703,22 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
714
703
*****************************************************************************/
715
704
716
705
static int
717
- newLOfd (LargeObjectDesc * lobjCookie )
706
+ newLOfd (void )
718
707
{
719
708
int i ,
720
709
newsize ;
721
710
711
+ lo_cleanup_needed = true;
712
+ if (fscxt == NULL )
713
+ fscxt = AllocSetContextCreate (TopMemoryContext ,
714
+ "Filesystem" ,
715
+ ALLOCSET_DEFAULT_SIZES );
716
+
722
717
/* Try to find a free slot */
723
718
for (i = 0 ;i < cookies_size ;i ++ )
724
719
{
725
720
if (cookies [i ]== NULL )
726
- {
727
- cookies [i ]= lobjCookie ;
728
721
return i ;
729
- }
730
722
}
731
723
732
724
/* No free slot, so make the array bigger */
@@ -751,15 +743,25 @@ newLOfd(LargeObjectDesc *lobjCookie)
751
743
cookies_size = newsize ;
752
744
}
753
745
754
- Assert (cookies [i ]== NULL );
755
- cookies [i ]= lobjCookie ;
756
746
return i ;
757
747
}
758
748
759
749
static void
760
- deleteLOfd (int fd )
750
+ closeLOfd (int fd )
761
751
{
752
+ LargeObjectDesc * lobj ;
753
+
754
+ /*
755
+ * Make sure we do not try to free twice if this errors out for some
756
+ * reason. Better a leak than a crash.
757
+ */
758
+ lobj = cookies [fd ];
762
759
cookies [fd ]= NULL ;
760
+
761
+ if (lobj -> snapshot )
762
+ UnregisterSnapshotFromOwner (lobj -> snapshot ,
763
+ TopTransactionResourceOwner );
764
+ inv_close (lobj );
763
765
}
764
766
765
767
/*****************************************************************************
@@ -778,13 +780,8 @@ lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
778
780
int total_read PG_USED_FOR_ASSERTS_ONLY ;
779
781
bytea * result = NULL ;
780
782
781
- /*
782
- * We don't actually need to store into fscxt, but create it anyway to
783
- * ensure that AtEOXact_LargeObject knows there is state to clean up
784
- */
785
- CreateFSContext ();
786
-
787
- loDesc = inv_open (loOid ,INV_READ ,fscxt );
783
+ lo_cleanup_needed = true;
784
+ loDesc = inv_open (loOid ,INV_READ ,CurrentMemoryContext );
788
785
789
786
/* Permission check */
790
787
if (!lo_compat_privileges &&
@@ -879,10 +876,9 @@ lo_from_bytea(PG_FUNCTION_ARGS)
879
876
LargeObjectDesc * loDesc ;
880
877
int written PG_USED_FOR_ASSERTS_ONLY ;
881
878
882
- CreateFSContext ();
883
-
879
+ lo_cleanup_needed = true;
884
880
loOid = inv_create (loOid );
885
- loDesc = inv_open (loOid ,INV_WRITE ,fscxt );
881
+ loDesc = inv_open (loOid ,INV_WRITE ,CurrentMemoryContext );
886
882
written = inv_write (loDesc ,VARDATA_ANY (str ),VARSIZE_ANY_EXHDR (str ));
887
883
Assert (written == VARSIZE_ANY_EXHDR (str ));
888
884
inv_close (loDesc );
@@ -902,9 +898,8 @@ lo_put(PG_FUNCTION_ARGS)
902
898
LargeObjectDesc * loDesc ;
903
899
int written PG_USED_FOR_ASSERTS_ONLY ;
904
900
905
- CreateFSContext ();
906
-
907
- loDesc = inv_open (loOid ,INV_WRITE ,fscxt );
901
+ lo_cleanup_needed = true;
902
+ loDesc = inv_open (loOid ,INV_WRITE ,CurrentMemoryContext );
908
903
909
904
/* Permission check */
910
905
if (!lo_compat_privileges &&