Expand Up @@ -46,6 +46,9 @@ struct FileFdwOption Oidoptcontext;/* Oid of catalog in which option may appear */ }; /* Totally made-up compression ratio */ static const double program_compression_ratio = 2.7708899835032f; /* * Valid options for file_fdw. * These options are based on the options for COPY FROM command. Expand All @@ -68,6 +71,7 @@ static const struct FileFdwOption valid_options[] = { {"escape", ForeignTableRelationId}, {"null", ForeignTableRelationId}, {"encoding", ForeignTableRelationId}, {"decompressor", ForeignTableRelationId}, {"force_not_null", AttributeRelationId}, /* Expand All @@ -84,6 +88,7 @@ static const struct FileFdwOption valid_options[] = { typedef struct FileFdwPlanState { char *filename;/* file to read */ boolis_program;/* whether a program is used to read the file */ List *options;/* merged COPY options, excluding filename */ BlockNumber pages;/* estimate of file's physical size */ doublentuples;/* estimate of number of rows in file */ Expand All @@ -95,6 +100,7 @@ typedef struct FileFdwPlanState typedef struct FileFdwExecutionState { char *filename;/* file to read */ char *program;/* optional program to use to read file */ List *options;/* merged COPY options, excluding filename */ CopyStatecstate;/* state of reading file */ } FileFdwExecutionState; Expand Down Expand Up @@ -137,7 +143,7 @@ static bool fileAnalyzeForeignTable(Relation relation, */ static bool is_valid_option(const char *option, Oid context); static void fileGetOptions(Oid foreigntableid, char **filename, List **other_options); char **filename,char **program, List **other_options); static List *get_file_fdw_attribute_options(Oid relid); static bool check_selective_binary_conversion(RelOptInfo *baserel, Oid foreigntableid, Expand Down Expand Up @@ -186,6 +192,7 @@ file_fdw_validator(PG_FUNCTION_ARGS) List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); Oidcatalog = PG_GETARG_OID(1); char *filename = NULL; char *decompressor = NULL; DefElem *force_not_null = NULL; List *other_options = NIL; ListCell *cell; Expand Down Expand Up @@ -243,9 +250,9 @@ file_fdw_validator(PG_FUNCTION_ARGS) } /* * Separate out filename and force_not_null, since ProcessCopyOptions * won't accept them. (force_not_null only comes in a boolean * per-column flavor here.) * Separate out filename, decompressor, and force_not_null, since *ProcessCopyOptions won't accept them. (force_not_null only comes in *a boolean per-column flavor here.) */ if (strcmp(def->defname, "filename") == 0) { Expand All @@ -255,6 +262,14 @@ file_fdw_validator(PG_FUNCTION_ARGS) errmsg("conflicting or redundant options"))); filename = defGetString(def); } else if (strcmp(def->defname, "decompressor") == 0) { if (decompressor) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); decompressor = defGetString(def); } else if (strcmp(def->defname, "force_not_null") == 0) { if (force_not_null) Expand All @@ -274,13 +289,28 @@ file_fdw_validator(PG_FUNCTION_ARGS) */ ProcessCopyOptions(NULL, true, other_options); /* * Filename option is required for file_fdw foreign tables. */ if (catalog == ForeignTableRelationId && filename == NULL) ereport(ERROR, (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED), errmsg("filename is required for file_fdw foreign tables"))); if (catalog == ForeignTableRelationId) { /* * Filename option is required for file_fdw foreign tables. */ if (filename == NULL) ereport(ERROR, (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED), errmsg("filename is required for file_fdw foreign tables"))); /* * Decompressors must be executable. */ if (decompressor && (access(decompressor, R_OK | X_OK) != 0)) { ereport(ERROR, (errcode_for_file_access(), errmsg("decompressor must be readable/executable \"%s\": %m", decompressor))); } } PG_RETURN_VOID(); } Expand All @@ -305,12 +335,14 @@ is_valid_option(const char *option, Oid context) /* * Fetch the options for a file_fdw foreign table. * * We have to separate out "filename" from the other options because * it must not appear in the options list passed to the core COPY code. * We have to separate out "filename" and "decompressor" from the other options * because they must not appear in the options passed to the core COPY code. If * a decompressor is present, a string consisting of it concatenated to the * escaped file name is stored at `program`. */ static void fileGetOptions(Oid foreigntableid, char **filename, List **other_options) char **filename,char **program, List **other_options) { ForeignTable *table; ForeignServer *server; Expand All @@ -319,6 +351,9 @@ fileGetOptions(Oid foreigntableid, ListCell *lc, *prev; char *decompressor; char *write_ptr, *token, *input, *read_ptr; /* * Extract options from FDW objects. We ignore user mappings because * file_fdw doesn't have any options that can be specified there. Expand Down Expand Up @@ -352,6 +387,27 @@ fileGetOptions(Oid foreigntableid, options = list_delete_cell(options, lc, prev); break; } prev = lc; } /* * Separate out the decompressor, which will be used to calculate program. */ decompressor = NULL; *program = NULL; prev = NULL; foreach(lc, options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "decompressor") == 0) { decompressor = defGetString(def); options = list_delete_cell(options, lc, prev); break; } prev = lc; } Expand All @@ -362,6 +418,41 @@ fileGetOptions(Oid foreigntableid, if (*filename == NULL) elog(ERROR, "filename is required for file_fdw foreign tables"); /* * Set up the decompressor if present. */ if (decompressor != NULL) { /* * We will escape the filename by wrapping it in single quotes. To deal * with single quotes in the name itself, we will replace all single * quotes with the string "'\''", which is four characters long. Strings * of only single quotes will need four times as much space, plus the * room for the quotes, a space, and a null terminator. */ *program = palloc0( (strlen(decompressor) + (4 * strlen(*filename)) + 4) * sizeof(char)); write_ptr = stpcpy(*program, decompressor); write_ptr = stpcpy(write_ptr, " '"); /* We're mutating filename so copy it */ input = read_ptr = pstrdup(*filename); write_ptr = stpcpy(write_ptr, strsep(&read_ptr, "'")); while ((token = strsep(&read_ptr, "'")) != NULL) { write_ptr = stpcpy(write_ptr, "'\\''"); write_ptr = stpcpy(write_ptr, token); } stpcpy(write_ptr, "'"); pfree(input); } *other_options = options; } Expand Down Expand Up @@ -433,14 +524,16 @@ fileGetForeignRelSize(PlannerInfo *root, Oid foreigntableid) { FileFdwPlanState *fdw_private; char *program; /* * Fetch options. We only need filename at this point, but we might as * well get everything and not need to re-fetch it later in planning. */ fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState)); fileGetOptions(foreigntableid, &fdw_private->filename, &fdw_private->options); fileGetOptions(foreigntableid, &fdw_private->filename, &program, &fdw_private->options); fdw_private->is_program = (program != NULL); baserel->fdw_private = (void *) fdw_private; /* Estimate relation size */ Expand Down Expand Up @@ -537,14 +630,22 @@ static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es) { char *filename; char *program; List *options; /* Fetch options --- we only need filename at this point */ fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &filename, &options); &filename, &program, & options); ExplainPropertyText("Foreign File", filename, es); if (program != NULL) { ExplainPropertyText("Foreign Program", program, es); ExplainPropertyFloat("Foreign Program Compression Est.", program_compression_ratio, 4, es); } /* Suppress file size if we're not showing cost details */ if (es->costs) { Expand All @@ -565,6 +666,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; char *filename; char *program; List *options; CopyStatecstate; FileFdwExecutionState *festate; Expand All @@ -577,7 +679,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) /* Fetch options of foreign table */ fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &filename, &options); &filename, &program, & options); /* Add any options from the plan (currently only convert_selectively) */ options = list_concat(options, plan->fdw_private); Expand All @@ -587,8 +689,8 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) * as to match the expected ScanTupleSlot signature. */ cstate = BeginCopyFrom(node->ss.ss_currentRelation, filename, false ,(program != NULL) ? program : filename,(program != NULL) , NIL, options); Expand All @@ -598,6 +700,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) */ festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState)); festate->filename = filename; festate->program = program; festate->options = options; festate->cstate = cstate; Expand Down Expand Up @@ -656,12 +759,16 @@ static void fileReScanForeignScan(ForeignScanState *node) { FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; char *filename_or_program; EndCopyFrom(festate->cstate); filename_or_program = (festate->program != NULL) ? festate->program : festate->filename; festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation, festate->filename ,false ,filename_or_program ,(festate->program != NULL) ,NIL, festate->options); } Expand Down Expand Up @@ -690,11 +797,12 @@ fileAnalyzeForeignTable(Relation relation, BlockNumber *totalpages) { char *filename; char *program; List *options; struct stat stat_buf; /* Fetch options of foreign table */ fileGetOptions(RelationGetRelid(relation), &filename, &options); fileGetOptions(RelationGetRelid(relation), &filename, &program, & options); /* * Get size of the file. (XXX if we fail here, would it be better to just Expand Down Expand Up @@ -900,6 +1008,8 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel, MAXALIGN(sizeof(HeapTupleHeaderData)); ntuples = clamp_row_est((double) stat_buf.st_size / (double) tuple_width); if (fdw_private->is_program) ntuples *= program_compression_ratio; } fdw_private->ntuples = ntuples; Expand Down Expand Up @@ -976,6 +1086,7 @@ file_acquire_sample_rows(Relation onerel, int elevel, bool *nulls; boolfound; char *filename; char *program; List *options; CopyStatecstate; ErrorContextCallback errcallback; Expand All @@ -990,12 +1101,13 @@ file_acquire_sample_rows(Relation onerel, int elevel, nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); /* Fetch options of foreign table */ fileGetOptions(RelationGetRelid(onerel), &filename, &options); fileGetOptions(RelationGetRelid(onerel), &filename, &program, & options); /* * Create CopyState from FDW options. */ cstate = BeginCopyFrom(onerel, filename, false, NIL, options); cstate = BeginCopyFrom(onerel, (program != NULL) ? program : filename, (program != NULL), NIL, options); /* * Use per-tuple memory context to prevent leak of memory used to read Expand Down