feat: add SSE live sync between clients
When any client pushes changes, the server broadcasts an SSE event to all connected clients watching that group. Clients immediately pull the latest data — no polling delay, no page reload. Server (~30 lines): - GET /events?group=G — SSE endpoint, holds connection open - On successful POST /sync, notify all SSE listeners for affected groups - Auth-aware: respects token and group permissions - Auto-cleanup on disconnect via http-kit on-close Client (~25 lines): - EventSource connects to /events?group=G on sync loop start - On SSE message, triggers do-pull! to fetch latest data - Auto-reconnects (browser EventSource built-in behavior) - Cleanup on destroy! Periodic polling remains as fallback (30s default). SSE provides sub-second sync for the common case.
This commit is contained in:
@@ -275,26 +275,31 @@
|
|||||||
(defn- start-sync-loop!
|
(defn- start-sync-loop!
|
||||||
"Start the background sync loop. Returns a stop function."
|
"Start the background sync loop. Returns a stop function."
|
||||||
[sa]
|
[sa]
|
||||||
(let [stop-ch (.-stop_ch sa)
|
(let [stop-ch (.-stop_ch sa)
|
||||||
interval (.-sync_interval sa)]
|
interval (.-sync_interval sa)
|
||||||
;; Periodic sync
|
cleanups (atom [])]
|
||||||
|
;; Periodic sync (fallback)
|
||||||
(go-loop []
|
(go-loop []
|
||||||
(let [[_ ch] (alts! [stop-ch (timeout interval)])]
|
(let [[_ ch] (alts! [stop-ch (timeout interval)])]
|
||||||
(when-not (= ch stop-ch)
|
(when-not (= ch stop-ch)
|
||||||
(<! (do-sync! sa))
|
(<! (do-sync! sa))
|
||||||
(recur))))
|
(recur))))
|
||||||
;; Online/offline handler
|
;; Online/offline handler
|
||||||
(let [cleanup (sync/on-connectivity-change
|
(swap! cleanups conj
|
||||||
(fn [] ; online
|
(sync/on-connectivity-change
|
||||||
(go (<! (do-sync! sa))))
|
(fn [] (go (<! (do-sync! sa))))
|
||||||
(fn [] ; offline
|
(fn [] nil)))
|
||||||
nil))]
|
;; SSE — live pull on server push
|
||||||
(reset! (.-cleanup_fn sa) cleanup))
|
(when-let [opts (.-server_opts sa)]
|
||||||
|
(swap! cleanups conj
|
||||||
|
(sync/listen-events opts (.-group sa)
|
||||||
|
(fn [_group]
|
||||||
|
(go (<! (do-pull! sa)))))))
|
||||||
|
(reset! (.-cleanup_fn sa) cleanups)
|
||||||
;; Return stop function
|
;; Return stop function
|
||||||
(fn []
|
(fn []
|
||||||
(put! stop-ch :stop)
|
(put! stop-ch :stop)
|
||||||
(when-let [cleanup @(.-cleanup_fn sa)]
|
(doseq [f @(.-cleanup_fn sa)] (f)))))
|
||||||
(cleanup)))))
|
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
;; Public API
|
;; Public API
|
||||||
@@ -352,5 +357,4 @@
|
|||||||
"Stop the sync loop and clean up. Does not close the IDB connection."
|
"Stop the sync loop and clean up. Does not close the IDB connection."
|
||||||
[sa]
|
[sa]
|
||||||
(put! (.-stop_ch sa) :stop)
|
(put! (.-stop_ch sa) :stop)
|
||||||
(when-let [cleanup @(.-cleanup_fn sa)]
|
(doseq [f @(.-cleanup_fn sa)] (f)))
|
||||||
(cleanup)))
|
|
||||||
|
|||||||
@@ -49,6 +49,33 @@
|
|||||||
(or (nil? (:groups user)) ;; nil = access to all groups
|
(or (nil? (:groups user)) ;; nil = access to all groups
|
||||||
(contains? (:groups user) group)))
|
(contains? (:groups user) group)))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------
|
||||||
|
;; SSE — live change notifications
|
||||||
|
;; ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
(defonce ^:private sse-clients
|
||||||
|
;; {group -> #{http-kit-channel ...}}
|
||||||
|
(atom {}))
|
||||||
|
|
||||||
|
(defn- sse-subscribe!
|
||||||
|
"Add an http-kit async channel to the SSE listener set for `group`."
|
||||||
|
[group ch]
|
||||||
|
(swap! sse-clients update group (fnil conj #{}) ch)
|
||||||
|
(http/on-close ch
|
||||||
|
(fn [_status]
|
||||||
|
(swap! sse-clients update group disj ch))))
|
||||||
|
|
||||||
|
(defn- sse-notify!
|
||||||
|
"Send an SSE event to all listeners of the given groups."
|
||||||
|
[groups]
|
||||||
|
(doseq [group groups
|
||||||
|
ch (get @sse-clients group)]
|
||||||
|
(http/send! ch {:status 200
|
||||||
|
:headers {"Content-Type" "text/event-stream"
|
||||||
|
"Cache-Control" "no-cache"}
|
||||||
|
:body (str "data: " group "\n\n")}
|
||||||
|
false))) ;; false = don't close, keep streaming
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
;; Handlers
|
;; Handlers
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
@@ -99,8 +126,39 @@
|
|||||||
:value (:value doc)
|
:value (:value doc)
|
||||||
:base-version (:base-version doc 0)})))
|
:base-version (:base-version doc 0)})))
|
||||||
docs)]
|
docs)]
|
||||||
|
;; Notify SSE listeners for affected groups
|
||||||
|
(when (some #(= :ok (:status %)) results)
|
||||||
|
(sse-notify! groups))
|
||||||
(transit-response 200 results)))))
|
(transit-response 200 results)))))
|
||||||
|
|
||||||
|
(defn- handle-events
|
||||||
|
"GET /events?group=G — SSE endpoint. Holds connection open."
|
||||||
|
[config req]
|
||||||
|
(let [params (or (:query-params req) {})
|
||||||
|
group (get params "group")
|
||||||
|
user (authenticate config req)]
|
||||||
|
(cond
|
||||||
|
(not user)
|
||||||
|
(transit-response 401 {:error "Unauthorized"})
|
||||||
|
|
||||||
|
(not group)
|
||||||
|
(transit-response 400 {:error "Missing 'group' parameter"})
|
||||||
|
|
||||||
|
(not (authorized-group? user group))
|
||||||
|
(transit-response 403 {:error "Access denied to group"})
|
||||||
|
|
||||||
|
:else
|
||||||
|
(http/with-channel req ch
|
||||||
|
(http/send! ch {:status 200
|
||||||
|
:headers {"Content-Type" "text/event-stream"
|
||||||
|
"Cache-Control" "no-cache"
|
||||||
|
"Connection" "keep-alive"
|
||||||
|
"Access-Control-Allow-Origin" "*"
|
||||||
|
"X-Accel-Buffering" "no"}
|
||||||
|
:body "data: connected\n\n"}
|
||||||
|
false)
|
||||||
|
(sse-subscribe! group ch)))))
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
;; Ring handler
|
;; Ring handler
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
@@ -164,6 +222,10 @@
|
|||||||
(= :options (:request-method req))
|
(= :options (:request-method req))
|
||||||
{:status 204 :headers {} :body nil}
|
{:status 204 :headers {} :body nil}
|
||||||
|
|
||||||
|
;; SSE live events
|
||||||
|
(= "/events" (:uri req))
|
||||||
|
(handle-events config req)
|
||||||
|
|
||||||
;; Sync endpoints
|
;; Sync endpoints
|
||||||
(= "/sync" (:uri req))
|
(= "/sync" (:uri req))
|
||||||
(let [user (authenticate config req)]
|
(let [user (authenticate config req)]
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
(ns pocketbook.sync
|
(ns pocketbook.sync
|
||||||
"HTTP sync client — pull and push documents to/from the Pocketbook server."
|
"HTTP sync client — pull and push documents to/from the Pocketbook server."
|
||||||
(:require [cognitect.transit :as t]
|
(:require [cognitect.transit :as t]
|
||||||
|
[clojure.string :as str]
|
||||||
[cljs.core.async :as async :refer [chan put!]]))
|
[cljs.core.async :as async :refer [chan put!]]))
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
@@ -91,6 +92,33 @@
|
|||||||
(async/close! ch)))
|
(async/close! ch)))
|
||||||
ch))
|
ch))
|
||||||
|
|
||||||
|
;; ---------------------------------------------------------------------------
|
||||||
|
;; SSE — live change notifications
|
||||||
|
;; ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
(defn listen-events
|
||||||
|
"Open an SSE connection to /events?group=G. Calls `on-change` when the
|
||||||
|
server signals new data. Returns a cleanup function."
|
||||||
|
[{:keys [server token]} group on-change]
|
||||||
|
(let [base-url (-> server
|
||||||
|
(str/replace #"/sync$" "")
|
||||||
|
(str/replace #"/$" ""))
|
||||||
|
url (str base-url "/events?group=" (js/encodeURIComponent group))
|
||||||
|
es (js/EventSource. url)]
|
||||||
|
(set! (.-onmessage es)
|
||||||
|
(fn [e]
|
||||||
|
(let [data (.-data e)]
|
||||||
|
;; Skip initial "connected" message
|
||||||
|
(when (not= data "connected")
|
||||||
|
(on-change data)))))
|
||||||
|
(set! (.-onerror es)
|
||||||
|
(fn [_e]
|
||||||
|
;; EventSource auto-reconnects; nothing to do
|
||||||
|
nil))
|
||||||
|
;; Return cleanup
|
||||||
|
(fn []
|
||||||
|
(.close es))))
|
||||||
|
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
;; Online detection
|
;; Online detection
|
||||||
;; ---------------------------------------------------------------------------
|
;; ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -140,3 +140,40 @@
|
|||||||
(is (= uuid (:uuid pulled)))
|
(is (= uuid (:uuid pulled)))
|
||||||
(is (= inst (:inst pulled)))
|
(is (= inst (:inst pulled)))
|
||||||
(is (= {:a {:b 42}} (:nested pulled))))))
|
(is (= {:a {:b 42}} (:nested pulled))))))
|
||||||
|
|
||||||
|
(deftest sse-notifies-on-push
|
||||||
|
(testing "SSE endpoint sends event when a push succeeds"
|
||||||
|
(let [;; Open SSE connection
|
||||||
|
sse-req (-> (HttpRequest/newBuilder)
|
||||||
|
(.uri (URI. (url "/events" "group=todo")))
|
||||||
|
(.GET)
|
||||||
|
(.build))
|
||||||
|
;; Use a short timeout — we'll read what we get
|
||||||
|
events (atom [])
|
||||||
|
future (java.util.concurrent.CompletableFuture/supplyAsync
|
||||||
|
(reify java.util.function.Supplier
|
||||||
|
(get [_]
|
||||||
|
(try
|
||||||
|
(let [resp (.send client sse-req
|
||||||
|
(HttpResponse$BodyHandlers/ofInputStream))
|
||||||
|
is (.body resp)
|
||||||
|
rdr (java.io.BufferedReader.
|
||||||
|
(java.io.InputStreamReader. is "UTF-8"))]
|
||||||
|
;; Read lines until we get a data event about "todo"
|
||||||
|
(loop [deadline (+ (System/currentTimeMillis) 3000)]
|
||||||
|
(when (< (System/currentTimeMillis) deadline)
|
||||||
|
(when-let [line (.readLine rdr)]
|
||||||
|
(swap! events conj line)
|
||||||
|
(if (= line "data: todo")
|
||||||
|
@events
|
||||||
|
(recur deadline))))))
|
||||||
|
(catch Exception _ @events)))))]
|
||||||
|
;; Give SSE time to connect
|
||||||
|
(Thread/sleep 200)
|
||||||
|
;; Push a doc — should trigger SSE event
|
||||||
|
(post-transit "/sync"
|
||||||
|
[{:id "todo:sse-test" :value {:text "SSE!"} :base-version 0}])
|
||||||
|
;; Wait for SSE reader to receive it
|
||||||
|
(let [result (.get future 4 java.util.concurrent.TimeUnit/SECONDS)]
|
||||||
|
(is (some #(= "data: todo" %) result)
|
||||||
|
"SSE should have received a 'data: todo' event")))))
|
||||||
|
|||||||
Reference in New Issue
Block a user