Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8672743

Browse files
authored
fix: resolve nested step dependency and argument inheritance issues (#258)
This commit addresses multiple related issues with nested step dependenciesand argument inheritance in Reactor:**Core Issues Fixed:**- Steps inside nested contexts (around, group, switch, map) couldn't depend on parent scope results- Compose steps inside maps couldn't access the map's arguments- Missing dependency tracking for cross-scope references**Key Changes:**1. **Enhanced Planner (lib/reactor/planner.ex)**: - Added extract_nested_dependencies() to identify cross-scope dependencies - Added support for nested step dependency tracking via new nested_steps/1 callback - Properly handles result() references from nested steps to parent scope2. **Improved Map Step (lib/reactor/step/map.ex)**: - Implemented nested_steps/1 callback to expose contained steps to planner - Enhanced argument inheritance for nested compose steps - Added support for cross-scope element references - Fixed element argument resolution for nested contexts3. **Enhanced Step Protocol (lib/reactor/step.ex)**: - Added optional nested_steps/1 callback for steps containing other steps - Provides default empty implementation via use Reactor.Step4. **Map DSL Improvements (lib/reactor/dsl/map.ex)**: - Added context passing for argument inheritance - Compose steps now automatically inherit map arguments when not explicitly provided - Supports nested map contexts with proper argument propagation5. **Runtime Support (lib/reactor/executor/step_runner.ex)**: - Added nested dependency collection and context passing - Enables runtime access to cross-scope dependencies6. **Compose Builder Fix (lib/reactor/builder/compose.ex)**: - Improved error handling for missing arguments - Better error reporting when both extra and missing arguments exist**Test Coverage:**- Added comprehensive tests for nested dependency resolution- Added tests for compose steps in maps with argument inheritance- Added tests for cross-scope dependency scenarios- Verified proper error handling for invalid patterns**Backwards Compatibility:**- All changes are backwards compatible- Existing functionality unchanged- New features are opt-in via the nested_steps callbackFixes issues with compose steps in maps not having access to map argumentsand resolves dependency planning issues for nested step contexts.
1 parent981aea1 commit8672743

File tree

9 files changed

+639
-26
lines changed

9 files changed

+639
-26
lines changed

‎lib/reactor/builder/compose.ex‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ defmodule Reactor.Builder.Compose do
113113

114114
{[],missing_args}->
115115
{:error,{:missing_args,inputs,missing_args}}
116+
117+
{_extra_args,missing_args}->
118+
# Both extra and missing args - report missing args as primary issue
119+
{:error,{:missing_args,inputs,missing_args}}
116120
end
117121
end
118122
end

‎lib/reactor/dsl/map.ex‎

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,16 @@ defmodule Reactor.Dsl.Map do
167167
aliasSpark.{Dsl.Verifier,Error.DslError}
168168

169169
defbuild(map,reactor)do
170+
build(map,reactor,%{map_arguments:[]})
171+
end
172+
173+
defbuild(map,reactor,context)do
170174
sub_reactor=Builder.new(reactor.id)
171175

172-
with{:ok,sub_reactor}<-build_steps(sub_reactor,map)do
176+
# Add current map's arguments to the context for nested steps
177+
context=%{context|map_arguments:context.map_arguments++map.arguments}
178+
179+
with{:ok,sub_reactor}<-build_steps(sub_reactor,map,context)do
173180
arguments=
174181
map.arguments
175182
|>Enum.concat([Argument.from_template(:source,map.source)])
@@ -244,9 +251,35 @@ defmodule Reactor.Dsl.Map do
244251
end
245252
end
246253

247-
defpbuild_steps(reactor,map)do
254+
defpbuild_steps(reactor,map,context)do
248255
map.steps
249-
|>reduce_while_ok(reactor,&Dsl.Build.build/2)
256+
|>reduce_while_ok(reactor,fnstep,reactor->
257+
build_step_with_context(step,reactor,context)
258+
end)
259+
end
260+
261+
defpbuild_step_with_context(%Dsl.Compose{}=compose,reactor,context)do
262+
# For compose steps, add the map's arguments to the compose arguments
263+
# if they're not already present
264+
inherited_args=context.map_arguments
265+
explicit_arg_names=MapSet.new(compose.arguments,&&1.name)
266+
267+
additional_args=
268+
inherited_args
269+
|>Enum.reject(fnarg->MapSet.member?(explicit_arg_names,arg.name)end)
270+
271+
compose_with_args=%{compose|arguments:compose.arguments++additional_args}
272+
Dsl.Build.build(compose_with_args,reactor)
273+
end
274+
275+
defpbuild_step_with_context(%Dsl.Map{}=nested_map,reactor,context)do
276+
# For nested maps, pass the context down
277+
build(nested_map,reactor,context)
278+
end
279+
280+
defpbuild_step_with_context(step,reactor,_context)do
281+
# For regular steps, use normal build
282+
Dsl.Build.build(step,reactor)
250283
end
251284
end
252285
end

‎lib/reactor/executor/step_runner.ex‎

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,9 @@ defmodule Reactor.Executor.StepRunner do
445445
maxwhenis_integer(max)andmax>=0->max-current_try
446446
end
447447

448+
# Collect nested dependencies for this step
449+
nested_dependencies=collect_nested_dependencies(reactor,step)
450+
448451
context=
449452
step.context
450453
|>deep_merge(reactor.context)
@@ -453,14 +456,38 @@ defmodule Reactor.Executor.StepRunner do
453456
concurrency_key:concurrency_key,
454457
current_try:current_try,
455458
retries_remaining:retries_remaining,
456-
async?:state.async?
459+
async?:state.async?,
460+
nested_dependencies:nested_dependencies
457461
})
458462
|>Map.put(:current_step,step)
459463
|>Map.put(:concurrency_key,concurrency_key)
460464

461465
{:ok,context}
462466
end
463467

468+
defpcollect_nested_dependencies(reactor,step)do
469+
# Find all edges to this step that are nested dependencies
470+
casereactor.plando
471+
nil->
472+
%{}
473+
474+
graph->
475+
graph
476+
|>Graph.in_edges(step)
477+
|>Enum.filter(fn
478+
{_,_,{:nested_dependency,_,_,:for,_}}->true
479+
_->false
480+
end)
481+
|>Enum.group_by(
482+
fn{_,_,{:nested_dependency,nested_step,_,:for,_}}->nested_stepend,
483+
fn{source_step,_,{:nested_dependency,_,arg_name,:for,_}}->
484+
{arg_name,Map.get(reactor.intermediate_results,source_step.name)}
485+
end
486+
)
487+
|>Map.new(fn{nested_step,args}->{nested_step,Map.new(args)}end)
488+
end
489+
end
490+
464491
defpmaybe_replace_arguments(arguments,context)whenis_nil(context.private.replace_arguments),
465492
do:{:ok,arguments}
466493

‎lib/reactor/planner.ex‎

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ defmodule Reactor.Planner do
7979
end
8080

8181
defpreduce_arguments_into_graph(graph,current_step,steps_by_name,intermediate_results)do
82+
# First handle regular arguments
83+
with{:ok,graph}<-
84+
add_regular_dependencies(graph,current_step,steps_by_name,intermediate_results)do
85+
# Then handle nested step dependencies
86+
add_nested_dependencies(graph,current_step,steps_by_name,intermediate_results)
87+
end
88+
end
89+
90+
defpadd_regular_dependencies(graph,current_step,steps_by_name,intermediate_results)do
8291
reduce_while_ok(current_step.arguments,graph,fn
8392
argument,graph
8493
whenis_from_result(argument)andis_map_key(intermediate_results,argument.source.name)->
@@ -107,7 +116,75 @@ defmodule Reactor.Planner do
107116
)}
108117
end
109118

110-
argument,graphwhenis_from_input(argument)oris_from_value(argument)->
119+
argument,graph
120+
whenis_from_input(argument)oris_from_value(argument)oris_from_element(argument)->
121+
{:ok,graph}
122+
end)
123+
end
124+
125+
defpadd_nested_dependencies(graph,current_step,steps_by_name,intermediate_results)do
126+
nested_steps=Step.nested_steps(current_step)
127+
128+
reduce_while_ok(nested_steps,graph,fnnested_step,graph->
129+
add_nested_step_dependencies(
130+
graph,
131+
current_step,
132+
nested_step,
133+
steps_by_name,
134+
intermediate_results
135+
)
136+
end)
137+
end
138+
139+
defpadd_nested_step_dependencies(
140+
graph,
141+
containing_step,
142+
nested_step,
143+
steps_by_name,
144+
_intermediate_results
145+
)do
146+
reduce_while_ok(nested_step.arguments,graph,fn
147+
argument,graphwhenis_from_result(argument)->
148+
dependency_name=argument.source.name
149+
150+
# Check if this is a cross-scope dependency (not in nested steps)
151+
caseMap.fetch(steps_by_name,dependency_name)do
152+
{:ok,dependency}whendependency.name!=containing_step.name->
153+
# This is a cross-scope dependency - add it as a nested dependency edge
154+
{:ok,
155+
Graph.add_edge(graph,dependency,containing_step,
156+
label:
157+
{:nested_dependency,nested_step.name,argument.name,:for,containing_step.name}
158+
)}
159+
160+
_->
161+
# Either not found or is the containing step itself
162+
{:ok,graph}
163+
end
164+
165+
argument,graphwhenis_from_element(argument)->
166+
# For element references, we need to track them as nested dependencies
167+
# if they refer to a parent/outer map
168+
element_name=argument.source.name
169+
170+
# Check if this element refers to a map step that's not the containing step
171+
caseMap.fetch(steps_by_name,element_name)do
172+
{:ok,map_step}whenmap_step.name!=containing_step.name->
173+
# This is a cross-scope element reference - track it as a nested dependency
174+
# We'll need to pass the element value through the context
175+
{:ok,
176+
Graph.add_edge(graph,map_step,containing_step,
177+
label:
178+
{:nested_element_dependency,nested_step.name,argument.name,element_name,:for,
179+
containing_step.name}
180+
)}
181+
182+
_->
183+
# Either the containing step itself or not found
184+
{:ok,graph}
185+
end
186+
187+
_argument,graph->
111188
{:ok,graph}
112189
end)
113190
end

‎lib/reactor/step.ex‎

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,29 @@ defmodule Reactor.Step do
196196
"""
197197
@callbackasync?(step::Step.t())::boolean
198198

199-
@optional_callbackscompensate:4,undo:4
199+
@doc"""
200+
Extract nested steps from the step's options.
201+
202+
> This callback is automatically defined by `use Reactor.Step` however you're
203+
> free to override it if you need specific behaviour.
204+
205+
This callback is called during the planning phase to extract any nested steps
206+
that may be contained within this step's options. This allows the planner to
207+
identify cross-scope dependencies and properly track them in the dependency graph.
208+
209+
The default implementation returns an empty list.
210+
211+
## Arguments
212+
213+
- `options` - the keyword list of options provided to the step.
214+
215+
## Return values
216+
217+
- A list of nested `Reactor.Step` structs contained within this step.
218+
"""
219+
@callbacknested_steps(options::keyword)::[Step.t()]
220+
221+
@optional_callbackscompensate:4,undo:4,nested_steps:1
200222

201223
@doc"""
202224
Find out of a step has a capability.
@@ -249,6 +271,20 @@ defmodule Reactor.Step do
249271
defasync?(step),
250272
do:module_and_options_from_step(step,fnmodule,_opts->module.async?(step)end)
251273

274+
@doc"""
275+
Extract nested steps from a step.
276+
"""
277+
@specnested_steps(Step.t())::[Step.t()]
278+
defnested_steps(step)do
279+
module_and_options_from_step(step,fnmodule,options->
280+
iffunction_exported?(module,:nested_steps,1)do
281+
module.nested_steps(options)
282+
else
283+
[]
284+
end
285+
end)
286+
end
287+
252288
defpmodule_and_options_from_step(%{impl:{module,options}}=step,fun)
253289
whenis_struct(step,Step)andis_atom(module)andis_list(options)andis_function(fun,2),
254290
do:fun.(module,options)
@@ -277,7 +313,11 @@ defmodule Reactor.Step do
277313
defasync?(%{async?:fun})whenis_function(fun,1),do:fun.([])
278314
defasync?(_),do:false
279315

280-
defoverridablecan?:2,async?:1
316+
@docfalse
317+
@implunquote(__MODULE__)
318+
defnested_steps(_options),do:[]
319+
320+
defoverridablecan?:2,async?:1,nested_steps:1
281321
end
282322
end
283323
end

‎lib/reactor/step/map.ex‎

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
defmoduleReactor.Step.Mapdo
22
useReactor.Step
33
requireReactor.Argument
4-
requireReactor.Error.Internal.UnreachableError
54
requireIter
6-
aliasReactor.{Argument,Builder,Error.Internal.UnreachableError,Step,Template}
5+
aliasReactor.{Argument,Builder,Step,Template}
76
aliasSpark.Options
87
importReactor.Utils
98

@@ -91,28 +90,41 @@ defmodule Reactor.Step.Map do
9190
defrun(arguments,context,options)do
9291
with{:ok,options}<-Options.validate(options,@option_schema)do
9392
caseoptions[:state]do
94-
:init->do_init(arguments.source,arguments,options,context.current_step)
93+
:init->do_init(arguments.source,arguments,options,context.current_step,context)
9594
:iterating->do_iterate(arguments,options,context.current_step)
9695
end
9796
end
9897
end
9998

99+
@docfalse
100+
@impltrue
101+
defnested_steps(options)do
102+
Keyword.get(options,:steps,[])
103+
end
104+
100105
@docfalse
101106
@impltrue
102107
defto_mermaid(step,options),do:__MODULE__.Mermaid.to_mermaid(step,options)
103108

104-
defpdo_init(source,arguments,options,map_step)whenIter.is_iter(source)do
109+
defpdo_init(source,arguments,options,map_step,context)whenIter.is_iter(source)do
105110
source=
106111
source
107112
|>Iter.with_index()
108113

109-
extra_arguments=
114+
# Collect regular extra arguments
115+
regular_extra_arguments=
110116
arguments
111117
|>Map.drop([:source,:result])
112118
|>Enum.map(fn{name,value}->
113119
Argument.from_value(name,value)
114120
end)
115121

122+
# Collect nested dependency arguments
123+
nested_dep_arguments=build_nested_dependency_arguments(context)
124+
125+
# Merge both sets of extra arguments
126+
extra_arguments=regular_extra_arguments++nested_dep_arguments
127+
116128
options=
117129
options
118130
|>Keyword.put_new_lazy(:descendant_step_names,fn->
@@ -124,10 +136,23 @@ defmodule Reactor.Step.Map do
124136
emit_batch(source,options,map_step,[])
125137
end
126138

127-
defpdo_init(source,arguments,options,map_step)do
139+
defpdo_init(source,arguments,options,map_step,context)do
128140
source
129141
|>Iter.from()
130-
|>do_init(arguments,options,map_step)
142+
|>do_init(arguments,options,map_step,context)
143+
end
144+
145+
defpbuild_nested_dependency_arguments(context)do
146+
nested_deps=Map.get(context,:nested_dependencies,%{})
147+
148+
# Flatten all nested dependencies into arguments
149+
nested_deps
150+
|>Enum.flat_map(fn{_nested_step,args}->
151+
Enum.map(args,fn{arg_name,value}->
152+
Argument.from_value(arg_name,value)
153+
end)
154+
end)
155+
|>Enum.uniq_by(&&1.name)
131156
end
132157

133158
defpdo_iterate(arguments,options,map_step)do
@@ -289,20 +314,21 @@ defmodule Reactor.Step.Map do
289314

290315
defprewrite_arguments(step,{element,index},descendant_step_names,map_step)do
291316
map_while_ok(step.arguments,fn
292-
argument
293-
whenArgument.is_from_element(argument)andargument.source.name!=map_step.name->
294-
{:error,
295-
UnreachableError.unreachable(
296-
"Attempted to retrieve an element whose source doesn't match the current map step:#{inspect(argument.source.name)} vs#{inspect(map_step.name)}"
297-
)}
298-
299317
argumentwhenArgument.is_from_element(argument)->
300-
argument=
301-
argument.name
302-
|>Argument.from_value(element)
303-
|>Argument.sub_path(argument.source.sub_path)
318+
# Check if this element reference is for the current map or should be inherited
319+
ifargument.source.name==map_step.namedo
320+
# This is for the current map - replace with the element value
321+
argument=
322+
argument.name
323+
|>Argument.from_value(element)
324+
|>Argument.sub_path(argument.source.sub_path)
304325

305-
{:ok,argument}
326+
{:ok,argument}
327+
else
328+
# This is for a parent/outer map - it should have been provided as an extra_argument
329+
# Keep it as-is, it will be resolved from extra_arguments
330+
{:ok,argument}
331+
end
306332

307333
argumentwhenArgument.is_from_result(argument)->
308334
ifMapSet.member?(descendant_step_names,argument.source.name)do

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp