feat: implement Pocketbook — a Clojure-native synced atom
Offline-first key-value store with atom interface (swap!, deref, add-watch) that syncs to a SQLite-backed server over Transit. Server (CLJ): - SQLite storage with Nippy serialization preserving all Clojure types - GET /sync?group=G&since=T pull endpoint with prefix-based groups - POST /sync push endpoint with per-document version checking - Conflict detection (stale write rejection) - Token-based auth with per-user group access - CORS support, soft deletes, purge compaction Client (CLJS): - IndexedDB wrapper with Transit serialization - SyncedAtom implementing IAtom (IDeref, ISwap, IReset, IWatchable) - Write-through to IndexedDB on every swap! - Background sync loop (pull + push) with configurable interval - Online/offline detection with reconnect sync - Conflict resolution (accept server value) - ready? channel for initial load - Custom cache atom support (Reagent ratom compatible) 25 tests, 77 assertions across db, transit, server, and auth.
This commit is contained in:
355
src/pocketbook/core.cljs
Normal file
355
src/pocketbook/core.cljs
Normal file
@@ -0,0 +1,355 @@
|
||||
(ns pocketbook.core
|
||||
"Pocketbook: a Clojure-native synced atom.
|
||||
|
||||
Usage:
|
||||
(def conn (pocketbook/open \"my-app\"))
|
||||
(def todos (pocketbook/synced-atom conn \"todo\"
|
||||
{:server \"http://localhost:8090/sync\"}))
|
||||
(go (<! (ready? todos))
|
||||
(swap! todos assoc \"todo:1\" {:text \"Buy milk\"}))
|
||||
@todos ;=> {\"todo:1\" {:text \"Buy milk\"}}
|
||||
"
|
||||
(:require [pocketbook.idb :as idb]
|
||||
[pocketbook.sync :as sync]
|
||||
[cljs.core.async :refer [go go-loop <! >! chan put! close! timeout alts!]]))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Connection (IDB handle)
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(defn open
|
||||
"Open a Pocketbook connection (IndexedDB database).
|
||||
Returns a channel yielding the connection map."
|
||||
[db-name]
|
||||
(let [ch (chan 1)]
|
||||
(go
|
||||
(let [db (<! (idb/open db-name))]
|
||||
(>! ch {:db db :db-name db-name})
|
||||
(close! ch)))
|
||||
ch))
|
||||
|
||||
(defn close!
|
||||
"Close a Pocketbook connection."
|
||||
[{:keys [db atoms]}]
|
||||
;; Stop all sync loops
|
||||
(doseq [[_ sa] @(or atoms (atom {}))]
|
||||
(when-let [stop (:stop-fn sa)]
|
||||
(stop)))
|
||||
(idb/close! db))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Synced Atom — implements IAtom semantics
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(deftype SyncedAtom [group ;; string prefix, e.g. "todo"
|
||||
conn ;; {:db idb, ...}
|
||||
cache ;; atom containing {id -> value}
|
||||
versions ;; atom containing {id -> version}
|
||||
pending ;; atom containing #{id} — unsynced ids
|
||||
server-opts ;; {:server url :token str} or nil
|
||||
last-sync ;; atom containing epoch ms
|
||||
ready-ch ;; channel, closed when initial load complete
|
||||
stop-ch ;; channel to signal stop
|
||||
cleanup-fn ;; atom holding connectivity cleanup fn
|
||||
sync-interval ;; ms
|
||||
_meta] ;; metadata atom
|
||||
|
||||
IAtom
|
||||
|
||||
IDeref
|
||||
(-deref [_]
|
||||
@cache)
|
||||
|
||||
IReset
|
||||
(-reset! [_ new-val]
|
||||
;; Replace the entire cache (all docs in group)
|
||||
(let [old @cache]
|
||||
(reset! cache new-val)
|
||||
;; Track which docs changed/added/removed
|
||||
(let [all-keys (into (set (keys old)) (keys new-val))]
|
||||
(doseq [k all-keys]
|
||||
(when (not= (get old k) (get new-val k))
|
||||
(swap! pending conj k)
|
||||
;; Write to IDB
|
||||
(let [v (get new-val k)]
|
||||
(if (nil? v)
|
||||
;; Doc was dissoc'd — mark deleted
|
||||
(idb/put-doc! (:db conn)
|
||||
{:id k :value nil :version (get @versions k 0)
|
||||
:updated (.now js/Date) :deleted true :synced false})
|
||||
(idb/put-doc! (:db conn)
|
||||
{:id k :value v :version (get @versions k 0)
|
||||
:updated (.now js/Date) :deleted false :synced false}))))))
|
||||
new-val))
|
||||
|
||||
ISwap
|
||||
(-swap! [o f]
|
||||
(-reset! o (f @cache)))
|
||||
(-swap! [o f a]
|
||||
(-reset! o (f @cache a)))
|
||||
(-swap! [o f a b]
|
||||
(-reset! o (f @cache a b)))
|
||||
(-swap! [o f a b xs]
|
||||
(-reset! o (apply f @cache a b xs)))
|
||||
|
||||
IWatchable
|
||||
(-add-watch [_ key f]
|
||||
(add-watch cache key f))
|
||||
(-remove-watch [_ key]
|
||||
(remove-watch cache key))
|
||||
(-notify-watches [_ old new]
|
||||
;; Delegated to the inner atom
|
||||
nil)
|
||||
|
||||
IMeta
|
||||
(-meta [_] @_meta)
|
||||
|
||||
IWithMeta
|
||||
(-with-meta [_ m] (reset! _meta m))
|
||||
|
||||
IPrintWithWriter
|
||||
(-pr-writer [_ writer opts]
|
||||
(-write writer (str "#<SyncedAtom[" group "] " (count @cache) " docs>"))))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Internal helpers
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(defn- prefix-str [group]
|
||||
(str group ":"))
|
||||
|
||||
(defn- now-ms []
|
||||
(.now js/Date))
|
||||
|
||||
(defn- doc-in-group? [group id]
|
||||
(clojure.string/starts-with? id (prefix-str group)))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; IDB ↔ Atom sync
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(defn- load-from-idb!
|
||||
"Load all docs for the group from IndexedDB into the atom.
|
||||
Returns a channel that closes when done."
|
||||
[sa]
|
||||
(let [ch (chan 1)]
|
||||
(go
|
||||
(let [prefix (prefix-str (.-group sa))
|
||||
docs (<! (idb/get-all-by-prefix (:db (.-conn sa)) prefix))
|
||||
state (into {}
|
||||
(comp
|
||||
(remove :deleted)
|
||||
(map (fn [d] [(:id d) (:value d)])))
|
||||
docs)
|
||||
vers (into {}
|
||||
(map (fn [d] [(:id d) (:version d)]))
|
||||
docs)]
|
||||
(reset! (.-cache sa) state)
|
||||
(reset! (.-versions sa) vers)
|
||||
;; Load last-sync from IDB meta
|
||||
(let [ls (<! (idb/get-meta (:db (.-conn sa))
|
||||
(str "last-sync:" (.-group sa))))]
|
||||
(reset! (.-last_sync sa) (or ls 0)))
|
||||
(put! ch true)
|
||||
(close! ch)))
|
||||
ch))
|
||||
|
||||
(defn- write-doc-to-idb!
|
||||
"Persist a single doc to IDB. Returns a channel."
|
||||
[sa id value deleted?]
|
||||
(idb/put-doc! (:db (.-conn sa))
|
||||
{:id id
|
||||
:value value
|
||||
:version (get @(.-versions sa) id 0)
|
||||
:updated (now-ms)
|
||||
:deleted (boolean deleted?)
|
||||
:synced false}))
|
||||
|
||||
(defn- mark-synced!
|
||||
"Mark a doc as synced in IDB and update its version."
|
||||
[sa id version]
|
||||
(swap! (.-versions sa) assoc id version)
|
||||
(swap! (.-pending sa) disj id)
|
||||
(let [value (get @(.-cache sa) id)]
|
||||
(idb/put-doc! (:db (.-conn sa))
|
||||
{:id id
|
||||
:value value
|
||||
:version version
|
||||
:updated (now-ms)
|
||||
:deleted (nil? value)
|
||||
:synced true})))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Sync logic
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(defn- do-pull!
|
||||
"Pull changes from server, merge into atom + IDB."
|
||||
[sa]
|
||||
(go
|
||||
(when-let [opts (.-server_opts sa)]
|
||||
(let [since @(.-last_sync sa)
|
||||
result (<! (sync/pull! opts (.-group sa) since))]
|
||||
(when (:ok result)
|
||||
(let [docs (:docs result)
|
||||
max-ts (reduce max @(.-last_sync sa)
|
||||
(map :updated docs))]
|
||||
;; Merge each doc into cache
|
||||
(doseq [doc docs]
|
||||
(let [id (:id doc)]
|
||||
;; Only apply if server version > local version
|
||||
(when (> (:version doc) (get @(.-versions sa) id 0))
|
||||
(if (:deleted doc)
|
||||
(do
|
||||
(swap! (.-cache sa) dissoc id)
|
||||
(swap! (.-versions sa) assoc id (:version doc))
|
||||
(idb/put-doc! (:db (.-conn sa))
|
||||
{:id id :value nil :version (:version doc)
|
||||
:updated (:updated doc) :deleted true :synced true}))
|
||||
(do
|
||||
(swap! (.-cache sa) assoc id (:value doc))
|
||||
(swap! (.-versions sa) assoc id (:version doc))
|
||||
(idb/put-doc! (:db (.-conn sa))
|
||||
{:id id :value (:value doc) :version (:version doc)
|
||||
:updated (:updated doc) :deleted false :synced true}))))))
|
||||
;; Update last-sync
|
||||
(reset! (.-last_sync sa) max-ts)
|
||||
(idb/set-meta! (:db (.-conn sa))
|
||||
(str "last-sync:" (.-group sa)) max-ts)))
|
||||
true)))))
|
||||
|
||||
(defn- do-push!
|
||||
"Push all unsynced local docs to the server."
|
||||
[sa]
|
||||
(go
|
||||
(when-let [opts (.-server_opts sa)]
|
||||
(let [pending-ids @(.-pending sa)]
|
||||
(when (seq pending-ids)
|
||||
(let [docs (mapv (fn [id]
|
||||
(let [v (get @(.-cache sa) id)]
|
||||
(if (nil? v)
|
||||
{:id id :deleted true
|
||||
:base-version (get @(.-versions sa) id 0)}
|
||||
{:id id :value v
|
||||
:base-version (get @(.-versions sa) id 0)})))
|
||||
pending-ids)
|
||||
result (<! (sync/push! opts docs))]
|
||||
(when (:ok result)
|
||||
(doseq [r (:results result)]
|
||||
(case (:status r)
|
||||
:ok
|
||||
(<! (mark-synced! sa (:id r) (:version r)))
|
||||
|
||||
:conflict
|
||||
;; On conflict, accept server value and mark synced
|
||||
(do
|
||||
(when (:value r)
|
||||
(swap! (.-cache sa) assoc (:id r) (:value r)))
|
||||
(swap! (.-versions sa) assoc (:id r) (:current-version r))
|
||||
(swap! (.-pending sa) disj (:id r))
|
||||
(idb/put-doc! (:db (.-conn sa))
|
||||
{:id (:id r)
|
||||
:value (or (:value r) (get @(.-cache sa) (:id r)))
|
||||
:version (:current-version r)
|
||||
:updated (now-ms)
|
||||
:deleted false
|
||||
:synced true}))
|
||||
|
||||
;; Unknown status, log
|
||||
(js/console.warn "Unknown push result:" (pr-str r))))
|
||||
true)))))))
|
||||
|
||||
(defn- do-sync!
|
||||
"Run a full sync cycle: pull then push."
|
||||
[sa]
|
||||
(go
|
||||
(when (sync/online?)
|
||||
(<! (do-pull! sa))
|
||||
(<! (do-push! sa)))))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Sync loop
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(defn- start-sync-loop!
|
||||
"Start the background sync loop. Returns a stop function."
|
||||
[sa]
|
||||
(let [stop-ch (.-stop_ch sa)
|
||||
interval (.-sync_interval sa)]
|
||||
;; Periodic sync
|
||||
(go-loop []
|
||||
(let [[_ ch] (alts! [stop-ch (timeout interval)])]
|
||||
(when-not (= ch stop-ch)
|
||||
(<! (do-sync! sa))
|
||||
(recur))))
|
||||
;; Online/offline handler
|
||||
(let [cleanup (sync/on-connectivity-change
|
||||
(fn [] ; online
|
||||
(go (<! (do-sync! sa))))
|
||||
(fn [] ; offline
|
||||
nil))]
|
||||
(reset! (.-cleanup_fn sa) cleanup))
|
||||
;; Return stop function
|
||||
(fn []
|
||||
(put! stop-ch :stop)
|
||||
(when-let [cleanup @(.-cleanup_fn sa)]
|
||||
(cleanup)))))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Public API
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(defn synced-atom
|
||||
"Create a synced atom for a document group.
|
||||
|
||||
Options:
|
||||
:server — server URL (e.g. \"http://localhost:8090/sync\")
|
||||
:token — auth token
|
||||
:cache — custom atom to use (e.g. reagent/atom). Default: cljs.core/atom
|
||||
:interval — sync interval in ms (default 30000)"
|
||||
[conn group & [{:keys [server token cache interval]
|
||||
:or {interval 30000}}]]
|
||||
(let [cache-atom (or cache (atom {}))
|
||||
versions (atom {})
|
||||
pending (atom #{})
|
||||
server-opts (when server {:server server :token token})
|
||||
last-sync (atom 0)
|
||||
ready-ch (chan 1)
|
||||
stop-ch (chan 1)
|
||||
cleanup-fn (atom nil)
|
||||
meta-atom (atom nil)
|
||||
sa (SyncedAtom. group conn cache-atom versions pending
|
||||
server-opts last-sync ready-ch stop-ch
|
||||
cleanup-fn interval meta-atom)]
|
||||
;; Load from IDB, then start sync
|
||||
(go
|
||||
(<! (load-from-idb! sa))
|
||||
(put! ready-ch true)
|
||||
(close! ready-ch)
|
||||
;; Initial sync
|
||||
(when server-opts
|
||||
(<! (do-sync! sa))
|
||||
(start-sync-loop! sa)))
|
||||
sa))
|
||||
|
||||
(defn ready?
|
||||
"Returns a channel that yields true when the atom has finished loading from IDB."
|
||||
[sa]
|
||||
(.-ready_ch sa))
|
||||
|
||||
(defn sync-now!
|
||||
"Trigger an immediate sync cycle. Returns a channel."
|
||||
[sa]
|
||||
(do-sync! sa))
|
||||
|
||||
(defn pending-count
|
||||
"Number of documents waiting to be synced."
|
||||
[sa]
|
||||
(count @(.-pending sa)))
|
||||
|
||||
(defn destroy!
|
||||
"Stop the sync loop and clean up. Does not close the IDB connection."
|
||||
[sa]
|
||||
(put! (.-stop_ch sa) :stop)
|
||||
(when-let [cleanup @(.-cleanup_fn sa)]
|
||||
(cleanup)))
|
||||
Reference in New Issue
Block a user