diff --git a/src/pocketbook/core.cljs b/src/pocketbook/core.cljs index 55041f0..718a19c 100644 --- a/src/pocketbook/core.cljs +++ b/src/pocketbook/core.cljs @@ -275,26 +275,31 @@ (defn- start-sync-loop! "Start the background sync loop. Returns a stop function." [sa] - (let [stop-ch (.-stop_ch sa) - interval (.-sync_interval sa)] - ;; Periodic sync + (let [stop-ch (.-stop_ch sa) + interval (.-sync_interval sa) + cleanups (atom [])] + ;; Periodic sync (fallback) (go-loop [] (let [[_ ch] (alts! [stop-ch (timeout interval)])] (when-not (= ch stop-ch) ( #{http-kit-channel ...}} + (atom {})) + +(defn- sse-subscribe! + "Add an http-kit async channel to the SSE listener set for `group`." + [group ch] + (swap! sse-clients update group (fnil conj #{}) ch) + (http/on-close ch + (fn [_status] + (swap! sse-clients update group disj ch)))) + +(defn- sse-notify! + "Send an SSE event to all listeners of the given groups." + [groups] + (doseq [group groups + ch (get @sse-clients group)] + (http/send! ch {:status 200 + :headers {"Content-Type" "text/event-stream" + "Cache-Control" "no-cache"} + :body (str "data: " group "\n\n")} + false))) ;; false = don't close, keep streaming + ;; --------------------------------------------------------------------------- ;; Handlers ;; --------------------------------------------------------------------------- @@ -99,8 +126,39 @@ :value (:value doc) :base-version (:base-version doc 0)}))) docs)] + ;; Notify SSE listeners for affected groups + (when (some #(= :ok (:status %)) results) + (sse-notify! groups)) (transit-response 200 results))))) +(defn- handle-events + "GET /events?group=G — SSE endpoint. Holds connection open." + [config req] + (let [params (or (:query-params req) {}) + group (get params "group") + user (authenticate config req)] + (cond + (not user) + (transit-response 401 {:error "Unauthorized"}) + + (not group) + (transit-response 400 {:error "Missing 'group' parameter"}) + + (not (authorized-group? user group)) + (transit-response 403 {:error "Access denied to group"}) + + :else + (http/with-channel req ch + (http/send! ch {:status 200 + :headers {"Content-Type" "text/event-stream" + "Cache-Control" "no-cache" + "Connection" "keep-alive" + "Access-Control-Allow-Origin" "*" + "X-Accel-Buffering" "no"} + :body "data: connected\n\n"} + false) + (sse-subscribe! group ch))))) + ;; --------------------------------------------------------------------------- ;; Ring handler ;; --------------------------------------------------------------------------- @@ -164,6 +222,10 @@ (= :options (:request-method req)) {:status 204 :headers {} :body nil} + ;; SSE live events + (= "/events" (:uri req)) + (handle-events config req) + ;; Sync endpoints (= "/sync" (:uri req)) (let [user (authenticate config req)] diff --git a/src/pocketbook/sync.cljs b/src/pocketbook/sync.cljs index 6e3610f..eadfd77 100644 --- a/src/pocketbook/sync.cljs +++ b/src/pocketbook/sync.cljs @@ -1,6 +1,7 @@ (ns pocketbook.sync "HTTP sync client — pull and push documents to/from the Pocketbook server." (:require [cognitect.transit :as t] + [clojure.string :as str] [cljs.core.async :as async :refer [chan put!]])) ;; --------------------------------------------------------------------------- @@ -91,6 +92,33 @@ (async/close! ch))) ch)) +;; --------------------------------------------------------------------------- +;; SSE — live change notifications +;; --------------------------------------------------------------------------- + +(defn listen-events + "Open an SSE connection to /events?group=G. Calls `on-change` when the + server signals new data. Returns a cleanup function." + [{:keys [server token]} group on-change] + (let [base-url (-> server + (str/replace #"/sync$" "") + (str/replace #"/$" "")) + url (str base-url "/events?group=" (js/encodeURIComponent group)) + es (js/EventSource. url)] + (set! (.-onmessage es) + (fn [e] + (let [data (.-data e)] + ;; Skip initial "connected" message + (when (not= data "connected") + (on-change data))))) + (set! (.-onerror es) + (fn [_e] + ;; EventSource auto-reconnects; nothing to do + nil)) + ;; Return cleanup + (fn [] + (.close es)))) + ;; --------------------------------------------------------------------------- ;; Online detection ;; --------------------------------------------------------------------------- diff --git a/test/pocketbook/server_test.clj b/test/pocketbook/server_test.clj index 8c84c3e..c76f772 100644 --- a/test/pocketbook/server_test.clj +++ b/test/pocketbook/server_test.clj @@ -140,3 +140,40 @@ (is (= uuid (:uuid pulled))) (is (= inst (:inst pulled))) (is (= {:a {:b 42}} (:nested pulled)))))) + +(deftest sse-notifies-on-push + (testing "SSE endpoint sends event when a push succeeds" + (let [;; Open SSE connection + sse-req (-> (HttpRequest/newBuilder) + (.uri (URI. (url "/events" "group=todo"))) + (.GET) + (.build)) + ;; Use a short timeout — we'll read what we get + events (atom []) + future (java.util.concurrent.CompletableFuture/supplyAsync + (reify java.util.function.Supplier + (get [_] + (try + (let [resp (.send client sse-req + (HttpResponse$BodyHandlers/ofInputStream)) + is (.body resp) + rdr (java.io.BufferedReader. + (java.io.InputStreamReader. is "UTF-8"))] + ;; Read lines until we get a data event about "todo" + (loop [deadline (+ (System/currentTimeMillis) 3000)] + (when (< (System/currentTimeMillis) deadline) + (when-let [line (.readLine rdr)] + (swap! events conj line) + (if (= line "data: todo") + @events + (recur deadline)))))) + (catch Exception _ @events)))))] + ;; Give SSE time to connect + (Thread/sleep 200) + ;; Push a doc — should trigger SSE event + (post-transit "/sync" + [{:id "todo:sse-test" :value {:text "SSE!"} :base-version 0}]) + ;; Wait for SSE reader to receive it + (let [result (.get future 4 java.util.concurrent.TimeUnit/SECONDS)] + (is (some #(= "data: todo" %) result) + "SSE should have received a 'data: todo' event")))))