(pub ch topic-fn)
(pub ch topic-fn buf-fn)
Creates and returns a pub(lication) of the supplied channel,
partitioned into topics by the topic-fn. topic-fn will be applied to
each value on the channel and the result will determine the 'topic'
on which that value will be put. Channels can be subscribed to
receive copies of topics using 'sub', and unsubscribed using
'unsub'. Each topic will be handled by an internal mult on a
dedicated channel. By default these internal channels are
unbuffered, but a buf-fn can be supplied which, given a topic,
creates a buffer with desired properties.
Each item is distributed to all subs in parallel and synchronously,
i.e. each sub must accept before the next item is distributed. Use
buffering/windowing to prevent slow subs from holding up the pub.
Items received when there are no matching subs get dropped.
Note that if buf-fns are used then each topic is handled
asynchronously, i.e. if a channel is subscribed to more than one
topic it should not expect them to be interleaved identically with
the source.
Source
(defn
pub
"Creates and returns a pub(lication) of the supplied channel,\n partitioned into topics by the topic-fn. topic-fn will be applied to\n each value on the channel and the result will determine the 'topic'\n on which that value will be put. Channels can be subscribed to\n receive copies of topics using 'sub', and unsubscribed using\n 'unsub'. Each topic will be handled by an internal mult on a\n dedicated channel. By default these internal channels are\n unbuffered, but a buf-fn can be supplied which, given a topic,\n creates a buffer with desired properties.\n\n Each item is distributed to all subs in parallel and synchronously,\n i.e. each sub must accept before the next item is distributed. Use\n buffering/windowing to prevent slow subs from holding up the pub.\n\n Items received when there are no matching subs get dropped.\n\n Note that if buf-fns are used then each topic is handled\n asynchronously, i.e. if a channel is subscribed to more than one\n topic it should not expect them to be interleaved identically with\n the source."
([ch topic-fn] (pub ch topic-fn (constantly nil)))
([ch topic-fn buf-fn]
(let
[mults
(atom {})
ensure-mult
(fn
[topic]
(or
(get @mults topic)
(get
(swap!
mults
(fn*
[p1__18543#]
(if
(p1__18543# topic)
p1__18543#
(assoc p1__18543# topic (mult (chan (buf-fn topic)))))))
topic)))
p
(reify
Mux
(muxch* [_] ch)
Pub
(sub*
[p topic ch close?]
(let [m (ensure-mult topic)] (tap m ch close?)))
(unsub*
[p topic ch]
(when-let [m (get @mults topic)] (untap m ch)))
(unsub-all* [_] (reset! mults {}))
(unsub-all* [_ topic] (swap! mults dissoc topic)))]
(go-loop
[]
(let
[val (<! ch)]
(if
(nil? val)
(doseq [m (vals @mults)] (close! (muxch* m)))
(let
[topic (topic-fn val) m (get @mults topic)]
(when
m
(when-not (>! (muxch* m) val) (swap! mults dissoc topic)))
(recur)))))
p)))