Reloadable core.async code
After being exposed to writing reloadable code on the front end using figwheel and Om, I started wondering about how to write similarly on message-driven server applications.
The core.async library is fantastic, but it does have some gotchas when writing reloadable code. Definition of reloadable here being:
- I can reload a given namespace such that it affects currently running code in a predictable manner
- I can stop/start the ‘core’ part of the program and a predictable and safe manner
Here are a few things I’ve started doing to maintain a reloadable workflow.
Bring up channels in a function
Rather than this:
(defonce incoming-msgs (chan))
(defonce outgoing-orders (chan))
;; Do stuff here
Prefer:
(defn start
[]
(let [incoming-msgs (chan)
outgoing-orders (chan)]
;; Do stuff here
))
The reason for this is that, even if we gracefully do bring down all of our running go-blocks, there are times when a message will remain in the channel even after stopping. It’s best to just re-create the channel as part of the startup routine to avoid this altogether.
Prefer functions that take their dependant channels as arguments
Kind of an extension of the above - but rather than this:
(defonce incoming-msgs (chan))
(defn process-msgs
[process-fn]
(go
(loop []
(process-fn (<! incoming-msgs))
(recur))))
Prefer:
(defn process-msgs
[msg-chan process-fn]
(go
(loop []
(process-fn (<! msg-chan))
(recur))))
Other than the fact that it’s good form to have your functions declare their dependencies as arguments, it’s far easier to test/debug.
Handle receiving nil
Parking take’s (<!) will receive ‘nil’ from closed channels. The go-block should terminate accordingly.
Rather than:
(go-loop []
(process-fn (<! incoming-msgs))
(recur))
Prefer:
(go-loop []
(if-let [msg (<! incoming-msgs)]
(do
(println "Received " msg)
(recur))
(println "Done.")))
A little more verbose, but the go-blocks listening to those channels will complete gracefully on the channel closing. This is especially important in repl development, as you can leave a lot of these go-loops kicking around taking up resources.
Stop channels
I like to define a ‘stop channel’ that gets exposed to me, so I can stop all running go-blocks easily:
(defn start
[]
(let [stop-chan (chan)
stop-chan-m (mult stop-chan)
incoming-msgs (chan)]
(go
(<! stop-chan)
(close! incoming-msgs))
(fetch-msgs (copy-chan stop-chan-m) incoming-msgs)
(process-msgs (copy-chan stop-chan-m) incoming-msgs)
stop-chan))
We return the stop-chan above so that the caller has a hook to stop the process(es). To invoke it, we put any message onto the stop channel.
The ‘copy-chan’ is a simple method:
(defn copy-chan
[chm]
(let [copy (chan)]
(tap chm copy)
copy))
And fetch-msgs is a little more involved, due to the complexities of bridging the gap between the outside world and our app, plus remaining responsive to the stop channel:
(defn fetch-msgs
[stop-ch incoming-ch]
(go
(loop []
(let [msg (transport/recv-msg {:maxWait 200 :queue "system/incoming"})]
(when msg ;; msg is nil when no message to receive after maxWait
(let [[_ ch] (alts! [[incoming-ch msg] stop-ch])]
(condp = ch
incoming-ch (recur)
stop-ch :complete)))))
(println "fetch-msgs stopping.")))