CLJS
basic-lein-cljs.core
cljs.analyzer
cljs.compiler
CLJS
cljs.core
CLJS
cljs.core.async
CLJS
cljs.core.async.impl.buffers
CLJS
cljs.core.async.impl.channels
CLJS
cljs.core.async.impl.dispatch
CLJS
cljs.core.async.impl.ioc-helpers
CLJS
cljs.core.async.impl.protocols
CLJS
cljs.core.async.impl.timers
cljs.env
cljs.externs
CLJS
cljs.js
cljs.js-deps
CLJS
cljs.pprint
CLJS
cljs.reader
CLJS
cljs.repl
cljs.source-map
CLJS
cljs.source-map
cljs.source-map.base64
CLJS
cljs.source-map.base64
cljs.source-map.base64-vlq
CLJS
cljs.source-map.base64-vlq
CLJS
cljs.spec.alpha
CLJS
cljs.spec.gen.alpha
cljs.tagged-literals
CLJS
cljs.tools.reader
CLJS
cljs.tools.reader.edn
CLJS
cljs.tools.reader.impl.commons
CLJS
cljs.tools.reader.impl.errors
CLJS
cljs.tools.reader.impl.inspect
CLJS
cljs.tools.reader.impl.utils
CLJS
cljs.tools.reader.reader-types
cljs.util
clojure.core
clojure.core.async
clojure.core.async.impl.buffers
clojure.core.async.impl.channels
clojure.core.async.impl.concurrent
clojure.core.async.impl.dispatch
clojure.core.async.impl.exec.threadpool
clojure.core.async.impl.ioc-macros
clojure.core.async.impl.mutex
clojure.core.async.impl.protocols
clojure.core.async.impl.timers
clojure.core.cache
clojure.core.memoize
clojure.core.protocols
clojure.core.server
clojure.data.json
clojure.data.priority-map
clojure.edn
clojure.instant
clojure.java.io
clojure.main
clojure.pprint
clojure.reflect
clojure.repl
clojure.set
CLJS
clojure.set
clojure.spec.alpha
clojure.spec.gen.alpha
clojure.string
CLJS
clojure.string
clojure.tools.analyzer
clojure.tools.analyzer.ast
clojure.tools.analyzer.env
clojure.tools.analyzer.jvm
clojure.tools.analyzer.jvm.utils
clojure.tools.analyzer.passes
clojure.tools.analyzer.passes.add-binding-atom
clojure.tools.analyzer.passes.cleanup
clojure.tools.analyzer.passes.constant-lifter
clojure.tools.analyzer.passes.elide-meta
clojure.tools.analyzer.passes.emit-form
clojure.tools.analyzer.passes.jvm.analyze-host-expr
clojure.tools.analyzer.passes.jvm.annotate-host-info
clojure.tools.analyzer.passes.jvm.annotate-loops
clojure.tools.analyzer.passes.jvm.annotate-tag
clojure.tools.analyzer.passes.jvm.box
clojure.tools.analyzer.passes.jvm.classify-invoke
clojure.tools.analyzer.passes.jvm.constant-lifter
clojure.tools.analyzer.passes.jvm.emit-form
clojure.tools.analyzer.passes.jvm.fix-case-test
clojure.tools.analyzer.passes.jvm.infer-tag
clojure.tools.analyzer.passes.jvm.validate
clojure.tools.analyzer.passes.jvm.validate-loop-locals
clojure.tools.analyzer.passes.jvm.validate-recur
clojure.tools.analyzer.passes.jvm.warn-on-reflection
clojure.tools.analyzer.passes.source-info
clojure.tools.analyzer.passes.trim
clojure.tools.analyzer.passes.uniquify
clojure.tools.analyzer.passes.warn-earmuff
clojure.tools.analyzer.utils
clojure.tools.cli
clojure.tools.namespace.dependency
clojure.tools.namespace.file
clojure.tools.namespace.find
clojure.tools.namespace.parse
clojure.tools.namespace.track
clojure.tools.reader
clojure.tools.reader.default-data-readers
clojure.tools.reader.impl.commons
clojure.tools.reader.impl.errors
clojure.tools.reader.impl.inspect
clojure.tools.reader.impl.utils
clojure.tools.reader.reader-types
clojure.walk
CLJS
clojure.walk
dynadoc.aliases
dynadoc.common
dynadoc.core
CLJS
dynadoc.core
dynadoc.example
CLJS
dynadoc.state
dynadoc.static
dynadoc.utils
dynadoc.watch
eval-soup.clojail
eval-soup.core
CLJS
eval-soup.core
CLJS
figwheel.client
CLJS
figwheel.client.file-reloading
CLJS
figwheel.client.heads-up
CLJS
figwheel.client.socket
CLJS
figwheel.client.utils
hawk.core
hawk.watcher
html-soup.core
ns-tracker.core
ns-tracker.dependency
ns-tracker.nsdeps
ns-tracker.parse
CLJS
oakcljs.tools.reader
CLJS
oakcljs.tools.reader.impl.commons
CLJS
oakcljs.tools.reader.impl.errors
CLJS
oakcljs.tools.reader.impl.inspect
CLJS
oakcljs.tools.reader.impl.utils
CLJS
oakcljs.tools.reader.reader-types
oakclojure.tools.reader
oakclojure.tools.reader.default-data-readers
oakclojure.tools.reader.impl.commons
oakclojure.tools.reader.impl.errors
oakclojure.tools.reader.impl.inspect
oakclojure.tools.reader.impl.utils
oakclojure.tools.reader.reader-types
org.httpkit.server
CLJS
paren-soup.console
CLJS
paren-soup.core
CLJS
paren-soup.dom
CLJS
paren-soup.instarepl
CLJS
reagent.core
CLJS
reagent.debug
CLJS
reagent.dom
CLJS
reagent.impl.batching
CLJS
reagent.impl.component
CLJS
reagent.impl.template
CLJS
reagent.impl.util
CLJS
reagent.ratom
ring.middleware.content-type
ring.middleware.file
ring.middleware.head
ring.middleware.keyword-params
ring.middleware.params
ring.middleware.reload
ring.middleware.resource
ring.util.codec
ring.util.io
ring.util.mime-type
ring.util.parsing
ring.util.request
ring.util.response
ring.util.time
rum.core
CLJS
rum.core
rum.cursor
rum.derived-atom
rum.server-render
rum.util
sablono.compiler
CLJS
sablono.core
sablono.normalize
sablono.util
tag-soup.core

(->ManyToManyChannel takes puts buf closed mutex add!)

Positional factory function for class clojure.core.async.impl.channels.ManyToManyChannel.

Source

;; ManyToManyChannel: a channel implementing the MMC, impl/WritePort,
;; impl/ReadPort and impl/Channel protocols.
;;
;; Fields:
;;   takes  - LinkedList of pending take handlers (see `.add takes handler`).
;;   puts   - LinkedList of pending puts, each stored as a `[handler val]` vector
;;            (see `.add puts [handler val]`).
;;   buf    - optional buffer; every use is guarded by `(and buf ...)`/`(when buf ...)`,
;;            so it may be nil.
;;   closed - derefable flag, `reset!` to true by close!.
;;   mutex  - Lock taken at the start of put!, take! and close!.
;;   add!   - function invoked as `(add! buf val)` and `(add! buf)`; a `reduced?`
;;            result from `(add! buf val)` triggers `abort`.
;;
;; NOTE(review): `abort` is always called while `mutex` is held and itself calls
;; `impl/close!`, which locks `mutex` again -- presumably `mutex` is reentrant;
;; confirm against the constructor.
(deftype ManyToManyChannel [^LinkedList takes ^LinkedList puts ^Queue buf closed ^Lock mutex add!]
  MMC
  ;; Removes handlers that are no longer active from both pending queues.
  (cleanup [_]
    (when-not (.isEmpty takes)
      (let [iter (.iterator takes)]
        (loop [taker (.next iter)]
          (when-not (impl/active? taker)
            (.remove iter))
          (when (.hasNext iter)
            (recur (.next iter))))))
    (when-not (.isEmpty puts)
      (let [iter (.iterator puts)]
        ;; Entries of `puts` are `[handler val]`; only the handler is needed here.
        (loop [[putter] (.next iter)]
          (when-not (impl/active? putter)
            (.remove iter))
          (when (.hasNext iter)
            (recur (.next iter)))))))

  ;; Commits every still-active pending putter (dispatching its callback with
  ;; `true`), clears `puts`, then closes the channel.
  (abort [this]
    (let [iter (.iterator puts)]
      (when (.hasNext iter)
        ;; Entries of `puts` are `[handler val]` vectors, so the handler must be
        ;; destructured out before it can be locked. Binding the whole vector as
        ;; a ^Lock (as this code previously did) fails with ClassCastException.
        (loop [[^Lock putter] (.next iter)]
          (.lock putter)
          (let [put-cb (and (impl/active? putter) (impl/commit putter))]
            (.unlock putter)
            (when put-cb
              (dispatch/run (fn [] (put-cb true))))
            (when (.hasNext iter)
              (recur (.next iter)))))))
    (.clear puts)
    (impl/close! this))

  impl/WritePort
  ;; Attempts to put `val` on the channel on behalf of `handler`.
  ;; Throws IllegalArgumentException when `val` is nil.
  ;; Returns `(box false)` if the channel is already closed, `(box true)` when
  ;; the value was accepted immediately (buffered or handed to a taker), and
  ;; nil when the handler could not be committed or the put was left pending.
  (put! [this val handler]
    (when (nil? val)
      (throw (IllegalArgumentException. "Can't put nil on channel")))
    (.lock mutex)
    (cleanup this)
    (if @closed
      (do (.unlock mutex)
          (box false))
      (let [^Lock handler handler]
        (if (and buf (not (impl/full? buf)) (not (.isEmpty takes)))
          ;; Buffer has room and takers are waiting: add to the buffer, then
          ;; drain buffered values to as many pending takers as possible.
          (do (.lock handler)
              (let [put-cb (and (impl/active? handler) (impl/commit handler))]
                (.unlock handler)
                (if put-cb
                  (let [done? (reduced? (add! buf val))]
                    (if (pos? (count buf))
                      (let [iter (.iterator takes)
                            take-cbs (loop [takers []]
                                       (if (and (.hasNext iter) (pos? (count buf)))
                                         (let [^Lock taker (.next iter)]
                                           (.lock taker)
                                           (let [ret (and (impl/active? taker) (impl/commit taker))]
                                             (.unlock taker)
                                             (if ret
                                               (let [val (impl/remove! buf)]
                                                 (.remove iter)
                                                 (recur (conj takers (fn [] (ret val)))))
                                               (recur takers))))
                                         takers))]
                        ;; Taker callbacks are dispatched only after the mutex
                        ;; has been released.
                        (if (seq take-cbs)
                          (do (when done? (abort this))
                              (.unlock mutex)
                              (doseq [f take-cbs] (dispatch/run f)))
                          (do (when done? (abort this))
                              (.unlock mutex))))
                      (do (when done? (abort this))
                          (.unlock mutex)))
                    (box true))
                  (do (.unlock mutex)
                      nil))))
          ;; Otherwise try to hand the value directly to a pending taker.
          (let [iter (.iterator takes)
                [put-cb take-cb]
                (when (.hasNext iter)
                  (loop [^Lock taker (.next iter)]
                    ;; Both handlers are locked in an order determined by
                    ;; lock-id (presumably to avoid lock-order deadlocks).
                    (if (< (impl/lock-id handler) (impl/lock-id taker))
                      (do (.lock handler) (.lock taker))
                      (do (.lock taker) (.lock handler)))
                    (let [ret (when (and (impl/active? handler) (impl/active? taker))
                                [(impl/commit handler) (impl/commit taker)])]
                      (.unlock handler)
                      (.unlock taker)
                      (if ret
                        (do (.remove iter)
                            ret)
                        (when (.hasNext iter)
                          (recur (.next iter)))))))]
            (if (and put-cb take-cb)
              (do (.unlock mutex)
                  (dispatch/run (fn [] (take-cb val)))
                  (box true))
              (if (and buf (not (impl/full? buf)))
                ;; No taker matched but the buffer has room.
                (do (.lock handler)
                    (let [put-cb (and (impl/active? handler) (impl/commit handler))]
                      (.unlock handler)
                      (if put-cb
                        (let [done? (reduced? (add! buf val))]
                          (when done? (abort this))
                          (.unlock mutex)
                          (box true))
                        (do (.unlock mutex)
                            nil))))
                ;; Nowhere to put the value now: queue the put if the handler
                ;; is active and blockable.
                ;; NOTE(review): assert-unlock is defined elsewhere; presumably
                ;; it releases `mutex` before failing the assertion -- confirm.
                (do (when (and (impl/active? handler) (impl/blockable? handler))
                      (assert-unlock mutex
                                     (< (.size puts) impl/MAX-QUEUE-SIZE)
                                     (str "No more than " impl/MAX-QUEUE-SIZE
                                          " pending puts are allowed on a single channel."
                                          " Consider using a windowed buffer."))
                      (.add puts [handler val]))
                    (.unlock mutex)
                    nil))))))))

  impl/ReadPort
  ;; Attempts to take a value on behalf of `handler`.
  ;; Returns `(box val)` when a value is available immediately (from the buffer
  ;; or a pending putter), `(box nil)` when the channel is closed and nothing
  ;; remains, and nil when the handler could not be committed or the take was
  ;; left pending.
  (take! [this handler]
    (.lock mutex)
    (cleanup this)
    (let [^Lock handler handler
          ;; Locks the handler, commits it if still active, and returns the
          ;; resulting callback (or a falsey value).
          commit-handler (fn []
                           (.lock handler)
                           (let [take-cb (and (impl/active? handler) (impl/commit handler))]
                             (.unlock handler)
                             take-cb))]
      (if (and buf (pos? (count buf)))
        (do
          (if-let [take-cb (commit-handler)]
            (let [val (impl/remove! buf)
                  iter (.iterator puts)
                  ;; A slot was freed: move pending puts into the buffer until
                  ;; it is full, add! returns reduced, or no putters remain.
                  [done? cbs]
                  (when (.hasNext iter)
                    (loop [cbs []
                           [^Lock putter val] (.next iter)]
                      (.lock putter)
                      (let [cb (and (impl/active? putter) (impl/commit putter))]
                        (.unlock putter)
                        (.remove iter)
                        (let [cbs (if cb (conj cbs cb) cbs)
                              done? (when cb (reduced? (add! buf val)))]
                          (if (and (not done?) (not (impl/full? buf)) (.hasNext iter))
                            (recur cbs (.next iter))
                            [done? cbs])))))]
              (when done?
                (abort this))
              (.unlock mutex)
              (doseq [cb cbs]
                (dispatch/run #(cb true)))
              (box val))
            (do (.unlock mutex)
                nil)))
        ;; Nothing buffered: try to pair directly with a pending putter.
        (let [iter (.iterator puts)
              [take-cb put-cb val]
              (when (.hasNext iter)
                (loop [[^Lock putter val] (.next iter)]
                  ;; Both handlers are locked in an order determined by
                  ;; lock-id (presumably to avoid lock-order deadlocks).
                  (if (< (impl/lock-id handler) (impl/lock-id putter))
                    (do (.lock handler) (.lock putter))
                    (do (.lock putter) (.lock handler)))
                  (let [ret (when (and (impl/active? handler) (impl/active? putter))
                              [(impl/commit handler) (impl/commit putter) val])]
                    (.unlock handler)
                    (.unlock putter)
                    (if ret
                      (do (.remove iter)
                          ret)
                      ;; Only keep scanning when this putter itself was stale.
                      (when-not (impl/active? putter)
                        (.remove iter)
                        (when (.hasNext iter)
                          (recur (.next iter))))))))]
          (if (and put-cb take-cb)
            (do (.unlock mutex)
                (dispatch/run #(put-cb true))
                (box val))
            (if @closed
              (do (when buf (add! buf))
                  (let [has-val (and buf (pos? (count buf)))]
                    (if-let [take-cb (commit-handler)]
                      (let [val (when has-val (impl/remove! buf))]
                        (.unlock mutex)
                        (box val))
                      (do (.unlock mutex)
                          nil))))
              ;; Open channel with nothing available: queue the take if the
              ;; handler is blockable.
              (do (when (impl/blockable? handler)
                    (assert-unlock mutex
                                   (< (.size takes) impl/MAX-QUEUE-SIZE)
                                   (str "No more than " impl/MAX-QUEUE-SIZE
                                        " pending takes are allowed on a single channel."))
                    (.add takes handler))
                  (.unlock mutex)
                  nil)))))))

  impl/Channel
  (closed? [_] @closed)

  ;; Marks the channel closed and completes every pending taker, giving each a
  ;; remaining buffered value when one exists and nil otherwise. Returns nil;
  ;; does nothing beyond cleanup when the channel is already closed.
  (close! [this]
    (.lock mutex)
    (cleanup this)
    (if @closed
      (do (.unlock mutex)
          nil)
      (do (reset! closed true)
          (when (and buf (.isEmpty puts))
            (add! buf))
          (let [iter (.iterator takes)]
            (when (.hasNext iter)
              (loop [^Lock taker (.next iter)]
                (.lock taker)
                (let [take-cb (and (impl/active? taker) (impl/commit taker))]
                  (.unlock taker)
                  (when take-cb
                    (let [val (when (and buf (pos? (count buf))) (impl/remove! buf))]
                      (dispatch/run (fn [] (take-cb val)))))
                  (.remove iter)
                  (when (.hasNext iter)
                    (recur (.next iter)))))))
          (when buf (impl/close-buf! buf))
          (.unlock mutex)
          nil))))