Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite666847

Browse files
committed
[FLINK-38709][table][python] Fix ScalarFunctionSplitter to allow PythonFunction & AsyncFunction work when taking the recursive field of composite type as input
Thiscloses#27259.
1 parent4fd5bea commite666847

File tree

5 files changed

+174
-108
lines changed

5 files changed

+174
-108
lines changed

‎flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala‎

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@ class ScalarFunctionSplitter(
434434

435435
privatevarfieldsRexCall:Map[Int,Int]=Map[Int,Int]()
436436

437+
privatevalextractedRexNodeRefs: mutable.HashSet[RexNode]= mutable.HashSet[RexNode]()
438+
437439
overridedefvisitCall(call:RexCall):RexNode= {
438440
if (needConvert(call)) {
439441
getExtractedRexNode(call)
@@ -454,7 +456,9 @@ class ScalarFunctionSplitter(
454456
newRexInputRef(field.getIndex, field.getType)
455457
case _=>
456458
valnewFieldAccess=
457-
rexBuilder.makeFieldAccess(expr.accept(this), fieldAccess.getField.getIndex)
459+
rexBuilder.makeFieldAccess(
460+
convertInputRefToLocalRefIfNecessary(expr.accept(this)),
461+
fieldAccess.getField.getIndex)
458462
getExtractedRexNode(newFieldAccess)
459463
}
460464
}else {
@@ -468,9 +472,18 @@ class ScalarFunctionSplitter(
468472

469473
overridedefvisitNode(rexNode:RexNode):RexNode= rexNode
470474

475+
privatedefconvertInputRefToLocalRefIfNecessary(node:RexNode):RexNode= {
476+
nodematch {
477+
caseinputRef:RexInputRefif extractedRexNodeRefs.contains(node)=>
478+
newRexLocalRef(inputRef.getIndex, node.getType)
479+
case _=> node
480+
}
481+
}
482+
471483
privatedefgetExtractedRexNode(node:RexNode):RexNode= {
472484
valnewNode=newRexInputRef(extractedFunctionOffset+ extractedRexNodes.length, node.getType)
473485
extractedRexNodes.append(node)
486+
extractedRexNodeRefs.add(newNode)
474487
newNode
475488
}
476489

‎flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public void setup() {
6262
+" a int,\n"
6363
+" b bigint,\n"
6464
+" c string,\n"
65-
+" d ARRAY<INT NOT NULL>\n"
65+
+" d ARRAY<INT NOT NULL>,\n"
66+
+" e ROW<f ROW<h int, i double>, g string>"
6667
+") WITH (\n"
6768
+" 'connector' = 'test-simple-table-source'\n"
6869
+") ;");
@@ -182,6 +183,12 @@ public void testFieldAccessAfter() {
182183
util.verifyRelPlan(sqlQuery);
183184
}
184185

186+
@Test
187+
publicvoidtestCompositeFieldAsInput() {
188+
StringsqlQuery ="SELECT func1(e.f.h) from MyTable";
189+
util.verifyRelPlan(sqlQuery);
190+
}
191+
185192
@Test
186193
publicvoidtestFieldOperand() {
187194
StringsqlQuery ="SELECT func1(func5(a).f0) from MyTable";

‎flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public void setup() {
6262
+" a int,\n"
6363
+" b bigint,\n"
6464
+" c string,\n"
65-
+" d ARRAY<INT NOT NULL>\n"
65+
+" d ARRAY<INT NOT NULL>,\n"
66+
+" e ROW<f ROW<h int, i double>, g string>\n"
6667
+") WITH (\n"
6768
+" 'connector' = 'test-simple-table-source'\n"
6869
+") ;");
@@ -110,6 +111,12 @@ public void testCorrelateWithCast() {
110111
util.verifyRelPlan(sqlQuery);
111112
}
112113

114+
@Test
115+
publicvoidtestCorrelateWithCompositeFieldAsInput() {
116+
StringsqlQuery ="select * FROM MyTable, LATERAL TABLE(asyncTableFunc(e.f.h))";
117+
util.verifyRelPlan(sqlQuery);
118+
}
119+
113120
/** Test function. */
114121
publicstaticclassAsyncFuncextendsAsyncTableFunction<String> {
115122

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp