Skip to content

Instantly share code, notes, and snippets.

@pleasetrythisathome
Last active August 29, 2015 14:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pleasetrythisathome/c4db8a42b5067ef20052 to your computer and use it in GitHub Desktop.
Save pleasetrythisathome/c4db8a42b5067ef20052 to your computer and use it in GitHub Desktop.
asynchronous "protocol" handshakes
(ns async-protocol
(:require [clojure.core.async :as async]
[clojure.set :as set]
[com.stuartsierra.component :as component :refer (Lifecycle)]
[taoensso.timbre :as log]))
(defn throw-err [e]
(when (instance? Throwable e) (throw e))
e)
(defn <? [ch]
(throw-err (async/<! ch)))
(defn <?? [ch]
(throw-err (async/<!! ch)))
(defprotocol IAsyncProtocol
(send-chan [_])
(receive-chan [_])
(sent-events [_])
(received-events [_]))
(defn matching-arities? [source-fn arities]
(every? (fn [parameters]
(some (fn [args]
(let [[args parameters] (map (partial remove (partial = '_))
[args parameters])]
(and parameters
(if (= '& (last (butlast parameters)))
(>= (count args) (- (count parameters) 2))
(= (count parameters) (count args))))))
arities))
(:arglists (meta source-fn))))
(defn async-handshake! [component]
(assert (satisfies? IAsyncProtocol component) "component must satisfy IAsyncProtocol")
(let [send-c (send-chan component)
receive-c (receive-chan component)]
(when-let [events (sent-events component)]
(assert (and (seq events) send-c) "components with sent-events must provide a channel from send-chan")
(async/put! send-c events (sent-events component)))
(when-let [must-satisfy (received-events component)]
(assert (and (seq must-satisfy) receive-c) "components with send events must provide a channel from receive-chan")
(async/go
(try (loop [satisfied #{}
failed #{}]
(let [[v c] (async/alts! [receive-c
(async/timeout 1000)])]
(cond
(= (first v) ::done)
(if (= (second v) component)
true
(do (async/put! send-c v)
(recur satisfied failed)))
(not v)
(throw (ex-info (str "failed matching async-satisifes")
{:reason ::timeout
:component component}))
:else
(let [events v
[satisfied failed]
(reduce-kv (fn [[satisfied failed] k arities]
(let [v (get must-satisfy k)]
(if (and v (matching-arities? v arities))
[(conj satisfied k) failed]
(cond
(not send-c)
(throw (ex-info (str "dead end matching async-satisifes " k)
{:reason ::dead-end
:event-key k
:arities arities
:component component}))
(get failed [k arities])
(throw (ex-info (str "failed circuit matching async-satisifes " k)
{:reason ::failed
:event-key k
:arities arities
:component component}))
:else
(do
(async/put! send-c {k arities})
[satisfied (conj failed [k arities])])))))
[satisfied failed] events)]
(when-not (seq (set/difference (set (keys must-satisfy)) satisfied))
(async/put! send-c [::done component]))
(recur satisfied failed)))))
(catch Exception e
e))))))
(defn start-receive-loop! [component]
(assert (satisfies? IAsyncProtocol component) "component must satisfy IAsyncProtocol")
(let [receive-c (receive-chan component)
send-c (send-chan component)
events (received-events component)]
(assert (and (seq events) receive-c) "components with send events must provide a channel from receive-chan")
(when (seq events)
(async/take!
(async/go
(try (loop []
(when-let [event (async/<! receive-c)]
(let [[key & args] event
args (or args [])]
(if-let [handler-var (get events key)]
(cond
(not (matching-arities? handler-var [args]))
(throw (ex-info (str "malformed args, event:" key)
{:reason ::malformed-args
:event-key key
:args args
:expected (:arglists (meta handler-var))
:handler (meta handler-var)}))
:else
(let [handler (->> handler-var
meta
((juxt :ns :name))
(apply ns-resolve))]
(apply handler component args)))
(when send-c
(async/put! send-c event))))
(recur)))
(catch Exception e
e)))
throw-err))))
(defprotocol IDropdown
(open [_])
(close [_]))
(defrecord Dropdown [send-c receive-c]
IAsyncProtocol
(send-chan [_]
send-c)
(receive-chan [_]
receive-c)
(sent-events [_]
{::click-toggle '[[toggled?]]
::click-item '[[key]]})
(received-events [_]
{::open #'open
::close #'close})
IDropdown
(open [_]
(log/info :open))
(close [_]
(log/info :close)))
(defprotocol IHandleDropdown
(on-click-toggle [_ toggled?])
(on-click-item [_ key]))
(defrecord DropdownHandler [send-c receive-c]
IAsyncProtocol
(send-chan [_]
send-c)
(receive-chan [_]
receive-c)
(sent-events [_]
{::open [[]]
::close [[]]})
(received-events [_]
{::click-toggle #'on-click-toggle
::click-item #'on-click-item})
IHandleDropdown
(on-click-toggle [_ toggled?]
(log/info :click-toggle toggled?))
(on-click-item [_ key]
(log/info :click-item key)))
(def system
(component/start
(component/system-map
:to-dropdown (async/chan)
:from-dropdown (async/chan)
:dropdown (-> (map->Dropdown {})
(component/using {:send-c :from-dropdown
:receive-c :to-dropdown}))
:dropdown-handler (-> (map->DropdownHandler {})
(component/using {:send-c :to-dropdown
:receive-c :from-dropdown})))))
(let [cmps (filter (partial satisfies? IAsyncProtocol) (vals system))]
(when (->> (doall
(for [cmp cmps]
(async-handshake! cmp)))
(mapv <??)
(reduce =))
(doseq [cmp cmps]
(start-receive-loop! cmp))))
(comment
(async/put! (:to-dropdown system) [::open])
(async/put! (:to-dropdown system) [::close])
(async/put! (:from-dropdown system) [::click-toggle true])
(async/put! (:from-dropdown system) [::click-item :some-button])
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment