Files
atomsync/src/pocketbook/core.cljs
Florian Schroedl 5ab102b550 refactor: remove auth system
Remove Bearer token auth, per-user group ACLs, and auth_test.clj.
The server now accepts all requests without authentication.
2026-04-16 19:17:46 +02:00

370 lines
13 KiB
Clojure

(ns pocketbook.core
"Pocketbook: a Clojure-native synced atom.
Usage:
(def conn (pocketbook/open \"my-app\"))
(def todos (pocketbook/synced-atom conn \"todo\"
{:server \"http://localhost:8090/sync\"}))
(go (<! (ready? todos))
(swap! todos assoc \"todo:1\" {:text \"Buy milk\"}))
@todos ;=> {\"todo:1\" {:text \"Buy milk\"}}
"
(:require [pocketbook.idb :as idb]
[pocketbook.sync :as sync]
[clojure.string :as str]
[cljs.core.async :as async :refer [go go-loop <! >! chan put! timeout alts!]]))
;; ---------------------------------------------------------------------------
;; Connection (IDB handle)
;; ---------------------------------------------------------------------------
(defn open
"Open a Pocketbook connection (IndexedDB database).
Returns a channel yielding the connection map."
[db-name]
(let [ch (chan 1)]
(go
(let [db (<! (idb/open db-name))]
(>! ch {:db db :db-name db-name})
(async/close! ch)))
ch))
(defn shutdown!
"Close a Pocketbook connection."
[{:keys [db atoms]}]
;; Stop all sync loops
(doseq [[_ sa] @(or atoms (atom {}))]
(when-let [stop (:stop-fn sa)]
(stop)))
(idb/close-db! db))
;; ---------------------------------------------------------------------------
;; Synced Atom — implements IAtom semantics
;; ---------------------------------------------------------------------------
(deftype SyncedAtom [group ;; string prefix, e.g. "todo"
conn ;; {:db idb, ...}
cache ;; atom containing {id -> value}
versions ;; atom containing {id -> version}
pending ;; atom containing #{id} — unsynced ids
server-opts ;; {:server url} or nil
last-sync ;; atom containing epoch ms
ready-ch ;; channel, closed when initial load complete
stop-ch ;; channel to signal stop
kick-ch ;; channel to trigger immediate push
cleanup-fn ;; atom holding connectivity cleanup fn
sync-interval ;; ms
_meta] ;; metadata atom
IAtom
IDeref
(-deref [_]
@cache)
IReset
(-reset! [_ new-val]
;; Replace the entire cache (all docs in group)
(let [old @cache]
(reset! cache new-val)
;; Track which docs changed/added/removed
(let [all-keys (into (set (keys old)) (keys new-val))
changed? (volatile! false)]
(doseq [k all-keys]
(when (not= (get old k) (get new-val k))
(vreset! changed? true)
(swap! pending conj k)
;; Write to IDB
(let [v (get new-val k)]
(if (nil? v)
;; Doc was dissoc'd — mark deleted
(idb/put-doc! (:db conn)
{:id k :value nil :version (get @versions k 0)
:updated (.now js/Date) :deleted true :synced false})
(idb/put-doc! (:db conn)
{:id k :value v :version (get @versions k 0)
:updated (.now js/Date) :deleted false :synced false})))))
;; Kick the sync loop to push immediately
(when @changed?
(put! kick-ch :kick)))
new-val))
ISwap
(-swap! [o f]
(-reset! o (f @cache)))
(-swap! [o f a]
(-reset! o (f @cache a)))
(-swap! [o f a b]
(-reset! o (f @cache a b)))
(-swap! [o f a b xs]
(-reset! o (apply f @cache a b xs)))
IWatchable
(-add-watch [_ key f]
(add-watch cache key f))
(-remove-watch [_ key]
(remove-watch cache key))
(-notify-watches [_ old new]
;; Delegated to the inner atom
nil)
IMeta
(-meta [_] @_meta)
IWithMeta
(-with-meta [_ m] (reset! _meta m))
IPrintWithWriter
(-pr-writer [_ writer opts]
(-write writer (str "#<SyncedAtom[" group "] " (count @cache) " docs>"))))
;; ---------------------------------------------------------------------------
;; Internal helpers
;; ---------------------------------------------------------------------------
(defn- prefix-str [group]
(str group ":"))
(defn- now-ms []
(.now js/Date))
(defn- doc-in-group? [group id]
(str/starts-with? id (prefix-str group)))
;; ---------------------------------------------------------------------------
;; IDB ↔ Atom sync
;; ---------------------------------------------------------------------------
(defn- load-from-idb!
"Load all docs for the group from IndexedDB into the atom.
Returns a channel that closes when done."
[sa]
(let [ch (chan 1)]
(go
(let [prefix (prefix-str (.-group sa))
docs (<! (idb/get-all-by-prefix (:db (.-conn sa)) prefix))
state (into {}
(comp
(remove :deleted)
(map (fn [d] [(:id d) (:value d)])))
docs)
vers (into {}
(map (fn [d] [(:id d) (:version d)]))
docs)]
(reset! (.-cache sa) state)
(reset! (.-versions sa) vers)
;; Load last-sync from IDB meta
(let [ls (<! (idb/get-meta (:db (.-conn sa))
(str "last-sync:" (.-group sa))))]
(reset! (.-last_sync sa) (or ls 0)))
(put! ch true)
(async/close! ch)))
ch))
(defn- write-doc-to-idb!
"Persist a single doc to IDB. Returns a channel."
[sa id value deleted?]
(idb/put-doc! (:db (.-conn sa))
{:id id
:value value
:version (get @(.-versions sa) id 0)
:updated (now-ms)
:deleted (boolean deleted?)
:synced false}))
(defn- mark-synced!
"Mark a doc as synced in IDB and update its version."
[sa id version]
(swap! (.-versions sa) assoc id version)
(swap! (.-pending sa) disj id)
(let [value (get @(.-cache sa) id)]
(idb/put-doc! (:db (.-conn sa))
{:id id
:value value
:version version
:updated (now-ms)
:deleted (nil? value)
:synced true})))
;; ---------------------------------------------------------------------------
;; Sync logic
;; ---------------------------------------------------------------------------
(defn- do-pull!
"Pull changes from server, merge into atom + IDB."
[sa]
(go
(when-let [opts (.-server_opts sa)]
(let [since @(.-last_sync sa)
result (<! (sync/pull! opts (.-group sa) since))]
(when (:ok result)
(let [docs (:docs result)
max-ts (reduce max @(.-last_sync sa)
(map :updated docs))]
;; Merge each doc into cache
(doseq [doc docs]
(let [id (:id doc)]
;; Only apply if server version > local version
(when (> (:version doc) (get @(.-versions sa) id 0))
(if (:deleted doc)
(do
(swap! (.-cache sa) dissoc id)
(swap! (.-versions sa) assoc id (:version doc))
(idb/put-doc! (:db (.-conn sa))
{:id id :value nil :version (:version doc)
:updated (:updated doc) :deleted true :synced true}))
(do
(swap! (.-cache sa) assoc id (:value doc))
(swap! (.-versions sa) assoc id (:version doc))
(idb/put-doc! (:db (.-conn sa))
{:id id :value (:value doc) :version (:version doc)
:updated (:updated doc) :deleted false :synced true}))))))
;; Update last-sync
(reset! (.-last_sync sa) max-ts)
(idb/set-meta! (:db (.-conn sa))
(str "last-sync:" (.-group sa)) max-ts))
true)))))
(defn- do-push!
"Push all unsynced local docs to the server."
[sa]
(go
(when-let [opts (.-server_opts sa)]
(let [pending-ids @(.-pending sa)]
(when (seq pending-ids)
(let [docs (mapv (fn [id]
(let [v (get @(.-cache sa) id)]
(if (nil? v)
{:id id :deleted true
:base-version (get @(.-versions sa) id 0)}
{:id id :value v
:base-version (get @(.-versions sa) id 0)})))
pending-ids)
result (<! (sync/push! opts docs))]
(when (:ok result)
(doseq [r (:results result)]
(case (:status r)
:ok
(<! (mark-synced! sa (:id r) (:version r)))
:conflict
;; On conflict, accept server value and mark synced
(do
(when (:value r)
(swap! (.-cache sa) assoc (:id r) (:value r)))
(swap! (.-versions sa) assoc (:id r) (:current-version r))
(swap! (.-pending sa) disj (:id r))
(idb/put-doc! (:db (.-conn sa))
{:id (:id r)
:value (or (:value r) (get @(.-cache sa) (:id r)))
:version (:current-version r)
:updated (now-ms)
:deleted false
:synced true}))
;; Unknown status, log
(js/console.warn "Unknown push result:" (pr-str r))))
true)))))))
(defn- do-sync!
"Run a full sync cycle: pull then push."
[sa]
(go
(when (sync/online?)
(<! (do-pull! sa))
(<! (do-push! sa)))))
;; ---------------------------------------------------------------------------
;; Sync loop
;; ---------------------------------------------------------------------------
(defn- start-sync-loop!
"Start the background sync loop. Returns a stop function."
[sa]
(let [stop-ch (.-stop_ch sa)
kick-ch (.-kick_ch sa)
interval (.-sync_interval sa)
cleanups (atom [])]
;; Periodic sync (fallback) + immediate push on kick
(go-loop []
(let [[_ ch] (alts! [stop-ch kick-ch (timeout interval)])]
(when-not (= ch stop-ch)
(if (= ch kick-ch)
(<! (do-push! sa)) ;; kick = local write, just push
(<! (do-sync! sa))) ;; timeout = full pull+push
(recur))))
;; Online/offline handler
(swap! cleanups conj
(sync/on-connectivity-change
(fn [] (go (<! (do-sync! sa))))
(fn [] nil)))
;; SSE — live pull on server push
(when-let [opts (.-server_opts sa)]
(swap! cleanups conj
(sync/listen-events opts (.-group sa)
(fn [_group]
(go (<! (do-pull! sa)))))))
(reset! (.-cleanup_fn sa) cleanups)
;; Return stop function
(fn []
(put! stop-ch :stop)
(doseq [f @(.-cleanup_fn sa)] (f)))))
;; ---------------------------------------------------------------------------
;; Public API
;; ---------------------------------------------------------------------------
(defn synced-atom
"Create a synced atom for a document group.
Options:
:server — server URL (e.g. \"http://localhost:8090/sync\")
:cache — custom atom to use (e.g. reagent/atom). Default: cljs.core/atom
:interval — sync interval in ms (default 30000)"
[conn group & [{:keys [server cache interval]
:or {interval 30000}}]]
(let [cache-atom (or cache (atom {}))
versions (atom {})
pending (atom #{})
server-opts (when server {:server server})
last-sync (atom 0)
ready-ch (chan 1)
stop-ch (chan 1)
kick-ch (chan (async/sliding-buffer 1))
cleanup-fn (atom nil)
meta-atom (atom nil)
sa (SyncedAtom. group conn cache-atom versions pending
server-opts last-sync ready-ch stop-ch kick-ch
cleanup-fn interval meta-atom)]
;; Load from IDB, then start sync
(go
(<! (load-from-idb! sa))
(put! ready-ch true)
(async/close! ready-ch)
;; Initial sync
(when server-opts
(<! (do-sync! sa))
(start-sync-loop! sa)))
sa))
(defn ready?
"Returns a channel that yields true when the atom has finished loading from IDB."
[sa]
(.-ready_ch sa))
(defn sync-now!
"Trigger an immediate sync cycle. Returns a channel."
[sa]
(do-sync! sa))
(defn pending-count
"Number of documents waiting to be synced."
[sa]
(count @(.-pending sa)))
(defn destroy!
"Stop the sync loop and clean up. Does not close the IDB connection."
[sa]
(put! (.-stop_ch sa) :stop)
(doseq [f @(.-cleanup_fn sa)] (f)))