3131 * BufFile also supports temporary files that exceed the OS file size limit
3232 * (by opening multiple fd.c temporary files). This is an essential feature
3333 * for sorts and hashjoins on large amounts of data.
34+ *
35+ * BufFile supports temporary files that can be made read-only and shared with
36+ * other backends, as infrastructure for parallel execution. Such files need
37+ * to be created as a member of a SharedFileSet that all participants are
38+ * attached to.
3439 *-------------------------------------------------------------------------
3540 */
3641
3742#include "postgres.h"
3843
3944#include "executor/instrument.h"
45+ #include "miscadmin.h"
4046#include "pgstat.h"
4147#include "storage/fd.h"
4248#include "storage/buffile.h"
@@ -70,6 +76,10 @@ struct BufFile
7076
7177bool isInterXact ;/* keep open over transactions? */
7278bool dirty ;/* does buffer need to be written? */
79+ bool readOnly ;/* has the file been set to read only? */
80+
81+ SharedFileSet * fileset ;/* space for segment files if shared */
82+ const char * name ;/* name of this BufFile if shared */
7383
7484/*
7585 * resowner is the ResourceOwner to use for underlying temp files. (We
@@ -94,6 +104,7 @@ static void extendBufFile(BufFile *file);
94104static void BufFileLoadBuffer (BufFile * file );
95105static void BufFileDumpBuffer (BufFile * file );
96106static int BufFileFlush (BufFile * file );
107+ static File MakeNewSharedSegment (BufFile * file ,int segment );
97108
98109
99110/*
@@ -117,6 +128,9 @@ makeBufFile(File firstfile)
117128file -> curOffset = 0L ;
118129file -> pos = 0 ;
119130file -> nbytes = 0 ;
131+ file -> readOnly = false;
132+ file -> fileset = NULL ;
133+ file -> name = NULL ;
120134
121135return file ;
122136}
@@ -134,7 +148,11 @@ extendBufFile(BufFile *file)
134148oldowner = CurrentResourceOwner ;
135149CurrentResourceOwner = file -> resowner ;
136150
137- pfile = OpenTemporaryFile (file -> isInterXact );
151+ if (file -> fileset == NULL )
152+ pfile = OpenTemporaryFile (file -> isInterXact );
153+ else
154+ pfile = MakeNewSharedSegment (file ,file -> numFiles );
155+
138156Assert (pfile >=0 );
139157
140158CurrentResourceOwner = oldowner ;
@@ -175,6 +193,189 @@ BufFileCreateTemp(bool interXact)
175193return file ;
176194}
177195
196+ /*
197+ * Build the name for a given segment of a given BufFile.
198+ */
199+ static void
200+ SharedSegmentName (char * name ,const char * buffile_name ,int segment )
201+ {
202+ snprintf (name ,MAXPGPATH ,"%s.%d" ,buffile_name ,segment );
203+ }
204+
205+ /*
206+ * Create a new segment file backing a shared BufFile.
207+ */
208+ static File
209+ MakeNewSharedSegment (BufFile * buffile ,int segment )
210+ {
211+ char name [MAXPGPATH ];
212+ File file ;
213+
214+ SharedSegmentName (name ,buffile -> name ,segment );
215+ file = SharedFileSetCreate (buffile -> fileset ,name );
216+
217+ /* SharedFileSetCreate would've errored out */
218+ Assert (file > 0 );
219+
220+ return file ;
221+ }
222+
223+ /*
224+ * Create a BufFile that can be discovered and opened read-only by other
225+ * backends that are attached to the same SharedFileSet using the same name.
226+ *
227+ * The naming scheme for shared BufFiles is left up to the calling code. The
228+ * name will appear as part of one or more filenames on disk, and might
229+ * provide clues to administrators about which subsystem is generating
230+ * temporary file data. Since each SharedFileSet object is backed by one or
231+ * more uniquely named temporary directory, names don't conflict with
232+ * unrelated SharedFileSet objects.
233+ */
234+ BufFile *
235+ BufFileCreateShared (SharedFileSet * fileset ,const char * name )
236+ {
237+ BufFile * file ;
238+
239+ file = (BufFile * )palloc (sizeof (BufFile ));
240+ file -> fileset = fileset ;
241+ file -> name = pstrdup (name );
242+ file -> numFiles = 1 ;
243+ file -> files = (File * )palloc (sizeof (File ));
244+ file -> files [0 ]= MakeNewSharedSegment (file ,0 );
245+ file -> offsets = (off_t * )palloc (sizeof (off_t ));
246+ file -> offsets [0 ]= 0L ;
247+ file -> isInterXact = false;
248+ file -> dirty = false;
249+ file -> resowner = CurrentResourceOwner ;
250+ file -> curFile = 0 ;
251+ file -> curOffset = 0L ;
252+ file -> pos = 0 ;
253+ file -> nbytes = 0 ;
254+ file -> readOnly = false;
255+ file -> name = pstrdup (name );
256+
257+ return file ;
258+ }
259+
260+ /*
261+ * Open a file that was previously created in another backend (or this one)
262+ * with BufFileCreateShared in the same SharedFileSet using the same name.
263+ * The backend that created the file must have called BufFileClose() or
264+ * BufFileExport() to make sure that it is ready to be opened by other
265+ * backends and render it read-only.
266+ */
267+ BufFile *
268+ BufFileOpenShared (SharedFileSet * fileset ,const char * name )
269+ {
270+ BufFile * file = (BufFile * )palloc (sizeof (BufFile ));
271+ char segment_name [MAXPGPATH ];
272+ Size capacity = 16 ;
273+ File * files = palloc (sizeof (File )* capacity );
274+ int nfiles = 0 ;
275+
276+ file = (BufFile * )palloc (sizeof (BufFile ));
277+ files = palloc (sizeof (File )* capacity );
278+
279+ /*
280+ * We don't know how many segments there are, so we'll probe the
281+ * filesystem to find out.
282+ */
283+ for (;;)
284+ {
285+ /* See if we need to expand our file segment array. */
286+ if (nfiles + 1 > capacity )
287+ {
288+ capacity *=2 ;
289+ files = repalloc (files ,sizeof (File )* capacity );
290+ }
291+ /* Try to load a segment. */
292+ SharedSegmentName (segment_name ,name ,nfiles );
293+ files [nfiles ]= SharedFileSetOpen (fileset ,segment_name );
294+ if (files [nfiles ] <=0 )
295+ break ;
296+ ++ nfiles ;
297+
298+ CHECK_FOR_INTERRUPTS ();
299+ }
300+
301+ /*
302+ * If we didn't find any files at all, then no BufFile exists with this
303+ * name.
304+ */
305+ if (nfiles == 0 )
306+ return NULL ;
307+
308+ file -> numFiles = nfiles ;
309+ file -> files = files ;
310+ file -> offsets = (off_t * )palloc0 (sizeof (off_t )* nfiles );
311+ file -> isInterXact = false;
312+ file -> dirty = false;
313+ file -> resowner = CurrentResourceOwner ;/* Unused, can't extend */
314+ file -> curFile = 0 ;
315+ file -> curOffset = 0L ;
316+ file -> pos = 0 ;
317+ file -> nbytes = 0 ;
318+ file -> readOnly = true;/* Can't write to files opened this way */
319+ file -> fileset = fileset ;
320+ file -> name = pstrdup (name );
321+
322+ return file ;
323+ }
324+
325+ /*
326+ * Delete a BufFile that was created by BufFileCreateShared in the given
327+ * SharedFileSet using the given name.
328+ *
329+ * It is not necessary to delete files explicitly with this function. It is
330+ * provided only as a way to delete files proactively, rather than waiting for
331+ * the SharedFileSet to be cleaned up.
332+ *
333+ * Only one backend should attempt to delete a given name, and should know
334+ * that it exists and has been exported or closed.
335+ */
336+ void
337+ BufFileDeleteShared (SharedFileSet * fileset ,const char * name )
338+ {
339+ char segment_name [MAXPGPATH ];
340+ int segment = 0 ;
341+ bool found = false;
342+
343+ /*
344+ * We don't know how many segments the file has. We'll keep deleting
345+ * until we run out. If we don't manage to find even an initial segment,
346+ * raise an error.
347+ */
348+ for (;;)
349+ {
350+ SharedSegmentName (segment_name ,name ,segment );
351+ if (!SharedFileSetDelete (fileset ,segment_name , true))
352+ break ;
353+ found = true;
354+ ++ segment ;
355+
356+ CHECK_FOR_INTERRUPTS ();
357+ }
358+
359+ if (!found )
360+ elog (ERROR ,"could not delete unknown shared BufFile \"%s\"" ,name );
361+ }
362+
363+ /*
364+ * BufFileExportShared --- flush and make read-only, in preparation for sharing.
365+ */
366+ void
367+ BufFileExportShared (BufFile * file )
368+ {
369+ /* Must be a file belonging to a SharedFileSet. */
370+ Assert (file -> fileset != NULL );
371+
372+ /* It's probably a bug if someone calls this twice. */
373+ Assert (!file -> readOnly );
374+
375+ BufFileFlush (file );
376+ file -> readOnly = true;
377+ }
378+
178379/*
179380 * Close a BufFile
180381 *
@@ -390,6 +591,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size)
390591size_t nwritten = 0 ;
391592size_t nthistime ;
392593
594+ Assert (!file -> readOnly );
595+
393596while (size > 0 )
394597{
395598if (file -> pos >=BLCKSZ )