*Feb 2025* #clojure #datastar In [[Building an SSE endpoint with Clojure, core.async, Ring — Part 1 (a tracer-bullet)|part 1]], we built a tracer-bullet for an SSE connection. Now I'm looking into improving it: - Broadcast the same SSE event to multiple clients - Handle client disconnects - Design the code such that a message can be sent to a selected connection, a subset of connections or to all connections. ### Modelling a connection There seem to be at least two approaches to modelling an SSE connection: - A connection is an output stream or a writer - A connection is a core-async channel Some thoughts on this: - The Datastar Clojure SDK [uses BufferedWriter](https://github.com/starfederation/datastar/blob/v1.0.0-beta.7/sdk/clojure/adapter-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj#L15), with locks preventing concurrent writes. I assume SDK uses `BufferedWriter` to give its users freedom to use whatever message producers they prefer, such as core-async channels or others. - Looking into [Anders exploration](https://andersmurphy.com/2024/10/07/clojure-synchronous-server-sent-events-with-virtual-threads-and-channels.html), a connection is modelled as a core-async channel. If we were to block the request thread with a blocking take (`>!!`), as far as I can tell, there wouldn't be concurrency issues, because we write one SSE event at a time. - It seems to me that writers/output-stream model provides flexibility but requires managing concurrency, while the core-async model encapsulates concurrency at the cost of flexibility. Since I'm using core-async channels already on the producer side, modelling a connection as a core-async channel seems like a reasonable choice at this point. ```clojure (defonce sse-connections (atom #{})) (defn add-connection! [conn] (let [chan (async/chan 10) client' (assoc conn :channel chan)] (println "SSE connection added") (swap! sse-connections conj client') client')) (defn remove-connection! [conn] (println "SSE connection removed") (swap! sse-connections disj conn)) (defn list-connections [] @sse-connections) ``` I thought of making `connection` a map, so that a Ring request map can be passed to it as well. In future, if we want to extract data from the Ring request, such as the logged user, we can do so in `add-connection!`. Such data could be useful in future when broadcasting messages. ### Adding a connection on each new request Now we need to create and add the new connection on each new request to the SSE endpoint: ```clojure (defn sse-handler [] (fn [ring-request] (let [conn (add-connection! {:request ring-request}) channel (:channel conn)] {:status 200, :headers {"Content-Type" "text/event-stream" "Cache-Control" "no-cache, no-store"}, :body channel}))) ``` We should now be able to start the server, connect e.g. via curl, and see the number of active connection ```clojure (comment) (def server (jetty/run-jetty (app) {:port 3001 :join? false})) (count (list-connections)) ``` ### Handling disconnects It seems that with Ring/Jetty, the only way to determine that a client has disconnected is that writing to the output-stream throws an exception. We can capture that exception in `write-body-to-stream`. We can close the channel there, but how to remove the connection from the pool? We don't have pool available in `write-body-to-stream`. ```clojure (defn channel->output-stream [channel output-stream] (try (with-open [out output-stream writer (io/writer out)] (loop [] (when-let [msg (async/<!! channel)] (doto writer (.write msg) (.flush)) (recur)))) (catch IOException e ;; Ring/Jetty throws on a disconnected client ... ??? ... )))) ;; What to do here? ``` One approach is injecting the connection pool into `write-body-to-stream`. But that would unnecessarily couple things together. Another approach is closing the connection in `write-body-to-stream` and check in another place if a channel is closed. Let's do that. ```clojure (defn channel->output-stream [channel output-stream] (with-open [out output-stream writer (io/writer out)] (try (loop [] (println "Blocking on channel" channel) (when-let [msg (async/<!! channel)] (doto writer (.write msg) (.flush)) (recur))) (catch IOException _) ; ignoring the exception (finally (async/close! channel))))) ; closing the channel ``` ### Broadcasting & removing connections from the pool It turns out that `<!!` returns `false` on a closed connection, we can use that. We can use a periodic heartbeat to check for closed connections, as Anders [chose to](https://andersmurphy.com/2024/10/07/clojure-synchronous-server-sent-events-with-virtual-threads-and-channels.html) do; or we can check for a closed channel when we broadcast a message. To keep things as simple as possible, let's go with the broadcasting approach for now. It's not ideal, we might keep connections in the pool while we don't send SSE events, but I think that's ok for now. ```clojure (defn broadcast [msg] (doseq [client @sse-connections] (let [sent? (async/>!! (:channel client) msg)] (if sent? (println "message sent") (remove-connection! client))))) ``` ### Final thoughts I think this would suffice as a crude first working solution. There are areas to improve it later: - Use heartbeat for checking disconnected clients - Checkout http-kit or ring-async how they handle client disconnects - Move to Clojure SDK for Datastar for optimized high-performance endpoint not coupled to a core-async. _Thanks to Jeremy and Anders for the discussions on Datastar Discord, their posts and the Clojure Datastar SDK. _ ### Putting it all together ```clojure (ns fiddle.core-async-sse (:require [clojure.core.async :as async] [clojure.java.io :as io] [reitit.ring :as ring] [ring.adapter.jetty :as jetty] [ring.core.protocols :refer [StreamableResponseBody]]) (:import (clojure.core.async.impl.channels ManyToManyChannel) (java.io IOException))) ;; ------------------------------------------- ;; Clients ;; ------------------------------------------ (defonce sse-connections (atom #{})) (defn add-connection! [conn] (let [chan (async/chan 10) client' (assoc conn :channel chan)] (println "SSE connection added") (swap! sse-connections conj client') client')) (defn remove-connection! [conn] (println "SSE connection removed") (swap! sse-connections disj conn)) (defn list-connections [] @sse-connections) ;; ------------------------------------------- ;; Ring handler ;; ------------------------------------------ ; convert a channel to an output stream (defn channel->output-stream [channel output-stream] (with-open [out output-stream writer (io/writer out)] (try (loop [] (println "Blocking on channel" channel) (when-let [msg (async/<!! channel)] (doto writer (.write msg) (.flush)) (recur))) (catch IOException _) ;; Ring/Jetty throws here on disconnected client (finally (async/close! channel))))) ; teaching Ring to convert a channel to a response stream (extend-type ManyToManyChannel StreamableResponseBody (write-body-to-stream [ch _response output-stream] (channel->output-stream ch output-stream))) ; sync Ring handler (defn sse-handler [] (fn [ring-request] (let [conn (add-connection! {:request ring-request}) channel (:channel conn)] {:status 200, :headers {"Content-Type" "text/event-stream" "Cache-Control" "no-cache, no-store"}, :body channel}))) (defn app [] (ring/ring-handler (ring/router [["/" {:get {:handler (sse-handler)}}] ["/status" {:get {:handler (constantly {:status 200})}}]]))) ;; ------------------------------------------- ;; Broadcasting ;; ------------------------------------------ (defn some-sse-message [t] (format "event: hello\ndata: current time is %s\n\n" t)) (defn broadcast [msg] (doseq [client @sse-connections] (let [sent? (async/>!! (:channel client) msg)] (if sent? (println "message sent") (remove-connection! client))))) (comment (when server (.stop server)) (def server (jetty/run-jetty (app) {:port 3001 :join? false})) (count (list-connections)) (broadcast (some-sse-message (System/currentTimeMillis)))) ```