Files
atomsync/src/pocketbook/server.clj
Florian Schroedl 455e80f4e8 feat: add SSE live sync between clients
When any client pushes changes, the server broadcasts an SSE event to
all connected clients watching that group. Clients immediately pull
the latest data — no polling delay, no page reload.

Server (~30 lines):
- GET /events?group=G — SSE endpoint, holds connection open
- On successful POST /sync, notify all SSE listeners for affected groups
- Auth-aware: respects token and group permissions
- Auto-cleanup on disconnect via http-kit on-close

Client (~25 lines):
- EventSource connects to /events?group=G on sync loop start
- On SSE message, triggers do-pull! to fetch latest data
- Auto-reconnects (browser EventSource built-in behavior)
- Cleanup on destroy!

Periodic polling remains as fallback (30s default). SSE provides
sub-second sync for the common case.
2026-04-04 17:14:55 +02:00

292 lines
11 KiB
Clojure

(ns pocketbook.server
"Pocketbook sync server. Single-file HTTP server backed by SQLite.
Endpoints:
GET /sync?since=T&group=G — pull changes since timestamp
POST /sync — push local changes (with version checks)
Start:
clj -M:server
bb -m pocketbook.server"
(:require [org.httpkit.server :as http]
[pocketbook.db :as db]
[pocketbook.transit :as t]
[clojure.string :as str]
[clojure.java.io :as io])
(:gen-class))
;; ---------------------------------------------------------------------------
;; Config
;; ---------------------------------------------------------------------------
(def default-config
{:port 8090
:db-path "pocketbook.db"
:static-dir nil ;; nil = no static serving, or path like "example/todomvc"
:users nil ;; nil = no auth, or {"alice" {:token "abc" :groups #{"todo"}}}
:cors true})
;; ---------------------------------------------------------------------------
;; Auth
;; ---------------------------------------------------------------------------
(defn- authenticate
"Check Authorization header against config. Returns user map or nil."
[config req]
(if-let [users (:users config)]
(let [header (get-in req [:headers "authorization"] "")
token (str/replace header #"^Bearer\s+" "")]
(some (fn [[username user]]
(when (= token (:token user))
(assoc user :username username)))
users))
;; No auth configured — allow all
{:username "anonymous" :groups nil}))
(defn- authorized-group?
"Check if user has access to a specific group."
[user group]
(or (nil? (:groups user)) ;; nil = access to all groups
(contains? (:groups user) group)))
;; ---------------------------------------------------------------------------
;; SSE — live change notifications
;; ---------------------------------------------------------------------------
(defonce ^:private sse-clients
;; {group -> #{http-kit-channel ...}}
(atom {}))
(defn- sse-subscribe!
"Add an http-kit async channel to the SSE listener set for `group`."
[group ch]
(swap! sse-clients update group (fnil conj #{}) ch)
(http/on-close ch
(fn [_status]
(swap! sse-clients update group disj ch))))
(defn- sse-notify!
"Send an SSE event to all listeners of the given groups."
[groups]
(doseq [group groups
ch (get @sse-clients group)]
(http/send! ch {:status 200
:headers {"Content-Type" "text/event-stream"
"Cache-Control" "no-cache"}
:body (str "data: " group "\n\n")}
false))) ;; false = don't close, keep streaming
;; ---------------------------------------------------------------------------
;; Handlers
;; ---------------------------------------------------------------------------
(defn- transit-response [status body]
{:status status
:headers {"Content-Type" "application/transit+json"
"Cache-Control" "no-cache"}
:body (t/encode body)})
(defn- cors-headers [resp]
(update resp :headers merge
{"Access-Control-Allow-Origin" "*"
"Access-Control-Allow-Methods" "GET, POST, OPTIONS"
"Access-Control-Allow-Headers" "Content-Type, Authorization"
"Access-Control-Max-Age" "86400"}))
(defn- handle-pull
"GET /sync?since=T&group=G — return all docs updated since T in group G."
[ds user req]
(let [params (or (:query-params req) (:params req) {})
group (get params "group" (get params :group))
since (parse-long (or (get params "since" (get params :since)) "0"))]
(if-not group
(transit-response 400 {:error "Missing 'group' parameter"})
(if-not (authorized-group? user group)
(transit-response 403 {:error "Access denied to group"})
(let [docs (db/docs-since ds group since)]
(transit-response 200 docs))))))
(defn- handle-push
"POST /sync — accept a batch of document writes.
Body: [{:id ... :value ... :base-version N} ...]
Entries with :deleted true are treated as deletes."
[ds user req]
(let [body (t/decode (:body req))
docs (if (map? body) [body] body)
;; Check all docs belong to authorized groups
groups (into #{} (map #(first (str/split (:id %) #":" 2))) docs)
denied (remove #(authorized-group? user %) groups)]
(if (seq denied)
(transit-response 403 {:error (str "Access denied to groups: " (str/join ", " denied))})
(let [results (mapv (fn [doc]
(if (:deleted doc)
(db/delete! ds {:id (:id doc)
:base-version (:base-version doc 0)})
(db/upsert! ds {:id (:id doc)
:value (:value doc)
:base-version (:base-version doc 0)})))
docs)]
;; Notify SSE listeners for affected groups
(when (some #(= :ok (:status %)) results)
(sse-notify! groups))
(transit-response 200 results)))))
(defn- handle-events
"GET /events?group=G — SSE endpoint. Holds connection open."
[config req]
(let [params (or (:query-params req) {})
group (get params "group")
user (authenticate config req)]
(cond
(not user)
(transit-response 401 {:error "Unauthorized"})
(not group)
(transit-response 400 {:error "Missing 'group' parameter"})
(not (authorized-group? user group))
(transit-response 403 {:error "Access denied to group"})
:else
(http/with-channel req ch
(http/send! ch {:status 200
:headers {"Content-Type" "text/event-stream"
"Cache-Control" "no-cache"
"Connection" "keep-alive"
"Access-Control-Allow-Origin" "*"
"X-Accel-Buffering" "no"}
:body "data: connected\n\n"}
false)
(sse-subscribe! group ch)))))
;; ---------------------------------------------------------------------------
;; Ring handler
;; ---------------------------------------------------------------------------
(defn- parse-query-params [query-string]
(when query-string
(into {}
(for [pair (str/split query-string #"&")
:let [[k v] (str/split pair #"=" 2)]
:when k]
[k (or v "")]))))
;; ---------------------------------------------------------------------------
;; Static file serving
;; ---------------------------------------------------------------------------
(def ^:private content-types
{"html" "text/html; charset=utf-8"
"css" "text/css; charset=utf-8"
"js" "application/javascript; charset=utf-8"
"json" "application/json"
"svg" "image/svg+xml"
"png" "image/png"
"jpg" "image/jpeg"
"ico" "image/x-icon"
"woff2" "font/woff2"
"woff" "font/woff"
"map" "application/json"})
(defn- ext [path]
(let [i (str/last-index-of path ".")]
(when (and i (pos? i))
(subs path (inc i)))))
(defn- serve-static
"Attempt to serve a static file from a directory. Returns response or nil."
[static-dir uri]
(when static-dir
(let [rel (if (= "/" uri) "/todomvc.html" uri)
file (io/file static-dir (subs rel 1))]
(when (and (.isFile file) (.canRead file)
;; Prevent path traversal
(.startsWith (.toPath file) (.toPath (io/file static-dir))))
{:status 200
:headers {"Content-Type" (get content-types (ext (.getName file))
"application/octet-stream")
"Cache-Control" "no-cache"}
:body (io/input-stream file)}))))
;; ---------------------------------------------------------------------------
;; Ring handler
;; ---------------------------------------------------------------------------
(defn make-handler
"Create the Ring handler function."
[ds config]
(fn [req]
(let [req (assoc req :query-params (parse-query-params (:query-string req)))
resp (cond
;; CORS preflight
(= :options (:request-method req))
{:status 204 :headers {} :body nil}
;; SSE live events
(= "/events" (:uri req))
(handle-events config req)
;; Sync endpoints
(= "/sync" (:uri req))
(let [user (authenticate config req)]
(if-not user
(transit-response 401 {:error "Unauthorized"})
(case (:request-method req)
:get (handle-pull ds user req)
:post (handle-push ds user req)
(transit-response 405 {:error "Method not allowed"}))))
;; Static files (including / → todomvc.html)
:else
(or (serve-static (:static-dir config) (:uri req))
{:status 404
:headers {"Content-Type" "text/plain"}
:body "Not found"}))]
(if (:cors config)
(cors-headers resp)
resp))))
;; ---------------------------------------------------------------------------
;; Server lifecycle
;; ---------------------------------------------------------------------------
(defn start!
"Start the Pocketbook server. Returns a stop function."
([]
(start! {}))
([config]
(let [config (merge default-config config)
ds (db/open (:db-path config))
handler (make-handler ds config)
server (http/run-server handler {:port (:port config)})]
(println (str "🔶 Pocketbook server running on http://localhost:" (:port config)))
(println (str " Database: " (:db-path config)))
(println (str " Auth: " (if (:users config) "enabled" "disabled")))
(when (:static-dir config)
(println (str " Static: " (:static-dir config)))
(println (str " App: http://localhost:" (:port config) "/")))
{:stop server
:ds ds
:config config})))
(defn stop!
"Stop a running server."
[{:keys [stop]}]
(when stop (stop)))
;; ---------------------------------------------------------------------------
;; Main
;; ---------------------------------------------------------------------------
(defn -main [& args]
(let [opts (apply hash-map (map #(if (str/starts-with? % "--")
(keyword (subs % 2))
%)
args))
config (cond-> {}
(:port opts) (assoc :port (parse-long (:port opts)))
(:db-path opts) (assoc :db-path (:db-path opts))
(:static-dir opts) (assoc :static-dir (:static-dir opts)))]
(start! config)
@(promise)))