Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add file_fdw support for external decompressors#4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
jasonmp85 wants to merge9 commits intopostgres:REL9_3_STABLEfromjasonmp85:compressed_fdw_external_program
Closed
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file addedcontrib/file_fdw/data/agg.csv.gz
View file
Open in desktop
Binary file not shown.
Binary file addedcontrib/file_fdw/data/it's_ok.csv.gz
View file
Open in desktop
Binary file not shown.
162 changes: 137 additions & 25 deletionscontrib/file_fdw/file_fdw.c
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -46,6 +46,9 @@ struct FileFdwOption
Oidoptcontext;/* Oid of catalog in which option may appear */
};

/* Totally made-up compression ratio */
static const double program_compression_ratio = 2.7708899835032f;

/*
* Valid options for file_fdw.
* These options are based on the options for COPY FROM command.
Expand All@@ -68,6 +71,7 @@ static const struct FileFdwOption valid_options[] = {
{"escape", ForeignTableRelationId},
{"null", ForeignTableRelationId},
{"encoding", ForeignTableRelationId},
{"decompressor", ForeignTableRelationId},
{"force_not_null", AttributeRelationId},

/*
Expand All@@ -84,6 +88,7 @@ static const struct FileFdwOption valid_options[] = {
typedef struct FileFdwPlanState
{
char *filename;/* file to read */
boolis_program;/* whether a program is used to read the file */
List *options;/* merged COPY options, excluding filename */
BlockNumber pages;/* estimate of file's physical size */
doublentuples;/* estimate of number of rows in file */
Expand All@@ -95,6 +100,7 @@ typedef struct FileFdwPlanState
typedef struct FileFdwExecutionState
{
char *filename;/* file to read */
char *program;/* optional program to use to read file */
List *options;/* merged COPY options, excluding filename */
CopyStatecstate;/* state of reading file */
} FileFdwExecutionState;
Expand DownExpand Up@@ -137,7 +143,7 @@ static bool fileAnalyzeForeignTable(Relation relation,
*/
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
char **filename, List **other_options);
char **filename,char **program,List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);
static bool check_selective_binary_conversion(RelOptInfo *baserel,
Oid foreigntableid,
Expand DownExpand Up@@ -186,6 +192,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
Oidcatalog = PG_GETARG_OID(1);
char *filename = NULL;
char *decompressor = NULL;
DefElem *force_not_null = NULL;
List *other_options = NIL;
ListCell *cell;
Expand DownExpand Up@@ -243,9 +250,9 @@ file_fdw_validator(PG_FUNCTION_ARGS)
}

/*
* Separate out filenameand force_not_null, since ProcessCopyOptions
* won't accept them. (force_not_null only comes in a boolean
* per-column flavor here.)
* Separate out filename, decompressor,and force_not_null, since
*ProcessCopyOptionswon't accept them. (force_not_null only comes in
*a booleanper-column flavor here.)
*/
if (strcmp(def->defname, "filename") == 0)
{
Expand All@@ -255,6 +262,14 @@ file_fdw_validator(PG_FUNCTION_ARGS)
errmsg("conflicting or redundant options")));
filename = defGetString(def);
}
else if (strcmp(def->defname, "decompressor") == 0)
{
if (decompressor)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
decompressor = defGetString(def);
}
else if (strcmp(def->defname, "force_not_null") == 0)
{
if (force_not_null)
Expand All@@ -274,13 +289,28 @@ file_fdw_validator(PG_FUNCTION_ARGS)
*/
ProcessCopyOptions(NULL, true, other_options);

/*
* Filename option is required for file_fdw foreign tables.
*/
if (catalog == ForeignTableRelationId && filename == NULL)
ereport(ERROR,
(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
errmsg("filename is required for file_fdw foreign tables")));
if (catalog == ForeignTableRelationId)
{
/*
* Filename option is required for file_fdw foreign tables.
*/
if (filename == NULL)
ereport(ERROR,
(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
errmsg("filename is required for file_fdw foreign tables")));


/*
* Decompressors must be executable.
*/
if (decompressor && (access(decompressor, R_OK | X_OK) != 0))
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("decompressor must be readable/executable \"%s\": %m",
decompressor)));
}
}

PG_RETURN_VOID();
}
Expand All@@ -305,12 +335,14 @@ is_valid_option(const char *option, Oid context)
/*
* Fetch the options for a file_fdw foreign table.
*
* We have to separate out "filename" from the other options because
* it must not appear in the options list passed to the core COPY code.
* We have to separate out "filename" and "decompressor" from the other options
* because they must not appear in the options passed to the core COPY code. If
* a decompressor is present, a string consisting of it concatenated to the
* escaped file name is stored at `program`.
*/
static void
fileGetOptions(Oid foreigntableid,
char **filename, List **other_options)
char **filename,char **program,List **other_options)
{
ForeignTable *table;
ForeignServer *server;
Expand All@@ -319,6 +351,9 @@ fileGetOptions(Oid foreigntableid,
ListCell *lc,
*prev;

char *decompressor;
char *write_ptr, *token, *input, *read_ptr;

/*
* Extract options from FDW objects. We ignore user mappings because
* file_fdw doesn't have any options that can be specified there.
Expand DownExpand Up@@ -352,6 +387,27 @@ fileGetOptions(Oid foreigntableid,
options = list_delete_cell(options, lc, prev);
break;
}

prev = lc;
}

/*
* Separate out the decompressor, which will be used to calculate program.
*/
decompressor = NULL;
*program = NULL;
prev = NULL;
foreach(lc, options)
{
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "decompressor") == 0)
{
decompressor = defGetString(def);
options = list_delete_cell(options, lc, prev);
break;
}

prev = lc;
}

Expand All@@ -362,6 +418,41 @@ fileGetOptions(Oid foreigntableid,
if (*filename == NULL)
elog(ERROR, "filename is required for file_fdw foreign tables");

/*
* Set up the decompressor if present.
*/
if (decompressor != NULL)
{
/*
* We will escape the filename by wrapping it in single quotes. To deal
* with single quotes in the name itself, we will replace all single
* quotes with the string "'\''", which is four characters long. Strings
* of only single quotes will need four times as much space, plus the
* room for the quotes, a space, and a null terminator.
*/
*program = palloc0(
(strlen(decompressor) + (4 * strlen(*filename)) + 4)
* sizeof(char));

write_ptr = stpcpy(*program, decompressor);
write_ptr = stpcpy(write_ptr, " '");

/* We're mutating filename so copy it */
input = read_ptr = pstrdup(*filename);

write_ptr = stpcpy(write_ptr, strsep(&read_ptr, "'"));

while ((token = strsep(&read_ptr, "'")) != NULL)
{
write_ptr = stpcpy(write_ptr, "'\\''");
write_ptr = stpcpy(write_ptr, token);
}

stpcpy(write_ptr, "'");

pfree(input);
}

*other_options = options;
}

Expand DownExpand Up@@ -433,14 +524,16 @@ fileGetForeignRelSize(PlannerInfo *root,
Oid foreigntableid)
{
FileFdwPlanState *fdw_private;
char *program;

/*
* Fetch options. We only need filename at this point, but we might as
* well get everything and not need to re-fetch it later in planning.
*/
fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
fileGetOptions(foreigntableid,
&fdw_private->filename, &fdw_private->options);
fileGetOptions(foreigntableid, &fdw_private->filename, &program,
&fdw_private->options);
fdw_private->is_program = (program != NULL);
baserel->fdw_private = (void *) fdw_private;

/* Estimate relation size */
Expand DownExpand Up@@ -537,14 +630,22 @@ static void
fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
{
char *filename;
char *program;
List *options;

/* Fetch options --- we only need filename at this point */
fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
&filename, &options);
&filename, &program, &options);

ExplainPropertyText("Foreign File", filename, es);

if (program != NULL)
{
ExplainPropertyText("Foreign Program", program, es);
ExplainPropertyFloat("Foreign Program Compression Est.",
program_compression_ratio, 4, es);
}

/* Suppress file size if we're not showing cost details */
if (es->costs)
{
Expand All@@ -565,6 +666,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
{
ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
char *filename;
char *program;
List *options;
CopyStatecstate;
FileFdwExecutionState *festate;
Expand All@@ -577,7 +679,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)

/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
&filename, &options);
&filename, &program, &options);

/* Add any options from the plan (currently only convert_selectively) */
options = list_concat(options, plan->fdw_private);
Expand All@@ -587,8 +689,8 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
* as to match the expected ScanTupleSlot signature.
*/
cstate = BeginCopyFrom(node->ss.ss_currentRelation,
filename,
false,
(program != NULL) ? program :filename,
(program != NULL),
NIL,
options);

Expand All@@ -598,6 +700,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
*/
festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
festate->filename = filename;
festate->program = program;
festate->options = options;
festate->cstate = cstate;

Expand DownExpand Up@@ -656,12 +759,16 @@ static void
fileReScanForeignScan(ForeignScanState *node)
{
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
char *filename_or_program;

EndCopyFrom(festate->cstate);

filename_or_program =
(festate->program != NULL) ? festate->program : festate->filename;

festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
festate->filename,
false,
filename_or_program,
(festate->program != NULL),
NIL,
festate->options);
}
Expand DownExpand Up@@ -690,11 +797,12 @@ fileAnalyzeForeignTable(Relation relation,
BlockNumber *totalpages)
{
char *filename;
char *program;
List *options;
struct stat stat_buf;

/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(relation), &filename, &options);
fileGetOptions(RelationGetRelid(relation), &filename, &program, &options);

/*
* Get size of the file. (XXX if we fail here, would it be better to just
Expand DownExpand Up@@ -900,6 +1008,8 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel,
MAXALIGN(sizeof(HeapTupleHeaderData));
ntuples = clamp_row_est((double) stat_buf.st_size /
(double) tuple_width);
if (fdw_private->is_program)
ntuples *= program_compression_ratio;
}
fdw_private->ntuples = ntuples;

Expand DownExpand Up@@ -976,6 +1086,7 @@ file_acquire_sample_rows(Relation onerel, int elevel,
bool *nulls;
boolfound;
char *filename;
char *program;
List *options;
CopyStatecstate;
ErrorContextCallback errcallback;
Expand All@@ -990,12 +1101,13 @@ file_acquire_sample_rows(Relation onerel, int elevel,
nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));

/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(onerel), &filename, &options);
fileGetOptions(RelationGetRelid(onerel), &filename, &program, &options);

/*
* Create CopyState from FDW options.
*/
cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
cstate = BeginCopyFrom(onerel, (program != NULL) ? program : filename,
(program != NULL), NIL, options);

/*
* Use per-tuple memory context to prevent leak of memory used to read
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp