Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita1c4c8a

Browse files
committed
file_fdw: Add on_error and log_verbosity options to file_fdw.
In v17, the on_error and log_verbosity options were introduced forthe COPY command. This commit extends support for these optionsto file_fdw.Setting on_error = 'ignore' for a file_fdw foreign table allows usersto query it without errors, even when the input file containsmalformed rows, by skipping the problematic rows.Both on_error and log_verbosity options apply to SELECT and ANALYZEoperations on file_fdw foreign tables.Author: Atsushi TorikoshiReviewed-by: Masahiko Sawada, Fujii MasaoDiscussion:https://postgr.es/m/ab59dad10490ea3734cf022b16c24cfd@oss.nttdata.com
1 parente7834a1 commita1c4c8a

File tree

4 files changed

+140
-16
lines changed

4 files changed

+140
-16
lines changed

‎contrib/file_fdw/expected/file_fdw.out

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,25 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
206206
SELECT * FROM agg_bad; -- ERROR
207207
ERROR: invalid input syntax for type real: "aaa"
208208
CONTEXT: COPY agg_bad, line 3, column b: "aaa"
209+
-- on_error and log_verbosity tests
210+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
211+
SELECT * FROM agg_bad;
212+
NOTICE: 1 row was skipped due to data type incompatibility
213+
a | b
214+
-----+--------
215+
100 | 99.097
216+
42 | 324.78
217+
(2 rows)
218+
219+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
220+
SELECT * FROM agg_bad;
221+
a | b
222+
-----+--------
223+
100 | 99.097
224+
42 | 324.78
225+
(2 rows)
226+
227+
ANALYZE agg_bad;
209228
-- misc query tests
210229
\t on
211230
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');

‎contrib/file_fdw/file_fdw.c

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include"catalog/pg_authid.h"
2323
#include"catalog/pg_foreign_table.h"
2424
#include"commands/copy.h"
25+
#include"commands/copyfrom_internal.h"
2526
#include"commands/defrem.h"
2627
#include"commands/explain.h"
2728
#include"commands/vacuum.h"
@@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
7475
{"null",ForeignTableRelationId},
7576
{"default",ForeignTableRelationId},
7677
{"encoding",ForeignTableRelationId},
78+
{"on_error",ForeignTableRelationId},
79+
{"log_verbosity",ForeignTableRelationId},
7780
{"force_not_null",AttributeRelationId},
7881
{"force_null",AttributeRelationId},
7982

@@ -723,38 +726,74 @@ fileIterateForeignScan(ForeignScanState *node)
723726
FileFdwExecutionState*festate= (FileFdwExecutionState*)node->fdw_state;
724727
EState*estate=CreateExecutorState();
725728
ExprContext*econtext;
726-
MemoryContextoldcontext;
729+
MemoryContextoldcontext=CurrentMemoryContext;
727730
TupleTableSlot*slot=node->ss.ss_ScanTupleSlot;
728-
boolfound;
731+
CopyFromStatecstate=festate->cstate;
729732
ErrorContextCallbackerrcallback;
730733

731734
/* Set up callback to identify error line number. */
732735
errcallback.callback=CopyFromErrorCallback;
733-
errcallback.arg= (void*)festate->cstate;
736+
errcallback.arg= (void*)cstate;
734737
errcallback.previous=error_context_stack;
735738
error_context_stack=&errcallback;
736739

737740
/*
738-
* The protocol for loading a virtual tuple into a slot is first
739-
* ExecClearTuple, then fill the values/isnull arrays, then
740-
* ExecStoreVirtualTuple. If we don't find another row in the file, we
741-
* just skip the last step, leaving the slot empty as required.
742-
*
743741
* We pass ExprContext because there might be a use of the DEFAULT option
744742
* in COPY FROM, so we may need to evaluate default expressions.
745743
*/
746-
ExecClearTuple(slot);
747744
econtext=GetPerTupleExprContext(estate);
748745

746+
retry:
747+
749748
/*
750749
* DEFAULT expressions need to be evaluated in a per-tuple context, so
751750
* switch in case we are doing that.
752751
*/
753-
oldcontext=MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
754-
found=NextCopyFrom(festate->cstate,econtext,
755-
slot->tts_values,slot->tts_isnull);
756-
if (found)
752+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
753+
754+
/*
755+
* The protocol for loading a virtual tuple into a slot is first
756+
* ExecClearTuple, then fill the values/isnull arrays, then
757+
* ExecStoreVirtualTuple. If we don't find another row in the file, we
758+
* just skip the last step, leaving the slot empty as required.
759+
*
760+
*/
761+
ExecClearTuple(slot);
762+
763+
if (NextCopyFrom(cstate,econtext,slot->tts_values,slot->tts_isnull))
764+
{
765+
if (cstate->opts.on_error==COPY_ON_ERROR_IGNORE&&
766+
cstate->escontext->error_occurred)
767+
{
768+
/*
769+
* Soft error occurred, skip this tuple and just make
770+
* ErrorSaveContext ready for the next NextCopyFrom. Since we
771+
* don't set details_wanted and error_data is not to be filled,
772+
* just resetting error_occurred is enough.
773+
*/
774+
cstate->escontext->error_occurred= false;
775+
776+
/* Switch back to original memory context */
777+
MemoryContextSwitchTo(oldcontext);
778+
779+
/*
780+
* Make sure we are interruptible while repeatedly calling
781+
* NextCopyFrom() until no soft error occurs.
782+
*/
783+
CHECK_FOR_INTERRUPTS();
784+
785+
/*
786+
* Reset the per-tuple exprcontext, to clean-up after expression
787+
* evaluations etc.
788+
*/
789+
ResetPerTupleExprContext(estate);
790+
791+
/* Repeat NextCopyFrom() until no soft error occurs */
792+
gotoretry;
793+
}
794+
757795
ExecStoreVirtualTuple(slot);
796+
}
758797

759798
/* Switch back to original memory context */
760799
MemoryContextSwitchTo(oldcontext);
@@ -796,8 +835,19 @@ fileEndForeignScan(ForeignScanState *node)
796835
FileFdwExecutionState*festate= (FileFdwExecutionState*)node->fdw_state;
797836

798837
/* if festate is NULL, we are in EXPLAIN; nothing to do */
799-
if (festate)
800-
EndCopyFrom(festate->cstate);
838+
if (!festate)
839+
return;
840+
841+
if (festate->cstate->opts.on_error==COPY_ON_ERROR_IGNORE&&
842+
festate->cstate->num_errors>0&&
843+
festate->cstate->opts.log_verbosity >=COPY_LOG_VERBOSITY_DEFAULT)
844+
ereport(NOTICE,
845+
errmsg_plural("%llu row was skipped due to data type incompatibility",
846+
"%llu rows were skipped due to data type incompatibility",
847+
(unsigned long long)festate->cstate->num_errors,
848+
(unsigned long long)festate->cstate->num_errors));
849+
850+
EndCopyFrom(festate->cstate);
801851
}
802852

803853
/*
@@ -1113,7 +1163,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
11131163
* which must have at least targrows entries.
11141164
* The actual number of rows selected is returned as the function result.
11151165
* We also count the total number of rows in the file and return it into
1116-
* *totalrows. Note that *totaldeadrows is always set to 0.
1166+
* *totalrows. Rows skipped due to on_error = 'ignore' are not included
1167+
* in this count. Note that *totaldeadrows is always set to 0.
11171168
*
11181169
* Note that the returned list of rows is not always in order by physical
11191170
* position in the file. Therefore, correlation estimates derived later
@@ -1191,6 +1242,21 @@ file_acquire_sample_rows(Relation onerel, int elevel,
11911242
if (!found)
11921243
break;
11931244

1245+
if (cstate->opts.on_error==COPY_ON_ERROR_IGNORE&&
1246+
cstate->escontext->error_occurred)
1247+
{
1248+
/*
1249+
* Soft error occurred, skip this tuple and just make
1250+
* ErrorSaveContext ready for the next NextCopyFrom. Since we
1251+
* don't set details_wanted and error_data is not to be filled,
1252+
* just resetting error_occurred is enough.
1253+
*/
1254+
cstate->escontext->error_occurred= false;
1255+
1256+
/* Repeat NextCopyFrom() until no soft error occurs */
1257+
continue;
1258+
}
1259+
11941260
/*
11951261
* The first targrows sample rows are simply copied into the
11961262
* reservoir. Then we start replacing tuples in the sample until we
@@ -1236,6 +1302,15 @@ file_acquire_sample_rows(Relation onerel, int elevel,
12361302
/* Clean up. */
12371303
MemoryContextDelete(tupcontext);
12381304

1305+
if (cstate->opts.on_error==COPY_ON_ERROR_IGNORE&&
1306+
cstate->num_errors>0&&
1307+
cstate->opts.log_verbosity >=COPY_LOG_VERBOSITY_DEFAULT)
1308+
ereport(NOTICE,
1309+
errmsg_plural("%llu row was skipped due to data type incompatibility",
1310+
"%llu rows were skipped due to data type incompatibility",
1311+
(unsigned long long)cstate->num_errors,
1312+
(unsigned long long)cstate->num_errors));
1313+
12391314
EndCopyFrom(cstate);
12401315

12411316
pfree(values);

‎contrib/file_fdw/sql/file_fdw.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,13 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
150150
-- error context report tests
151151
SELECT * FROM agg_bad; -- ERROR
152152
153+
-- on_error and log_verbosity tests
154+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error'ignore');
155+
SELECT * FROM agg_bad;
156+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity'silent');
157+
SELECT * FROM agg_bad;
158+
ANALYZE agg_bad;
159+
153160
-- misc query tests
154161
\t on
155162
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE)SELECT*FROM agg_csv');

‎doc/src/sgml/file-fdw.sgml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,29 @@
126126
</listitem>
127127
</varlistentry>
128128

129+
<varlistentry>
130+
<term><literal>on_error</literal></term>
131+
132+
<listitem>
133+
<para>
134+
Specifies how to behave when encountering an error converting a column's
135+
input value into its data type,
136+
the same as <command>COPY</command>'s <literal>ON_ERROR</literal> option.
137+
</para>
138+
</listitem>
139+
</varlistentry>
140+
141+
<varlistentry>
142+
<term><literal>log_verbosity</literal></term>
143+
144+
<listitem>
145+
<para>
146+
Specifies the amount of messages emitted by <literal>file_fdw</literal>,
147+
the same as <command>COPY</command>'s <literal>LOG_VERBOSITY</literal> option.
148+
</para>
149+
</listitem>
150+
</varlistentry>
151+
129152
</variablelist>
130153

131154
<para>

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp