*Feb 2025* #clojure #datastar #server-sent-events Struck by [Datastar](https://data-star.dev/), I'm trying to learn how to build a server-sent-event (SSE) endpoint in Clojure, using Ring with Jetty adapter, and Reitit router. This is Part 1, there's also [[Building an SSE endpoint with Clojure, core.async, Ring — Part 2 (connection pools)|Part 2]]. ### The problem I want to get a single client to connect to the SSE endpoint and see the messages. At this point, I am not interested in disconnects, multiple clients, efficiency, etc. I want the simplest imperfect, resource-leaking tracer-bullet solution, just to get something off the ground. I assume I already have a core.async channel containing the text representations of SSE messages. ```clojure ; What we already have (async/>!! my-sse-channel) ; outputs: 'data: current time 12:33:03\n\n' (async/>!! my-sse-channel) ; outputs: 'data: current time 12:33:04\n\n' (async/>!! my-sse-channel) ; outputs: 'data: current time 12:33:05\n\n' ; we need to transform this somehow into a stream of events {:status 200 :headers {}, :body ... a stream of SSE events ...}) ``` What follows is my step-by-step deconstruction of what [James Reeves](https://www.booleanknot.com/blog/2016/07/15/asynchronous-ring.html) and [Anders Murphy](https://andersmurphy.com/2024/10/07/clojure-synchronous-server-sent-events-with-virtual-threads-and-channels.html) wrote on the topic. (Thanks a lot for writing those explanations.) Core-async is new to me, as well as Ring, and it took me a while to understand what is going on. Here's my attempt at that deconstruction. I reused some samples from those two posts, which I hope is okay. Alright, here we go. ### Ring handlers: sync or async? At minimum, we need to return a Ring response map that looks something like this: ```clojure {:status 200 ; return immediately :headers {"Content-Type" "text/event-stream"} ; return immediately :body ...output stream of events, but how?...} ; ??? ``` We need to *immediately* return the `status` and `headers`, and somehow we need to tell Ring that it should treat the `body` as an output stream. For that matter, Ring supports **both sync and async handlers** but I was struggling to figure out which one to use. ```clojure (defn handler ([request] ; sync handler, mapping a request {:status 200, :headers {}, :body "Hello World"}) ([request respond raise] ; async handler, returns result using `respond` fn (respond {:status 200, :headers {}, :body "Hello World"})) ``` How to return a streaming response, which seems to belong to the async realm, if all we have available are either a fully sync or fully async responses? I had this table in my head all the time that I kept pondering: ![[CleanShot 2025-02-18 at 20.04.59.png]] ### From a core.async channel to an output-stream As a first step, we can go from a channel to an output stream with a function that looks like this. To keep things simple, we ignore as much as we can: we ignore client disconnects and we use `async/<!!` that blocks the current thread (more on blocking later). ```clojure (defn channel->output-stream [channel output-stream] (with-open [out output-stream writer (io/writer out)] (loop [] (when-let [^String msg (async/<!! channel)] (doto writer (.write msg) (.flush)) (recur))))) ``` Now, how can we send the output stream to the response? ### The big AHA moment The big AHA moment that unlocked this for me was the following: > In previous versions of Ring, there was a choice of four types for the response body: strings, seqs, files and input streams. The current beta expands this to include the `StreamableResponseBody` protocol, which allows you to define how a type is written to the response `OutputStream`. > — [Boolean Knot - Asynchronous Ring](https://www.booleanknot.com/blog/2016/07/15/asynchronous-ring.html) As I see it, there are two key learnings here. First, the `:body` supports any `OutputStream`, so we can use sync handlers to populate that output stream. Second, we can use protocols to transform the channel into the output-stream. It was helpful to also peek at the documentation for Ring `StreamableResponseBody` method: > Write a value representing a response body to an output stream. The stream will be closed after the value had been written. The stream may be written asynchronously from asynchronous handlers. In synchronous handlers, the response is considered completed once this method ends." Putting the two together, we get: ```clojure (extend-type ManyToManyChannel StreamableResponseBody (write-body-to-stream [ch _response output-stream] (channel->output-stream ch output-stream))) (defn sse-handler [sse-chan] (fn [_request] {:status 200, :headers {"Content-Type" "text/event-stream"}, :body sse-chan})) ``` That worked, great! Yet there was one question that kept puzzling me. ### Jetty threading model The question left was about thread blocking. Remember how we block the current thread in the `channel->output-stream`. How does that work with respect to requests? Are we in trouble by blocking the request thread? It turns out that the threading model is controlled by the underlying Ring adapter, which in my case was Jetty. The best answer on Jetty threading model I could find was [this answer](https://clojureverse.org/t/what-is-the-request-to-thread-mapping-when-using-ring-jetty-adapter/7074/5) from Clojureverse: > Jetty 9 is like a hybrid async/sync. It’ll process everything async, but make them sync if need be. So it’ll create a thread for you to handle the request, and then your handler can block, in which case the thread is blocked, and Jetty will create another thread to handle another request. But your handler can also choose to not block, but go in a sync pending state, on which case jetty will park it and the thread will be released to the pool, so when another request comes in it can reuse that same thread. > > > The number of request that I can handle concurrently == the number of threads available ? > > Yes, if you use Ring’s synchronous handler, and don’t do anything special to become async within it. I don't quite understand what "sync pending state" means, but if we disregard that, it seems the threading model is one thread per one request. Because this is a tracer-bullet, I'd say the per-request threading model suffices for now. ### Improving the tracer-bullet With the tracer bullet working, there are thing we can improve: - Client management. The tracer bullet works only for one connection, because a message in core async channel can be taken by only one consumer. Blog posts with info - [Clojure: Synchronous server sent events with virtual threads and channels](https://andersmurphy.com/2024/10/07/clojure-synchronous-server-sent-events-with-virtual-threads-and-channels.html) - Efficiency. We are currently using platform threads, but there are more efficient alternatives, namely async/go blocks or Java virtual threads. Blog posts with info: - [Clojure: virtual threads with ring and jetty](https://andersmurphy.com/2023/09/16/clojure-virtual-threads-with-ring-and-jetty.html) - [Virtual Threads in Clojure](https://ericnormand.me/guide/clojure-virtual-threads) There's [[Building an SSE endpoint with Clojure, core.async, Ring — Part 2 (connection pools)||Part 2]] where I tried to add connection management. ### Putting it all together For the record, here's the complete code I used for fiddling. ```clojure (ns fiddle.core-async-sse (:require [clojure.core.async :as async] [clojure.java.io :as io] [reitit.ring :as ring] [ring.adapter.jetty :as jetty] [ring.core.protocols :refer [StreamableResponseBody]]) (:import (clojure.core.async.impl.channels ManyToManyChannel))) ; convert a channel to an output stream (defn channel->output-stream [channel output-stream] (with-open [out output-stream writer (io/writer out)] (loop [] (when-let [^String msg (async/<!! channel)] (doto writer (.write msg) (.flush)) (recur))))) ; teaching Ring to convert a channel to a response stream (extend-type ManyToManyChannel StreamableResponseBody (write-body-to-stream [ch _response output-stream] (channel->output-stream ch output-stream))) ; sync Ring handler (defn sse-handler [sse-chan] (fn [_request] (println "Connected to SSE endpoint") {:status 200, :headers {"Content-Type" "text/event-stream" "Cache-Control" "no-cache, no-store"}, :body sse-chan})) (defn app [sse-chanel] (ring/ring-handler (ring/router [["/" {:get {:handler (sse-handler sse-chanel)}}] ["/status" {:get {:handler (constantly {:status 200})}}]]))) (defn some-sse-message [t] (format "event: hello\ndata: current time is %s\n\n" t)) (defn produce-random-sse-events! [ch] (async/thread (loop [] (Thread/sleep 2000) (let [msg (some-sse-message (System/currentTimeMillis))] (when (async/>!! ch msg) (println msg))) (recur)))) (comment (do (def my-sse-channel (async/chan 10)) (def server (jetty/run-jetty (app my-sse-channel) {:port 3001 :join? false})) (produce-random-sse-events! my-sse-channel)) (when server (.stop server)) (async/close! my-sse-channel)) ```