refactor: extract shared .cljc library with store protocol

Move core, sync, and transit from platform-specific .clj/.cljs to
shared .cljc files with reader conditionals. This enables testing
the full sync logic on the JVM and using SyncedAtom from Clojure
clients.

Key changes:
- PStore protocol (store.cljc) decouples core from storage backend
- IDB store (store/idb.cljs) and memory store (store/memory.cljc)
- SyncedAtom implements CLJ IDeref/IAtom/IRef + CLJS equivalents
- Sync client uses java.net.http on CLJ, fetch on CLJS
- SSE remains CLJS-only; JVM clients use polling
- API change: store passed explicitly instead of pb/open
- 7 new JVM tests: local ops, persistence, watches, two-client sync
- 28 tests total, 87 assertions, all passing
This commit is contained in:
Florian Schroedl
2026-04-16 19:42:06 +02:00
parent 5ab102b550
commit 86b54e1291
15 changed files with 711 additions and 552 deletions

2
bb.edn
View File

@@ -14,7 +14,7 @@
test {:doc "Run all server tests"
:task (let [expr (str "(require 'pocketbook.db-test 'pocketbook.transit-test"
" 'pocketbook.server-test)"
" 'pocketbook.server-test 'pocketbook.core-test)"
" (let [r (clojure.test/run-all-tests #\"pocketbook\\..*\")]"
" (System/exit (if (and (zero? (:fail r)) (zero? (:error r))) 0 1)))")]
(shell "clj" "-M:dev" "-e" expr))}

View File

@@ -1,5 +1,6 @@
{:paths ["src"]
:deps {org.clojure/clojure {:mvn/version "1.12.0"}
org.clojure/core.async {:mvn/version "1.7.701"}
http-kit/http-kit {:mvn/version "2.8.0"}
com.cognitect/transit-clj {:mvn/version "1.0.333"}
com.taoensso/nippy {:mvn/version "3.4.2"}

View File

@@ -1,6 +1,7 @@
(ns pocketbook.todomvc
"TodoMVC built on Pocketbook — offline-first, synced, Clojure-native."
(:require [pocketbook.core :as pb]
[pocketbook.store.idb :as idb]
[cljs.core.async :refer [go <!]]
[clojure.string :as str]))
@@ -241,11 +242,11 @@
(defn ^:export init []
(go
(let [conn (<! (pb/open "pocketbook-todomvc"))
todos (pb/synced-atom conn "todo"
(let [store (<! (idb/open "pocketbook-todomvc"))
todos (pb/synced-atom store "todo"
{:server "http://localhost:8090/sync"
:interval 15000})]
(reset! !conn conn)
(reset! !conn store)
(reset! !todos todos)
;; Render + bind immediately (empty or stale is fine)
(render!)

View File

@@ -2,48 +2,63 @@
"Pocketbook: a Clojure-native synced atom.
Usage:
(def conn (pocketbook/open \"my-app\"))
(def todos (pocketbook/synced-atom conn \"todo\"
(def store (<! (idb/open \"my-app\"))) ;; or (memory/create)
(def todos (pb/synced-atom store \"todo\"
{:server \"http://localhost:8090/sync\"}))
(go (<! (ready? todos))
(swap! todos assoc \"todo:1\" {:text \"Buy milk\"}))
@todos ;=> {\"todo:1\" {:text \"Buy milk\"}}
"
(:require [pocketbook.idb :as idb]
(:require [pocketbook.store :as store]
[pocketbook.sync :as sync]
[clojure.string :as str]
[cljs.core.async :as async :refer [go go-loop <! >! chan put! timeout alts!]]))
#?(:clj [clojure.core.async :as async :refer [go go-loop <! >! chan put! timeout alts!]]
:cljs [cljs.core.async :as async :refer [go go-loop <! >! chan put! timeout alts!]])))
;; ---------------------------------------------------------------------------
;; Connection (IDB handle)
;; Internal helpers
;; ---------------------------------------------------------------------------
(defn open
"Open a Pocketbook connection (IndexedDB database).
Returns a channel yielding the connection map."
[db-name]
(let [ch (chan 1)]
(go
(let [db (<! (idb/open db-name))]
(>! ch {:db db :db-name db-name})
(async/close! ch)))
ch))
(defn- now-ms []
#?(:clj (System/currentTimeMillis)
:cljs (.now js/Date)))
(defn shutdown!
"Close a Pocketbook connection."
[{:keys [db atoms]}]
;; Stop all sync loops
(doseq [[_ sa] @(or atoms (atom {}))]
(when-let [stop (:stop-fn sa)]
(stop)))
(idb/close-db! db))
(defn- prefix-str [group]
(str group ":"))
;; ---------------------------------------------------------------------------
;; Synced Atom — implements IAtom semantics
;; Shared reset logic
;; ---------------------------------------------------------------------------
(defn- do-reset!*
"Shared reset implementation. Updates cache, tracks pending, writes to store, kicks sync."
[store cache versions pending kick-ch new-val]
(let [old @cache]
(reset! cache new-val)
(let [all-keys (into (set (keys old)) (keys new-val))
changed? (volatile! false)]
(doseq [k all-keys]
(when (not= (get old k) (get new-val k))
(vreset! changed? true)
(swap! pending conj k)
(let [v (get new-val k)]
(if (nil? v)
(store/put-doc! store
{:id k :value nil :version (get @versions k 0)
:updated (now-ms) :deleted true :synced false})
(store/put-doc! store
{:id k :value v :version (get @versions k 0)
:updated (now-ms) :deleted false :synced false})))))
(when @changed?
(put! kick-ch :kick)))
new-val))
;; ---------------------------------------------------------------------------
;; Synced Atom
;; ---------------------------------------------------------------------------
(deftype SyncedAtom [group ;; string prefix, e.g. "todo"
conn ;; {:db idb, ...}
store ;; PStore implementation
cache ;; atom containing {id -> value}
versions ;; atom containing {id -> version}
pending ;; atom containing #{id} — unsynced ids
@@ -56,38 +71,42 @@
sync-interval ;; ms
_meta] ;; metadata atom
IAtom
#?@(:clj
[clojure.lang.IDeref
(deref [_] @cache)
clojure.lang.IRef
(addWatch [this key f] (add-watch cache key f) this)
(removeWatch [this key] (remove-watch cache key) this)
(getWatches [_] (.getWatches ^clojure.lang.IRef cache))
(getValidator [_] nil)
(setValidator [_ _vf] nil)
clojure.lang.IAtom
(reset [_ newval]
(do-reset!* store cache versions pending kick-ch newval))
(swap [this f]
(.reset this (f @cache)))
(swap [this f arg]
(.reset this (f @cache arg)))
(swap [this f arg1 arg2]
(.reset this (f @cache arg1 arg2)))
(swap [this f x y args]
(.reset this (apply f @cache x y args)))
(compareAndSet [this old new]
(if (= @cache old)
(do (.reset this new) true)
false))]
:cljs
[IAtom
IDeref
(-deref [_]
@cache)
(-deref [_] @cache)
IReset
(-reset! [_ new-val]
;; Replace the entire cache (all docs in group)
(let [old @cache]
(reset! cache new-val)
;; Track which docs changed/added/removed
(let [all-keys (into (set (keys old)) (keys new-val))
changed? (volatile! false)]
(doseq [k all-keys]
(when (not= (get old k) (get new-val k))
(vreset! changed? true)
(swap! pending conj k)
;; Write to IDB
(let [v (get new-val k)]
(if (nil? v)
;; Doc was dissoc'd — mark deleted
(idb/put-doc! (:db conn)
{:id k :value nil :version (get @versions k 0)
:updated (.now js/Date) :deleted true :synced false})
(idb/put-doc! (:db conn)
{:id k :value v :version (get @versions k 0)
:updated (.now js/Date) :deleted false :synced false})))))
;; Kick the sync loop to push immediately
(when @changed?
(put! kick-ch :kick)))
new-val))
(do-reset!* store cache versions pending kick-ch new-val))
ISwap
(-swap! [o f]
@@ -104,8 +123,7 @@
(add-watch cache key f))
(-remove-watch [_ key]
(remove-watch cache key))
(-notify-watches [_ old new]
;; Delegated to the inner atom
(-notify-watches [_ _old _new]
nil)
IMeta
@@ -115,34 +133,21 @@
(-with-meta [_ m] (reset! _meta m))
IPrintWithWriter
(-pr-writer [_ writer opts]
(-write writer (str "#<SyncedAtom[" group "] " (count @cache) " docs>"))))
(-pr-writer [_ writer _opts]
(-write writer (str "#<SyncedAtom[" group "] " (count @cache) " docs>")))]))
;; ---------------------------------------------------------------------------
;; Internal helpers
;; Store ↔ Atom sync
;; ---------------------------------------------------------------------------
(defn- prefix-str [group]
(str group ":"))
(defn- now-ms []
(.now js/Date))
(defn- doc-in-group? [group id]
(str/starts-with? id (prefix-str group)))
;; ---------------------------------------------------------------------------
;; IDB ↔ Atom sync
;; ---------------------------------------------------------------------------
(defn- load-from-idb!
"Load all docs for the group from IndexedDB into the atom.
(defn- load-from-store!
"Load all docs for the group from the store into the atom.
Returns a channel that closes when done."
[sa]
(let [ch (chan 1)]
(go
(let [prefix (prefix-str (.-group sa))
docs (<! (idb/get-all-by-prefix (:db (.-conn sa)) prefix))
docs (<! (store/docs-by-prefix (.-store sa) prefix))
state (into {}
(comp
(remove :deleted)
@@ -153,32 +158,20 @@
docs)]
(reset! (.-cache sa) state)
(reset! (.-versions sa) vers)
;; Load last-sync from IDB meta
(let [ls (<! (idb/get-meta (:db (.-conn sa))
(let [ls (<! (store/get-meta (.-store sa)
(str "last-sync:" (.-group sa))))]
(reset! (.-last_sync sa) (or ls 0)))
(put! ch true)
(async/close! ch)))
ch))
(defn- write-doc-to-idb!
"Persist a single doc to IDB. Returns a channel."
[sa id value deleted?]
(idb/put-doc! (:db (.-conn sa))
{:id id
:value value
:version (get @(.-versions sa) id 0)
:updated (now-ms)
:deleted (boolean deleted?)
:synced false}))
(defn- mark-synced!
"Mark a doc as synced in IDB and update its version."
"Mark a doc as synced in the store and update its version."
[sa id version]
(swap! (.-versions sa) assoc id version)
(swap! (.-pending sa) disj id)
(let [value (get @(.-cache sa) id)]
(idb/put-doc! (:db (.-conn sa))
(store/put-doc! (.-store sa)
{:id id
:value value
:version version
@@ -191,7 +184,7 @@
;; ---------------------------------------------------------------------------
(defn- do-pull!
"Pull changes from server, merge into atom + IDB."
"Pull changes from server, merge into atom + store."
[sa]
(go
(when-let [opts (.-server_opts sa)]
@@ -201,27 +194,24 @@
(let [docs (:docs result)
max-ts (reduce max @(.-last_sync sa)
(map :updated docs))]
;; Merge each doc into cache
(doseq [doc docs]
(let [id (:id doc)]
;; Only apply if server version > local version
(when (> (:version doc) (get @(.-versions sa) id 0))
(if (:deleted doc)
(do
(swap! (.-cache sa) dissoc id)
(swap! (.-versions sa) assoc id (:version doc))
(idb/put-doc! (:db (.-conn sa))
(store/put-doc! (.-store sa)
{:id id :value nil :version (:version doc)
:updated (:updated doc) :deleted true :synced true}))
(do
(swap! (.-cache sa) assoc id (:value doc))
(swap! (.-versions sa) assoc id (:version doc))
(idb/put-doc! (:db (.-conn sa))
(store/put-doc! (.-store sa)
{:id id :value (:value doc) :version (:version doc)
:updated (:updated doc) :deleted false :synced true}))))))
;; Update last-sync
(reset! (.-last_sync sa) max-ts)
(idb/set-meta! (:db (.-conn sa))
(store/set-meta! (.-store sa)
(str "last-sync:" (.-group sa)) max-ts))
true)))))
@@ -248,13 +238,12 @@
(<! (mark-synced! sa (:id r) (:version r)))
:conflict
;; On conflict, accept server value and mark synced
(do
(when (:value r)
(swap! (.-cache sa) assoc (:id r) (:value r)))
(swap! (.-versions sa) assoc (:id r) (:current-version r))
(swap! (.-pending sa) disj (:id r))
(idb/put-doc! (:db (.-conn sa))
(store/put-doc! (.-store sa)
{:id (:id r)
:value (or (:value r) (get @(.-cache sa) (:id r)))
:version (:current-version r)
@@ -262,8 +251,9 @@
:deleted false
:synced true}))
;; Unknown status, log
(js/console.warn "Unknown push result:" (pr-str r))))
;; Unknown status
#?(:clj (println "Unknown push result:" (pr-str r))
:cljs (js/console.warn "Unknown push result:" (pr-str r)))))
true)))))))
(defn- do-sync!
@@ -279,7 +269,7 @@
;; ---------------------------------------------------------------------------
(defn- start-sync-loop!
"Start the background sync loop. Returns a stop function."
"Start the background sync loop."
[sa]
(let [stop-ch (.-stop_ch sa)
kick-ch (.-kick_ch sa)
@@ -290,25 +280,22 @@
(let [[_ ch] (alts! [stop-ch kick-ch (timeout interval)])]
(when-not (= ch stop-ch)
(if (= ch kick-ch)
(<! (do-push! sa)) ;; kick = local write, just push
(<! (do-sync! sa))) ;; timeout = full pull+push
(<! (do-push! sa))
(<! (do-sync! sa)))
(recur))))
;; Online/offline handler
(swap! cleanups conj
(sync/on-connectivity-change
(fn [] (go (<! (do-sync! sa))))
(fn [] nil)))
;; SSE — live pull on server push
;; SSE — live pull on server push (CLJS only)
#?(:cljs
(when-let [opts (.-server_opts sa)]
(swap! cleanups conj
(sync/listen-events opts (.-group sa)
(fn [_group]
(go (<! (do-pull! sa)))))))
(reset! (.-cleanup_fn sa) cleanups)
;; Return stop function
(fn []
(put! stop-ch :stop)
(doseq [f @(.-cleanup_fn sa)] (f)))))
(go (<! (do-pull! sa))))))))
(reset! (.-cleanup_fn sa) @cleanups)))
;; ---------------------------------------------------------------------------
;; Public API
@@ -321,7 +308,7 @@
:server — server URL (e.g. \"http://localhost:8090/sync\")
:cache — custom atom to use (e.g. reagent/atom). Default: cljs.core/atom
:interval — sync interval in ms (default 30000)"
[conn group & [{:keys [server cache interval]
[store group & [{:keys [server cache interval]
:or {interval 30000}}]]
(let [cache-atom (or cache (atom {}))
versions (atom {})
@@ -333,22 +320,20 @@
kick-ch (chan (async/sliding-buffer 1))
cleanup-fn (atom nil)
meta-atom (atom nil)
sa (SyncedAtom. group conn cache-atom versions pending
sa (SyncedAtom. group store cache-atom versions pending
server-opts last-sync ready-ch stop-ch kick-ch
cleanup-fn interval meta-atom)]
;; Load from IDB, then start sync
(go
(<! (load-from-idb! sa))
(<! (load-from-store! sa))
(put! ready-ch true)
(async/close! ready-ch)
;; Initial sync
(when server-opts
(<! (do-sync! sa))
(start-sync-loop! sa)))
sa))
(defn ready?
"Returns a channel that yields true when the atom has finished loading from IDB."
"Returns a channel that yields true when the atom has finished loading from the store."
[sa]
(.-ready_ch sa))
@@ -363,7 +348,7 @@
(count @(.-pending sa)))
(defn destroy!
"Stop the sync loop and clean up. Does not close the IDB connection."
"Stop the sync loop and clean up. Does not close the store."
[sa]
(put! (.-stop_ch sa) :stop)
(doseq [f @(.-cleanup_fn sa)] (f)))

View File

@@ -1,225 +0,0 @@
(ns pocketbook.idb
"IndexedDB wrapper with Transit serialization.
Stores documents as Transit-encoded strings preserving all Clojure types."
(:require [cognitect.transit :as t]
[cljs.core.async :as async :refer [chan put!]]))
;; ---------------------------------------------------------------------------
;; Transit
;; ---------------------------------------------------------------------------
(def ^:private writer (t/writer :json))
(def ^:private reader (t/reader :json))
(defn- encode [v]
(t/write writer v))
(defn- decode [s]
(when s (t/read reader s)))
;; ---------------------------------------------------------------------------
;; IDB operations
;; ---------------------------------------------------------------------------
(defn open
"Open an IndexedDB database. Returns a channel that yields the db."
[db-name]
(let [ch (chan 1)
req (.open js/indexedDB db-name 1)]
(set! (.-onupgradeneeded req)
(fn [e]
(let [db (.-result (.-target e))]
;; Main document store
(when-not (.contains (.-objectStoreNames db) "docs")
(let [store (.createObjectStore db "docs" #js {:keyPath "id"})]
(.createIndex store "synced" "synced" #js {:unique false})
(.createIndex store "updated" "updated" #js {:unique false})))
;; Metadata store (last-sync timestamps, etc.)
(when-not (.contains (.-objectStoreNames db) "meta")
(.createObjectStore db "meta" #js {:keyPath "key"})))))
(set! (.-onsuccess req)
(fn [e]
(put! ch (.-result (.-target e)))
(async/close! ch)))
(set! (.-onerror req)
(fn [e]
(js/console.error "IDB open error:" e)
(async/close! ch)))
ch))
(defn- tx
"Start an IDB transaction. mode is :readonly or :readwrite."
[db store-name mode]
(let [mode-str (case mode :readonly "readonly" :readwrite "readwrite")]
(.transaction db #js [store-name] mode-str)))
(defn put-doc!
"Write a document to IDB. Returns a channel that closes on success.
doc should be: {:id str :value any :version int :updated int :deleted bool :synced bool}"
[db doc]
(let [ch (chan 1)
txn (tx db "docs" :readwrite)
store (.objectStore txn "docs")
;; Serialize the value to Transit, keep metadata as-is
obj #js {:id (:id doc)
:value (encode (:value doc))
:version (:version doc 0)
:updated (:updated doc 0)
:deleted (boolean (:deleted doc false))
:synced (boolean (:synced doc false))}
req (.put store obj)]
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch)))
(set! (.-onerror req) (fn [e] (js/console.error "IDB put error:" e) (async/close! ch)))
ch))
(defn put-docs!
"Write multiple documents in a single transaction. Returns a channel."
[db docs]
(let [ch (chan 1)
txn (tx db "docs" :readwrite)
store (.objectStore txn "docs")]
(doseq [doc docs]
(let [obj #js {:id (:id doc)
:value (encode (:value doc))
:version (:version doc 0)
:updated (:updated doc 0)
:deleted (boolean (:deleted doc false))
:synced (boolean (:synced doc false))}]
(.put store obj)))
(set! (.-oncomplete txn) (fn [_] (put! ch true) (async/close! ch)))
(set! (.-onerror txn) (fn [e] (js/console.error "IDB batch put error:" e) (async/close! ch)))
ch))
(defn get-doc
"Read a single document by id. Returns a channel yielding the doc or nil."
[db id]
(let [ch (chan 1)
txn (tx db "docs" :readonly)
store (.objectStore txn "docs")
req (.get store id)]
(set! (.-onsuccess req)
(fn [e]
(let [result (.-result (.-target e))]
(if result
(do (put! ch {:id (.-id result)
:value (decode (.-value result))
:version (.-version result)
:updated (.-updated result)
:deleted (.-deleted result)
:synced (.-synced result)})
(async/close! ch))
(async/close! ch)))))
(set! (.-onerror req)
(fn [e] (js/console.error "IDB get error:" e) (async/close! ch)))
ch))
(defn get-all-by-prefix
"Get all documents whose id starts with prefix (e.g., 'todo:').
Returns a channel yielding a vector of docs."
[db prefix]
(let [ch (chan 1)
txn (tx db "docs" :readonly)
store (.objectStore txn "docs")
range (.bound js/IDBKeyRange prefix (str prefix "\uffff"))
req (.openCursor store range)
docs (atom [])]
(set! (.-onsuccess req)
(fn [e]
(let [cursor (.-result (.-target e))]
(if cursor
(let [val (.-value cursor)]
(swap! docs conj
{:id (.-id val)
:value (decode (.-value val))
:version (.-version val)
:updated (.-updated val)
:deleted (.-deleted val)
:synced (.-synced val)})
(.continue cursor))
(do
(put! ch @docs)
(async/close! ch))))))
(set! (.-onerror req)
(fn [e] (js/console.error "IDB cursor error:" e) (async/close! ch)))
ch))
(defn get-unsynced
"Get all documents with synced=false. Returns a channel yielding a vector."
[db]
(let [ch (chan 1)
txn (tx db "docs" :readonly)
store (.objectStore txn "docs")
idx (.index store "synced")
req (.openCursor idx (.only js/IDBKeyRange false))
docs (atom [])]
(set! (.-onsuccess req)
(fn [e]
(let [cursor (.-result (.-target e))]
(if cursor
(let [val (.-value cursor)]
(swap! docs conj
{:id (.-id val)
:value (decode (.-value val))
:version (.-version val)
:updated (.-updated val)
:deleted (.-deleted val)
:synced false})
(.continue cursor))
(do
(put! ch @docs)
(async/close! ch))))))
(set! (.-onerror req)
(fn [e] (js/console.error "IDB unsynced error:" e) (async/close! ch)))
ch))
(defn delete-doc!
"Delete a document from IDB by id. Returns a channel."
[db id]
(let [ch (chan 1)
txn (tx db "docs" :readwrite)
store (.objectStore txn "docs")
req (.delete store id)]
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch)))
(set! (.-onerror req) (fn [e] (js/console.error "IDB delete error:" e) (async/close! ch)))
ch))
;; ---------------------------------------------------------------------------
;; Metadata
;; ---------------------------------------------------------------------------
(defn get-meta
"Get a metadata value by key. Returns a channel."
[db key]
(let [ch (chan 1)
txn (tx db "meta" :readonly)
store (.objectStore txn "meta")
req (.get store key)]
(set! (.-onsuccess req)
(fn [e]
(let [result (.-result (.-target e))]
(if result
(do (put! ch (.-value result))
(async/close! ch))
(async/close! ch)))))
(set! (.-onerror req) (fn [_] (async/close! ch)))
ch))
(defn set-meta!
"Set a metadata value. Returns a channel."
[db key value]
(let [ch (chan 1)
txn (tx db "meta" :readwrite)
store (.objectStore txn "meta")
req (.put store #js {:key key :value value})]
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch)))
(set! (.-onerror req) (fn [_] (async/close! ch)))
ch))
;; ---------------------------------------------------------------------------
;; Close
;; ---------------------------------------------------------------------------
(defn close-db!
"Close the IDB connection."
[db]
(when db (.close db)))

23
src/pocketbook/store.cljc Normal file
View File

@@ -0,0 +1,23 @@
(ns pocketbook.store
"Storage protocol for Pocketbook.
All methods return core.async channels.")
(defprotocol PStore
(put-doc! [store doc]
"Write a document to the store. doc is a map:
{:id str, :value any, :version int, :updated int, :deleted bool, :synced bool}
Returns a channel that closes on success.")
(docs-by-prefix [store prefix]
"Get all documents whose id starts with prefix.
Returns a channel yielding a vector of doc maps.")
(get-meta [store key]
"Get a metadata value by key.
Returns a channel yielding the value, or nil if not found.")
(set-meta! [store key value]
"Set a metadata value. Returns a channel that closes on success.")
(close-store! [store]
"Close the store and release resources."))

View File

@@ -0,0 +1,118 @@
(ns pocketbook.store.idb
"IndexedDB store implementing the PStore protocol."
(:require [pocketbook.store :as store]
[pocketbook.transit :as transit]
[cljs.core.async :as async :refer [chan put!]]))
;; ---------------------------------------------------------------------------
;; IDB operations
;; ---------------------------------------------------------------------------
(defn- tx
"Start an IDB transaction. mode is :readonly or :readwrite."
[db store-name mode]
(let [mode-str (case mode :readonly "readonly" :readwrite "readwrite")]
(.transaction db #js [store-name] mode-str)))
;; ---------------------------------------------------------------------------
;; IDBStore
;; ---------------------------------------------------------------------------
(deftype IDBStore [db]
store/PStore
(put-doc! [_ doc]
(let [ch (chan 1)
txn (tx db "docs" :readwrite)
store (.objectStore txn "docs")
obj #js {:id (:id doc)
:value (transit/encode (:value doc))
:version (:version doc 0)
:updated (:updated doc 0)
:deleted (boolean (:deleted doc false))
:synced (boolean (:synced doc false))}
req (.put store obj)]
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch)))
(set! (.-onerror req) (fn [e] (js/console.error "IDB put error:" e) (async/close! ch)))
ch))
(docs-by-prefix [_ prefix]
(let [ch (chan 1)
txn (tx db "docs" :readonly)
store (.objectStore txn "docs")
range (.bound js/IDBKeyRange prefix (str prefix "\uffff"))
req (.openCursor store range)
docs (atom [])]
(set! (.-onsuccess req)
(fn [e]
(let [cursor (.-result (.-target e))]
(if cursor
(let [val (.-value cursor)]
(swap! docs conj
{:id (.-id val)
:value (transit/decode (.-value val))
:version (.-version val)
:updated (.-updated val)
:deleted (.-deleted val)
:synced (.-synced val)})
(.continue cursor))
(do
(put! ch @docs)
(async/close! ch))))))
(set! (.-onerror req)
(fn [e] (js/console.error "IDB cursor error:" e) (async/close! ch)))
ch))
(get-meta [_ key]
(let [ch (chan 1)
txn (tx db "meta" :readonly)
store (.objectStore txn "meta")
req (.get store key)]
(set! (.-onsuccess req)
(fn [e]
(let [result (.-result (.-target e))]
(if result
(do (put! ch (.-value result))
(async/close! ch))
(async/close! ch)))))
(set! (.-onerror req) (fn [_] (async/close! ch)))
ch))
(set-meta! [_ key value]
(let [ch (chan 1)
txn (tx db "meta" :readwrite)
store (.objectStore txn "meta")
req (.put store #js {:key key :value value})]
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch)))
(set! (.-onerror req) (fn [_] (async/close! ch)))
ch))
(close-store! [_]
(when db (.close db))))
;; ---------------------------------------------------------------------------
;; Open
;; ---------------------------------------------------------------------------
(defn open
"Open an IndexedDB store. Returns a channel yielding the IDBStore."
[db-name]
(let [ch (chan 1)
req (.open js/indexedDB db-name 1)]
(set! (.-onupgradeneeded req)
(fn [e]
(let [db (.-result (.-target e))]
(when-not (.contains (.-objectStoreNames db) "docs")
(let [store (.createObjectStore db "docs" #js {:keyPath "id"})]
(.createIndex store "synced" "synced" #js {:unique false})
(.createIndex store "updated" "updated" #js {:unique false})))
(when-not (.contains (.-objectStoreNames db) "meta")
(.createObjectStore db "meta" #js {:keyPath "key"})))))
(set! (.-onsuccess req)
(fn [e]
(put! ch (IDBStore. (.-result (.-target e))))
(async/close! ch)))
(set! (.-onerror req)
(fn [e]
(js/console.error "IDB open error:" e)
(async/close! ch)))
ch))

View File

@@ -0,0 +1,49 @@
(ns pocketbook.store.memory
"In-memory store backed by atoms. Useful for testing and JVM clients."
(:require [pocketbook.store :as store]
#?(:clj [clojure.core.async :as async :refer [chan put!]]
:cljs [cljs.core.async :as async :refer [chan put!]])
[clojure.string :as str]))
(deftype MemoryStore [docs meta-store]
store/PStore
(put-doc! [_ doc]
(let [ch (chan 1)]
(swap! docs assoc (:id doc) doc)
(put! ch true)
(async/close! ch)
ch))
(docs-by-prefix [_ prefix]
(let [ch (chan 1)
matching (->> @docs
vals
(filter #(str/starts-with? (:id %) prefix))
vec)]
(put! ch matching)
(async/close! ch)
ch))
(get-meta [_ key]
(let [ch (chan 1)
v (get @meta-store key)]
(if (some? v)
(put! ch v)
nil)
(async/close! ch)
ch))
(set-meta! [_ key value]
(let [ch (chan 1)]
(swap! meta-store assoc key value)
(put! ch true)
(async/close! ch)
ch))
(close-store! [_]
nil))
(defn create
"Create a new in-memory store."
[]
(MemoryStore. (atom {}) (atom {})))

160
src/pocketbook/sync.cljc Normal file
View File

@@ -0,0 +1,160 @@
(ns pocketbook.sync
"HTTP sync client — pull and push documents to/from the Pocketbook server."
(:require [pocketbook.transit :as transit]
[clojure.string :as str]
#?(:clj [clojure.core.async :as async :refer [chan put!]]
:cljs [cljs.core.async :as async :refer [chan put!]]))
#?(:clj (:import [java.net URI]
[java.net.http HttpClient HttpRequest HttpRequest$BodyPublishers
HttpResponse$BodyHandlers])))
;; ---------------------------------------------------------------------------
;; HTTP helpers
;; ---------------------------------------------------------------------------
#?(:clj
(def ^:private http-client (HttpClient/newHttpClient)))
#?(:clj
(defn- http-request
"Make an HTTP request with Transit encoding (JVM).
Returns {:ok bool :body decoded-value :status int}."
[{:keys [url method body]}]
(try
(let [builder (-> (HttpRequest/newBuilder)
(.uri (URI. url))
(.header "Content-Type" "application/transit+json")
(.header "Accept" "application/transit+json"))
req (case method
:get (.build (.GET builder))
:post (.build (.POST builder
(HttpRequest$BodyPublishers/ofString
(transit/encode body)))))
resp (.send http-client req (HttpResponse$BodyHandlers/ofString))]
(if (<= 200 (.statusCode resp) 299)
{:ok true :body (transit/decode (.body resp))}
{:ok false :status (.statusCode resp) :error (.body resp)}))
(catch Exception e
{:ok false :status 0 :error (str e)}))))
#?(:cljs
(defn- fetch-transit
"Make an HTTP request with Transit encoding (browser).
Returns a channel yielding {:ok bool :body decoded :status int}."
[{:keys [url method body]}]
(let [ch (chan 1)
opts (clj->js
(cond-> {:method (or method "GET")
:headers {"Content-Type" "application/transit+json"
"Accept" "application/transit+json"}}
body (assoc :body (transit/encode body))))]
(-> (js/fetch url opts)
(.then (fn [resp]
(-> (.text resp)
(.then (fn [text]
(if (.-ok resp)
(put! ch {:ok true :body (transit/decode text)})
(put! ch {:ok false
:status (.-status resp)
:error text}))
(async/close! ch))))))
(.catch (fn [err]
(put! ch {:ok false :status 0 :error (str err)})
(async/close! ch))))
ch)))
;; ---------------------------------------------------------------------------
;; Pull
;; ---------------------------------------------------------------------------
(defn pull!
"Pull documents from server updated since `since` for `group`.
Returns a channel yielding {:ok true :docs [...]} or {:ok false :error str}."
[{:keys [server]} group since]
(let [url (str server "?group=" #?(:clj (java.net.URLEncoder/encode group "UTF-8")
:cljs (js/encodeURIComponent group))
"&since=" since)]
#?(:clj
(async/thread
(let [result (http-request {:url url :method :get})]
(if (:ok result)
{:ok true :docs (:body result)}
result)))
:cljs
(let [ch (chan 1)]
(async/go
(let [result (async/<! (fetch-transit {:url url :method "GET"}))]
(if (:ok result)
(put! ch {:ok true :docs (:body result)})
(put! ch result))
(async/close! ch)))
ch))))
;; ---------------------------------------------------------------------------
;; Push
;; ---------------------------------------------------------------------------
(defn push!
"Push a batch of documents to the server.
Returns a channel yielding {:ok true :results [...]} or {:ok false :error str}."
[{:keys [server]} docs]
#?(:clj
(async/thread
(let [result (http-request {:url server :method :post :body docs})]
(if (:ok result)
{:ok true :results (:body result)}
result)))
:cljs
(let [ch (chan 1)]
(async/go
(let [result (async/<! (fetch-transit {:url server :method "POST" :body docs}))]
(if (:ok result)
(put! ch {:ok true :results (:body result)})
(put! ch result))
(async/close! ch)))
ch)))
;; ---------------------------------------------------------------------------
;; SSE — live change notifications (CLJS only)
;; ---------------------------------------------------------------------------
#?(:cljs
(defn listen-events
"Open an SSE connection to /events?group=G. Calls `on-change` when the
server signals new data. Returns a cleanup function."
[{:keys [server]} group on-change]
(let [base-url (-> server
(str/replace #"/sync$" "")
(str/replace #"/$" ""))
url (str base-url "/events?group=" (js/encodeURIComponent group))
es (js/EventSource. url)]
(set! (.-onmessage es)
(fn [e]
(on-change (.-data e))))
(set! (.-onerror es)
(fn [_e] nil))
(fn []
(.close es)))))
;; ---------------------------------------------------------------------------
;; Online detection
;; ---------------------------------------------------------------------------
(defn online? []
#?(:clj true
:cljs (.-onLine js/navigator)))
(defn on-connectivity-change
"Register callbacks for online/offline events. Returns a cleanup fn.
On JVM, this is a no-op (always online)."
[on-online on-offline]
#?(:clj (fn [])
:cljs (let [online-handler (fn [_] (on-online))
offline-handler (fn [_] (on-offline))]
(.addEventListener js/window "online" online-handler)
(.addEventListener js/window "offline" offline-handler)
(fn []
(.removeEventListener js/window "online" online-handler)
(.removeEventListener js/window "offline" offline-handler)))))

View File

@@ -1,137 +0,0 @@
(ns pocketbook.sync
"HTTP sync client — pull and push documents to/from the Pocketbook server."
(:require [cognitect.transit :as t]
[clojure.string :as str]
[cljs.core.async :as async :refer [chan put!]]))
;; ---------------------------------------------------------------------------
;; Transit over HTTP
;; ---------------------------------------------------------------------------
(def ^:private writer (t/writer :json))
(def ^:private reader (t/reader :json))
(defn- encode [v]
(t/write writer v))
(defn- decode [s]
(when (and s (not= s ""))
(t/read reader s)))
;; ---------------------------------------------------------------------------
;; HTTP helpers
;; ---------------------------------------------------------------------------
(defn- fetch-transit
"Make an HTTP request with Transit encoding. Returns a channel
yielding {:ok true :body <decoded>} or {:ok false :status N :error str}."
[{:keys [url method body headers]}]
(let [ch (chan 1)
opts (clj->js
(cond-> {:method (or method "GET")
:headers (merge {"Content-Type" "application/transit+json"
"Accept" "application/transit+json"}
headers)}
body (assoc :body (encode body))))]
(-> (js/fetch url opts)
(.then (fn [resp]
(-> (.text resp)
(.then (fn [text]
(if (.-ok resp)
(put! ch {:ok true :body (decode text)})
(put! ch {:ok false
:status (.-status resp)
:error text}))
(async/close! ch))))))
(.catch (fn [err]
(put! ch {:ok false :status 0 :error (str err)})
(async/close! ch))))
ch))
;; ---------------------------------------------------------------------------
;; Pull
;; ---------------------------------------------------------------------------
(defn pull!
"Pull documents from server updated since `since` for `group`.
Returns a channel yielding {:ok true :docs [...]} or {:ok false :error str}."
[{:keys [server]} group since]
(let [ch (chan 1)
url (str server "?group=" (js/encodeURIComponent group)
"&since=" since)]
(async/go
(let [result (async/<! (fetch-transit
{:url url
:method "GET"}))]
(if (:ok result)
(put! ch {:ok true :docs (:body result)})
(put! ch result))
(async/close! ch)))
ch))
;; ---------------------------------------------------------------------------
;; Push
;; ---------------------------------------------------------------------------
(defn push!
"Push a batch of documents to the server.
Each doc: {:id str :value any :base-version int} or {:id str :deleted true :base-version int}.
Returns a channel yielding {:ok true :results [...]} or {:ok false :error str}."
[{:keys [server]} docs]
(let [ch (chan 1)]
(async/go
(let [result (async/<! (fetch-transit
{:url server
:method "POST"
:body docs}))]
(if (:ok result)
(put! ch {:ok true :results (:body result)})
(put! ch result))
(async/close! ch)))
ch))
;; ---------------------------------------------------------------------------
;; SSE — live change notifications
;; ---------------------------------------------------------------------------
(defn listen-events
"Open an SSE connection to /events?group=G. Calls `on-change` when the
server signals new data. Returns a cleanup function."
[{:keys [server]} group on-change]
(let [base-url (-> server
(str/replace #"/sync$" "")
(str/replace #"/$" ""))
url (str base-url "/events?group=" (js/encodeURIComponent group))
es (js/EventSource. url)]
(set! (.-onmessage es)
(fn [e]
(let [data (.-data e)]
;; "connected" fires on initial connect AND every reconnect
;; Always trigger a sync — picks up missed changes after reconnect
(on-change data))))
(set! (.-onerror es)
(fn [_e]
;; EventSource auto-reconnects; nothing to do
nil))
;; Return cleanup
(fn []
(.close es))))
;; ---------------------------------------------------------------------------
;; Online detection
;; ---------------------------------------------------------------------------
(defn online? []
(.-onLine js/navigator))
(defn on-connectivity-change
"Register callbacks for online/offline events. Returns a cleanup fn."
[on-online on-offline]
(let [online-handler (fn [_] (on-online))
offline-handler (fn [_] (on-offline))]
(.addEventListener js/window "online" online-handler)
(.addEventListener js/window "offline" offline-handler)
;; Return cleanup function
(fn []
(.removeEventListener js/window "online" online-handler)
(.removeEventListener js/window "offline" offline-handler))))

View File

@@ -1,32 +0,0 @@
(ns pocketbook.transit
"Transit encoding/decoding helpers for the HTTP wire format."
(:require [cognitect.transit :as t])
(:import [java.io ByteArrayInputStream ByteArrayOutputStream]))
(defn encode
"Encode a Clojure value to a Transit+JSON byte array."
[v]
(let [out (ByteArrayOutputStream. 4096)
w (t/writer out :json)]
(t/write w v)
(.toByteArray out)))
(defn encode-str
"Encode a Clojure value to a Transit+JSON string."
[v]
(let [out (ByteArrayOutputStream. 4096)
w (t/writer out :json)]
(t/write w v)
(.toString out "UTF-8")))
(defn decode
"Decode a Transit+JSON byte array or input stream to a Clojure value."
[input]
(let [in (cond
(instance? ByteArrayInputStream input) input
(instance? java.io.InputStream input) input
(bytes? input) (ByteArrayInputStream. input)
(string? input) (ByteArrayInputStream. (.getBytes ^String input "UTF-8"))
:else (throw (ex-info "Cannot decode, unsupported input type"
{:type (type input)})))]
(t/read (t/reader in :json))))

View File

@@ -0,0 +1,31 @@
(ns pocketbook.transit
"Transit encoding/decoding helpers for the HTTP wire format."
(:require [cognitect.transit :as t])
#?(:clj (:import [java.io ByteArrayInputStream ByteArrayOutputStream])))
(defn encode
"Encode a Clojure value to a Transit+JSON string."
[v]
#?(:clj (let [out (ByteArrayOutputStream. 4096)
w (t/writer out :json)]
(t/write w v)
(.toString out "UTF-8"))
:cljs (t/write (t/writer :json) v)))
(defn decode
"Decode Transit+JSON to a Clojure value.
Accepts a string on both platforms; on CLJ also accepts byte arrays and InputStreams."
[input]
#?(:clj (cond
(nil? input) nil
(string? input)
(when (not= input "")
(t/read (t/reader (ByteArrayInputStream. (.getBytes ^String input "UTF-8")) :json)))
(instance? java.io.InputStream input)
(t/read (t/reader input :json))
(bytes? input)
(t/read (t/reader (ByteArrayInputStream. input) :json))
:else (throw (ex-info "Cannot decode, unsupported input type"
{:type (type input)})))
:cljs (when (and input (not= input ""))
(t/read (t/reader :json) input))))

View File

@@ -0,0 +1,185 @@
(ns pocketbook.core-test
(:require [clojure.test :refer [deftest is testing use-fixtures]]
[clojure.core.async :as async :refer [<!! go <! timeout]]
[pocketbook.core :as pb]
[pocketbook.store :as store]
[pocketbook.store.memory :as memory]
[pocketbook.server :as server])
(:import [java.io File]))
;; ---------------------------------------------------------------------------
;; Fixtures — start a real server for sync tests
;; ---------------------------------------------------------------------------
(def ^:dynamic *port* nil)
(def ^:dynamic *server* nil)
(defn- free-port []
(with-open [s (java.net.ServerSocket. 0)]
(.getLocalPort s)))
(use-fixtures :each
(fn [f]
(let [port (free-port)
db-path (str (File/createTempFile "pocketbook-core-test" ".db"))
srv (server/start! {:port port :db-path db-path})]
(Thread/sleep 200)
(try
(binding [*server* srv *port* port]
(f))
(finally
(server/stop! srv)
(.delete (File. db-path)))))))
(defn- server-url []
(str "http://localhost:" *port* "/sync"))
;; ---------------------------------------------------------------------------
;; Helpers
;; ---------------------------------------------------------------------------
(defn- <!!timeout
"Take from channel with timeout. Returns nil on timeout."
[ch ms]
(let [[v _] (<!! (go (async/alts! [ch (timeout ms)])))]
v))
(defn- wait-synced
"Wait until the synced atom has no pending changes."
[sa ms]
(let [deadline (+ (System/currentTimeMillis) ms)]
(loop []
(when (and (pos? (pb/pending-count sa))
(< (System/currentTimeMillis) deadline))
(Thread/sleep 50)
(recur)))))
;; ---------------------------------------------------------------------------
;; Tests
;; ---------------------------------------------------------------------------
(deftest synced-atom-local-only
(testing "SyncedAtom works without a server (local store only)"
(let [store (memory/create)
sa (pb/synced-atom store "todo")]
(<!!timeout (pb/ready? sa) 1000)
(is (= {} @sa))
(swap! sa assoc "todo:1" {:text "Buy milk"})
(is (= {:text "Buy milk"} (get @sa "todo:1")))
(swap! sa assoc "todo:2" {:text "Walk dog"})
(is (= 2 (count @sa)))
(swap! sa dissoc "todo:1")
(is (= 1 (count @sa)))
(is (nil? (get @sa "todo:1")))
(pb/destroy! sa))))
(deftest synced-atom-persists-to-store
(testing "Changes are persisted to the store"
(let [store (memory/create)
sa (pb/synced-atom store "todo")]
(<!!timeout (pb/ready? sa) 1000)
(swap! sa assoc "todo:1" {:text "Buy milk"})
(Thread/sleep 50) ;; let async store write complete
;; Read from store directly
(let [docs (<!!timeout (store/docs-by-prefix store "todo:") 1000)]
(is (= 1 (count docs)))
(is (= "todo:1" (:id (first docs))))
(is (= {:text "Buy milk"} (:value (first docs)))))
(pb/destroy! sa))))
(deftest synced-atom-loads-from-store
(testing "SyncedAtom loads existing data from store on creation"
(let [store (memory/create)]
;; Pre-populate the store
(<!!timeout (store/put-doc! store
{:id "todo:1" :value {:text "Existing"}
:version 1 :updated 1000 :deleted false :synced true})
1000)
(let [sa (pb/synced-atom store "todo")]
(<!!timeout (pb/ready? sa) 1000)
(is (= {:text "Existing"} (get @sa "todo:1")))
(pb/destroy! sa)))))
(deftest synced-atom-watches
(testing "add-watch fires on changes"
(let [store (memory/create)
sa (pb/synced-atom store "todo")
changes (atom [])]
(<!!timeout (pb/ready? sa) 1000)
(add-watch sa :test (fn [_ _ old new]
(swap! changes conj {:old old :new new})))
(swap! sa assoc "todo:1" {:text "Hello"})
(Thread/sleep 50)
(is (= 1 (count @changes)))
(is (= {} (:old (first @changes))))
(is (= {"todo:1" {:text "Hello"}} (:new (first @changes))))
(remove-watch sa :test)
(pb/destroy! sa))))
(deftest synced-atom-push-to-server
(testing "Local changes are pushed to the server"
(let [store (memory/create)
sa (pb/synced-atom store "todo"
{:server (server-url)})]
(<!!timeout (pb/ready? sa) 2000)
(swap! sa assoc "todo:push1" {:text "Pushed!"})
(Thread/sleep 500) ;; let push complete
(is (zero? (pb/pending-count sa)))
(pb/destroy! sa))))
(deftest synced-atom-pull-from-server
(testing "Two clients sync via server"
(let [store-a (memory/create)
store-b (memory/create)
sa-a (pb/synced-atom store-a "todo"
{:server (server-url) :interval 500})
sa-b (pb/synced-atom store-b "todo"
{:server (server-url) :interval 500})]
(<!!timeout (pb/ready? sa-a) 2000)
(<!!timeout (pb/ready? sa-b) 2000)
;; Client A writes
(swap! sa-a assoc "todo:sync1" {:text "From A"})
(Thread/sleep 500) ;; let A push
;; Trigger a sync on B
(<!!timeout (pb/sync-now! sa-b) 2000)
;; B should have A's data
(is (= {:text "From A"} (get @sa-b "todo:sync1")))
(pb/destroy! sa-a)
(pb/destroy! sa-b))))
(deftest synced-atom-deref-swap-reset
(testing "Standard atom operations work"
(let [store (memory/create)
sa (pb/synced-atom store "note")]
(<!!timeout (pb/ready? sa) 1000)
;; reset!
(reset! sa {"note:1" {:body "Hello"}})
(is (= {"note:1" {:body "Hello"}} @sa))
;; swap! with multiple args
(swap! sa assoc "note:2" {:body "World"})
(is (= 2 (count @sa)))
(swap! sa update "note:1" assoc :edited true)
(is (true? (:edited (get @sa "note:1"))))
(pb/destroy! sa))))

View File

@@ -41,19 +41,19 @@
(.header "Accept" "application/transit+json")
(.GET)
(.build))
resp (.send client req (HttpResponse$BodyHandlers/ofByteArray))]
resp (.send client req (HttpResponse$BodyHandlers/ofString))]
{:status (.statusCode resp)
:body (t/decode (.body resp))}))
(defn- post-transit [path body]
(let [bytes (t/encode body)
(let [encoded (t/encode body)
req (-> (HttpRequest/newBuilder)
(.uri (URI. (url path)))
(.header "Content-Type" "application/transit+json")
(.header "Accept" "application/transit+json")
(.POST (HttpRequest$BodyPublishers/ofByteArray bytes))
(.POST (HttpRequest$BodyPublishers/ofString encoded))
(.build))
resp (.send client req (HttpResponse$BodyHandlers/ofByteArray))]
resp (.send client req (HttpResponse$BodyHandlers/ofString))]
{:status (.statusCode resp)
:body (t/decode (.body resp))}))

View File

@@ -30,8 +30,8 @@
:version 5}]]
(is (= data (t/decode (t/encode data))))))
(deftest encode-str-roundtrip
(deftest encode-returns-string
(let [v {:hello "world" :nums [1 2 3]}
s (t/encode-str v)]
s (t/encode v)]
(is (string? s))
(is (= v (t/decode s)))))