(map f chs)
(map f chs buf-or-n)
Takes a function and a collection of source channels, and returns a
channel which contains the values produced by applying f to the set
of first items taken from each source channel, followed by applying
f to the set of second items from each channel, until any one of the
channels is closed, at which point the output channel will be
closed. The returned channel will be unbuffered by default, or a
buf-or-n can be supplied
Source
(defn
map
"Takes a function and a collection of source channels, and returns a\n channel which contains the values produced by applying f to the set\n of first items taken from each source channel, followed by applying\n f to the set of second items from each channel, until any one of the\n channels is closed, at which point the output channel will be\n closed. The returned channel will be unbuffered by default, or a\n buf-or-n can be supplied"
([f chs] (map f chs nil))
([f chs buf-or-n]
(let
[chs
(vec chs)
out
(chan buf-or-n)
cnt
(count chs)
rets
(object-array cnt)
dchan
(chan 1)
dctr
(atom nil)
done
(mapv
(fn
[i]
(fn
[ret]
(aset rets i ret)
(when (zero? (swap! dctr dec)) (put! dchan (.slice rets 0)))))
(range cnt))]
(go-loop
[]
(reset! dctr cnt)
(dotimes
[i cnt]
(try
(take! (chs i) (done i))
(catch js/Object e (swap! dctr dec))))
(let
[rets (<! dchan)]
(if
(some nil? rets)
(close! out)
(do (>! out (apply f rets)) (recur)))))
out)))