Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9514a3c

Browse files
committed
remove map arg to process, use map->step to convert map first. Put full describe result in process datafy as :desc and remove components
1 parent0cb226c commit9514a3c

File tree

3 files changed

+73
-54
lines changed

3 files changed

+73
-54
lines changed

‎deps.edn

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@
3333
:output-path"docs"
3434
:html {:namespace-list:flat}}}
3535
}}
36+

‎src/main/clojure/clojure/core/async/flow.clj

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -142,27 +142,24 @@
142142
[g [pid io-id:as coord] msgs] (g/inject g coord msgs))
143143

144144
(defnprocess
145-
"Given a function of four arities (0-3), aka the 'step-fn', or a map
146-
of functions corresponding thereto (described below), returns a
147-
launcher that creates a process compliant with the process
145+
"Given a function of four arities (0-3), aka the 'step-fn',
146+
returns a launcher that creates a process compliant with the process
148147
protocol (see the spi/ProcLauncher doc).
149148
150-
The possible arities/entries for the step-fn/map are
149+
The possible arities for the step-fn are
151150
152-
0 -:describe,
153-
1 -:init,
154-
2 -:transition
155-
3 -:transform.
151+
0 -'describe', () -> description
152+
1 -'init', (arg-map) -> initial-state
153+
2 -'transition', (state transition) -> state'
154+
3 -'transform', (state input msg) -> [state' output-map]
156155
157156
This is the core facility for defining the logic for processes via
158157
ordinary functions. Using a var holding a fn as the 'step-fn' is the
159158
preferred method for defining a proc, as it enables
160159
hot-code-reloading of the proc logic in a flow, and better names in
161-
datafy. You can use the map form to compose the proc logic from
162-
disparate functions or to leverage the optionality of some of the
163-
entry points.
160+
datafy.
164161
165-
arity 0, or :describe - required, () -> description
162+
arity 0 - 'describe', () -> description
166163
where description is a map with keys :params :ins and :outs, each of which
167164
in turn is a map of keyword to doc string, and :workload with
168165
possible values of :mixed :io :compute. All entries in the describe
@@ -182,14 +179,13 @@
182179
the proc. It will also be called by the impl in order to discover
183180
what channels are needed.
184181
185-
arity 1, or :init - optional, (arg-map) -> initial-state
182+
arity 1 - 'init', (arg-map) -> initial-state
186183
187-
init will be called once by the process to establish any initial
184+
Theinit arity will be called once by the process to establish any initial
188185
state. The arg-map will be a map of param->val, as supplied in the
189186
flow def. The key ::flow/pid will be added, mapped to the pid
190187
associated with the process (useful e.g. if the process wants to
191-
refer to itself in reply-to coordinates). init must be provided if
192-
'describe' returns :params.
188+
refer to itself in reply-to coordinates).
193189
194190
Optionally, a returned init state may contain the
195191
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
@@ -200,34 +196,35 @@
200196
outside of it. Use :transition to coordinate the lifecycle of these
201197
external channels.
202198
203-
Optionally, _any_ returned state, whether from:init,:transition
204-
or:transform, may contain the key ::flow/input-filter, a predicate
199+
Optionally, _any_ returned state, whether from init, transition
200+
or transform, may contain the key ::flow/input-filter, a predicate
205201
of cid. Only inputs (including in-ports) satisfying the predicate
206202
will be part of the next channel read set. In the absence of this
207203
predicate all inputs are read.
208204
209-
arity 2, or :transition - optional, (state transition) -> state'
205+
arity 2 - 'transition', (state transition) -> state'
210206
211-
transition will be called when the process makes a state transition,
212-
transition being one of ::flow/resume, ::flow/pause or ::flow/stop
207+
The transition arity will be called when the process makes a state
208+
transition, transition being one of ::flow/resume, ::flow/pause
209+
or ::flow/stop
213210
214-
With thisfna process impl can track changes and coordinate
211+
With this a process impl can track changes and coordinate
215212
resources, especially cleaning up any resources on :stop, since the
216213
process will no longer be used following that. See the SPI for
217214
details. state' will be the state supplied to subsequent calls.
218215
219-
arity 3, or :transform - required, (state in-name msg) -> [state' output]
216+
arity 3 - 'transform', (state in-name msg) -> [state' output]
220217
where output is a map of outid->[msgs*]
221218
222-
The transformfn will be called every time a message arrives at any
219+
The transformarity will be called every time a message arrives at any
223220
of the inputs. Output can be sent to none, any or all of the :outs
224221
enumerated, and/or an input named by a [pid inid] tuple (e.g. for
225222
reply-to), and/or to the ::flow/report output. A step need not
226223
output at all (output or msgs can be empyt/nil), however an output _message_
227224
may never be nil (per core.async channels). state' will be the state
228225
supplied to subsequent calls.
229226
230-
process accepts an option map with keys:
227+
processalsoaccepts an option map with keys:
231228
:workload - one of :mixed, :io or :compute
232229
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
233230
will be used when getting the return from the future - see below
@@ -242,18 +239,38 @@
242239
243240
When :io is specified, transform should not do extensive computation.
244241
245-
When :compute is specified (only allowed for :transform), each call
246-
to transform will be run in aseparate thread. The process loop will
247-
run in an :io context (sinceit no longer directly calls transform,
248-
all it does is I/O) and itwill submit transform to the :compute
249-
executor then await (blocking,for compute-timeout-ms) the
250-
completion of the futurereturned by the executor. If the future
251-
times out it will be reportedon ::flow/error.
242+
When :compute is specified, each call totransform will be run in a
243+
separate thread. The process loop will run in an :io context (since
244+
it no longer directly calls transform, all it does is I/O) and it
245+
will submit transform to the :compute executor then await (blocking,
246+
for compute-timeout-ms) the completion of the future returned by the
247+
executor. If the futuretimes out it will be reported
248+
on ::flow/error.
252249
253250
When :compute is specified transform must not block!"
254-
([fn-or-map] (process fn-or-mapnil))
255-
([fn-or-map {:keys [workload compute-timeout-ms]:as opts}]
256-
(impl/proc fn-or-map opts)))
251+
([step-fn] (process step-fnnil))
252+
([step-fn {:keys [workload compute-timeout-ms]:as opts}]
253+
(impl/proc step-fn opts)))
254+
255+
(defnmap->step
256+
"given a map of functions corresponding to step fn arities (see
257+
'process'), returns a step fn suitable for passing to 'process'. You
258+
can use this map form to compose the proc logic from disparate
259+
functions or to leverage the optionality of some of the entry
260+
points.
261+
262+
The keys in the map are:
263+
:describe, arity 0 - required
264+
:init, arity 1 - optional, but should be provided if 'describe' returns :params.
265+
:transition, arity 2 - optional
266+
:transform, arity 3 - required"
267+
[{:keys [describe init transition transform]}]
268+
(assert (and describe transform)"must provide :describe and :transform")
269+
(fn
270+
([] (describe))
271+
([arg-map] (when init (init arg-map)))
272+
([state trans] (if transition (transition state trans) state))
273+
([state input msg] (transform state input msg))))
257274

258275
(defnlift*->step
259276
"given a fn f taking one arg and returning a collection of non-nil
@@ -263,15 +280,20 @@
263280
(fn
264281
([] {:ins {:in (str"the argument to" f)}
265282
:outs {:out (str"the return of" f)}})
266-
([_]nil)
267-
([_ _]nil)
268-
([_ _ msg] [nil {:out (f msg)}])))
283+
([arg-map]nil)
284+
([state transition]nil)
285+
([state input msg] [nil {:out (f msg)}])))
269286

270287
(defnlift1->step
271288
"like lift*->step except taking a fn returning one value, which when
272289
nil will yield no output."
273290
[f]
274-
(lift*->step #(when-some [m (f %)] (vector m))))
291+
(fn
292+
([] {:ins {:in (str"the argument to" f)}
293+
:outs {:out (str"the return of" f)}})
294+
([arg-map]nil)
295+
([state transition]nil)
296+
([state input msg] [nil (when-some [m (f msg)] {:out (vector m)})])))
275297

276298
(defnfuturize
277299
"Takes a fn f and returns a fn that takes the same arguments as f

‎src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@
189189
(defnhandle-transition
190190
"when transition, returns maybe new state"
191191
[transition status nstatus state]
192-
(if (and transition (not= status nstatus))
192+
(if (not= status nstatus)
193193
(transition state (case nstatus
194194
:exit::flow/stop
195195
:running::flow/resume
@@ -219,29 +219,25 @@
219219

220220
(defnproc
221221
"see lib ns for docs"
222-
[fm {:keys [workload compute-timeout-ms]:or {compute-timeout-ms5000}}]
223-
(let [{:keys [describe init transition transform]:as impl}
224-
(if (map? fm) fm {:describe fm:init fm:transition fm:transform fm})
225-
{:keys [params ins]:as desc} (describe)
222+
[step {:keys [workload compute-timeout-ms]:or {compute-timeout-ms5000}}]
223+
(let [{:keys [params ins]:as desc} (step)
226224
workload (or workload (:workload desc):mixed)]
227-
(assert transform"must provide :transform")
228-
(assert (or (not params) init)"must have :init if :params")
225+
;;(assert (or (not params) init) "must have :init if :params")
229226
(reify
230227
clojure.core.protocols/Datafiable
231228
(datafy [_]
232229
(let [{:keys [params ins outs]} desc]
233-
(walk/postwalk datafy {:impl fm:params (-> params keys vec)
234-
:ins (-> ins keys vec):outs (-> outs keys vec)})))
230+
(walk/postwalk datafy {:step step:desc desc})))
235231
spi/ProcLauncher
236232
(describe [_] desc)
237233
(start [_ {:keys [pid args ins outs resolver]}]
238234
(assert (or (not params) args)"must provide :args if :params")
239235
(let [transform (if (= workload:compute)
240-
#(.get ^Future ((futurizetransform {:exec (spi/get-exec resolver:compute)}) %1 %2 %3)
236+
#(.get ^Future ((futurizestep {:exec (spi/get-exec resolver:compute)}) %1 %2 %3)
241237
compute-timeout-ms TimeUnit/MILLISECONDS)
242-
transform)
238+
step)
243239
exs (spi/get-exec resolver (if (= workload:mixed):mixed:io))
244-
state (when init (initargs))
240+
state (stepargs)
245241
ins (into (or ins {}) (::flow/in-ports state))
246242
outs (into (or outs {}) (::flow/out-ports state))
247243
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
@@ -261,7 +257,7 @@
261257
(try
262258
(if (= status:paused)
263259
(let [nstatus (handle-command status (async/<!! control))
264-
nstate (handle-transitiontransition status nstatus state)]
260+
nstate (handle-transitionstep status nstatus state)]
265261
[nstatus nstate count read-ins])
266262
;;:running
267263
(let [;;TODO rotate/randomize after control per normal alts?
@@ -275,13 +271,13 @@
275271
cid (io-id c)]
276272
(if (= c control)
277273
(let [nstatus (handle-command status msg)
278-
nstate (handle-transitiontransition status nstatus state)]
274+
nstate (handle-transitionstep status nstatus state)]
279275
[nstatus nstate count read-ins])
280276
(try
281277
(let [[nstate outputs] (transform state cid msg)
282278
[nstatus nstate]
283279
(send-outputs status nstate outputs outs
284-
resolver control handle-commandtransition)]
280+
resolver control handle-commandstep)]
285281
[nstatus nstate (inc count) (if (some? msg)
286282
read-ins
287283
(dissoc read-ins cid))])

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp