refactor: replace core.async with promesa for all async operations

- Store protocol now returns promesa promises instead of core.async channels
- MemoryStore: `(p/resolved val)` replaces chan+put!+close! ceremony
- IDBStore: `p/create` with resolve/reject wraps IDB callbacks
- sync.cljc: CLJ uses `p/vthread`, CLJS returns native Promise chains
- core.cljc: `p/let` replaces go blocks, timer-based sync loop replaces
  go-loop+alts!, debounced push replaces kick channel
- Tests use `deref` with timeout on promesa promises
- Todomvc example uses `p/let` instead of go/<!
This commit is contained in:
Florian Schroedl
2026-04-16 20:05:08 +02:00
parent fffd934262
commit 973b079ae3
9 changed files with 360 additions and 321 deletions

View File

@@ -1,6 +1,6 @@
{:paths ["src"] {:paths ["src"]
:deps {org.clojure/clojure {:mvn/version "1.12.0"} :deps {org.clojure/clojure {:mvn/version "1.12.0"}
org.clojure/core.async {:mvn/version "1.7.701"} funcool/promesa {:mvn/version "11.0.678"}
http-kit/http-kit {:mvn/version "2.8.0"} http-kit/http-kit {:mvn/version "2.8.0"}
com.cognitect/transit-clj {:mvn/version "1.0.333"} com.cognitect/transit-clj {:mvn/version "1.0.333"}
com.taoensso/nippy {:mvn/version "3.4.2"} com.taoensso/nippy {:mvn/version "3.4.2"}
@@ -18,12 +18,10 @@
;; ClojureScript client build ;; ClojureScript client build
:cljs {:extra-paths ["example/todomvc"] :cljs {:extra-paths ["example/todomvc"]
:extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"} :extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"}
com.cognitect/transit-cljs {:mvn/version "0.8.280"} com.cognitect/transit-cljs {:mvn/version "0.8.280"}}
org.clojure/core.async {:mvn/version "1.7.701"}}
:main-opts ["-m" "cljs.main" "-co" "build.edn" "-c"]} :main-opts ["-m" "cljs.main" "-co" "build.edn" "-c"]}
:cljs-dev {:extra-paths ["example/todomvc"] :cljs-dev {:extra-paths ["example/todomvc"]
:extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"} :extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"}
com.cognitect/transit-cljs {:mvn/version "0.8.280"} com.cognitect/transit-cljs {:mvn/version "0.8.280"}}
org.clojure/core.async {:mvn/version "1.7.701"}}
:main-opts ["-m" "cljs.main" "-co" "build.edn" "-w" "src:example/todomvc/pocketbook" "-c"]}}} :main-opts ["-m" "cljs.main" "-co" "build.edn" "-w" "src:example/todomvc/pocketbook" "-c"]}}}

View File

@@ -3,7 +3,7 @@
(:require [pocketbook.core :as pb] (:require [pocketbook.core :as pb]
[pocketbook.store.idb :as idb] [pocketbook.store.idb :as idb]
[pocketbook.hiccup :refer [html]] [pocketbook.hiccup :refer [html]]
[cljs.core.async :refer [go <!]] [promesa.core :as p]
[clojure.string :as str])) [clojure.string :as str]))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
@@ -232,9 +232,8 @@
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
(defn ^:export init [] (defn ^:export init []
(go (p/let [store (idb/open "pocketbook-todomvc")]
(let [store (<! (idb/open "pocketbook-todomvc")) (let [todos (pb/synced-atom store "todo"
todos (pb/synced-atom store "todo"
{:server "http://localhost:8090/sync" {:server "http://localhost:8090/sync"
:interval 15000})] :interval 15000})]
(reset! !conn store) (reset! !conn store)
@@ -248,7 +247,7 @@
(.addEventListener js/window "online" (fn [_] (render!))) (.addEventListener js/window "online" (fn [_] (render!)))
(.addEventListener js/window "offline" (fn [_] (render!))) (.addEventListener js/window "offline" (fn [_] (render!)))
;; Wait for IDB — watch triggers render automatically ;; Wait for IDB — watch triggers render automatically
(<! (pb/ready? todos)) (p/let [_ (pb/ready? todos)]
(js/console.log "🔶 Pocketbook TodoMVC loaded —" (count @todos) "todos")))) (js/console.log "🔶 Pocketbook TodoMVC loaded —" (count @todos) "todos")))))
(init) (init)

View File

@@ -2,18 +2,18 @@
"Pocketbook: a Clojure-native synced atom. "Pocketbook: a Clojure-native synced atom.
Usage: Usage:
(def store (<! (idb/open \"my-app\"))) ;; or (memory/create) (def store @(idb/open \"my-app\")) ;; or (memory/create)
(def todos (pb/synced-atom store \"todo\" (def todos (pb/synced-atom store \"todo\"
{:server \"http://localhost:8090/sync\"})) {:server \"http://localhost:8090/sync\"}))
(go (<! (ready? todos)) (.then (ready? todos)
(swap! todos assoc \"todo:1\" {:text \"Buy milk\"})) (fn [_] (swap! todos assoc \"todo:1\" {:text \"Buy milk\"})))
@todos ;=> {\"todo:1\" {:text \"Buy milk\"}} @todos ;=> {\"todo:1\" {:text \"Buy milk\"}}
" "
(:require [pocketbook.store :as store] (:require [pocketbook.store :as store]
[pocketbook.sync :as sync] [pocketbook.sync :as sync]
[clojure.string :as str] [clojure.string :as str]
#?(:clj [clojure.core.async :as async :refer [go go-loop <! >! chan put! timeout alts!]] [promesa.core :as p])
:cljs [cljs.core.async :as async :refer [go go-loop <! >! chan put! timeout alts!]]))) #?(:clj (:import [java.util.concurrent Executors ScheduledExecutorService TimeUnit])))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; Internal helpers ;; Internal helpers
@@ -26,13 +26,19 @@
(defn- prefix-str [group] (defn- prefix-str [group]
(str group ":")) (str group ":"))
;; ---------------------------------------------------------------------------
;; Forward declarations
;; ---------------------------------------------------------------------------
(declare schedule-push!)
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; Shared reset logic ;; Shared reset logic
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
(defn- do-reset!* (defn- do-reset!*
"Shared reset implementation. Updates cache, tracks pending, writes to store, kicks sync." "Shared reset implementation. Updates cache, tracks pending, writes to store, kicks sync."
[store cache versions pending kick-ch new-val] [store cache versions pending sa new-val]
(let [old @cache] (let [old @cache]
(reset! cache new-val) (reset! cache new-val)
(let [all-keys (into (set (keys old)) (keys new-val)) (let [all-keys (into (set (keys old)) (keys new-val))
@@ -50,7 +56,7 @@
{:id k :value v :version (get @versions k 0) {:id k :value v :version (get @versions k 0)
:updated (now-ms) :deleted false :synced false}))))) :updated (now-ms) :deleted false :synced false})))))
(when @changed? (when @changed?
(put! kick-ch :kick))) (schedule-push! sa)))
new-val)) new-val))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
@@ -64,9 +70,11 @@
pending ;; atom containing #{id} — unsynced ids pending ;; atom containing #{id} — unsynced ids
server-opts ;; {:server url} or nil server-opts ;; {:server url} or nil
last-sync ;; atom containing epoch ms last-sync ;; atom containing epoch ms
ready-ch ;; channel, closed when initial load complete ready-pr ;; promesa deferred, resolved when initial load complete
stop-ch ;; channel to signal stop sync-timer ;; atom holding timer reference (for cleanup)
kick-ch ;; channel to trigger immediate push push-timer ;; atom holding debounce timer reference
pushing? ;; atom boolean — guard against overlapping pushes
syncing? ;; atom boolean — guard against overlapping syncs
cleanup-fn ;; atom holding connectivity cleanup fn cleanup-fn ;; atom holding connectivity cleanup fn
sync-interval ;; ms sync-interval ;; ms
_meta] ;; metadata atom _meta] ;; metadata atom
@@ -83,8 +91,8 @@
(setValidator [_ _vf] nil) (setValidator [_ _vf] nil)
clojure.lang.IAtom clojure.lang.IAtom
(reset [_ newval] (reset [this newval]
(do-reset!* store cache versions pending kick-ch newval)) (do-reset!* store cache versions pending this newval))
(swap [this f] (swap [this f]
(.reset this (f @cache))) (.reset this (f @cache)))
(swap [this f arg] (swap [this f arg]
@@ -105,8 +113,8 @@
(-deref [_] @cache) (-deref [_] @cache)
IReset IReset
(-reset! [_ new-val] (-reset! [this new-val]
(do-reset!* store cache versions pending kick-ch new-val)) (do-reset!* store cache versions pending this new-val))
ISwap ISwap
(-swap! [o f] (-swap! [o f]
@@ -142,31 +150,28 @@
(defn- load-from-store! (defn- load-from-store!
"Load all docs for the group from the store into the atom. "Load all docs for the group from the store into the atom.
Returns a channel that closes when done." Returns a promise that resolves when done."
[sa] [sa]
(let [ch (chan 1)] (p/let [prefix (prefix-str (.-group sa))
(go docs (store/docs-by-prefix (.-store sa) prefix)
(let [prefix (prefix-str (.-group sa)) state (into {}
docs (<! (store/docs-by-prefix (.-store sa) prefix)) (comp
state (into {} (remove :deleted)
(comp (map (fn [d] [(:id d) (:value d)])))
(remove :deleted) docs)
(map (fn [d] [(:id d) (:value d)]))) vers (into {}
docs) (map (fn [d] [(:id d) (:version d)]))
vers (into {} docs)
(map (fn [d] [(:id d) (:version d)])) _ (do (reset! (.-cache sa) state)
docs)] (reset! (.-versions sa) vers))
(reset! (.-cache sa) state) ls (store/get-meta (.-store sa)
(reset! (.-versions sa) vers) (str "last-sync:" (.-group sa)))]
(let [ls (<! (store/get-meta (.-store sa) (reset! (.-last_sync sa) (or ls 0))
(str "last-sync:" (.-group sa))))] true))
(reset! (.-last_sync sa) (or ls 0)))
(put! ch true)
(async/close! ch)))
ch))
(defn- mark-synced! (defn- mark-synced!
"Mark a doc as synced in the store and update its version." "Mark a doc as synced in the store and update its version.
Returns a promise."
[sa id version] [sa id version]
(swap! (.-versions sa) assoc id version) (swap! (.-versions sa) assoc id version)
(swap! (.-pending sa) disj id) (swap! (.-pending sa) disj id)
@@ -184,117 +189,164 @@
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
(defn- do-pull! (defn- do-pull!
"Pull changes from server, merge into atom + store." "Pull changes from server, merge into atom + store. Returns a promise."
[sa] [sa]
(go (if-let [opts (.-server_opts sa)]
(when-let [opts (.-server_opts sa)] (p/let [since @(.-last_sync sa)
(let [since @(.-last_sync sa) result (sync/pull! opts (.-group sa) since)]
result (<! (sync/pull! opts (.-group sa) since))] (when (:ok result)
(when (:ok result) (let [docs (:docs result)
(let [docs (:docs result) max-ts (reduce max @(.-last_sync sa)
max-ts (reduce max @(.-last_sync sa) (map :updated docs))]
(map :updated docs))] (p/let [_ (p/all
(doseq [doc docs] (for [doc docs
(let [id (:id doc)] :let [id (:id doc)]
(when (> (:version doc) (get @(.-versions sa) id 0)) :when (> (:version doc) (get @(.-versions sa) id 0))]
(if (:deleted doc) (if (:deleted doc)
(do (do
(swap! (.-cache sa) dissoc id) (swap! (.-cache sa) dissoc id)
(swap! (.-versions sa) assoc id (:version doc)) (swap! (.-versions sa) assoc id (:version doc))
(store/put-doc! (.-store sa) (store/put-doc! (.-store sa)
{:id id :value nil :version (:version doc) {:id id :value nil :version (:version doc)
:updated (:updated doc) :deleted true :synced true})) :updated (:updated doc) :deleted true :synced true}))
(do (do
(swap! (.-cache sa) assoc id (:value doc)) (swap! (.-cache sa) assoc id (:value doc))
(swap! (.-versions sa) assoc id (:version doc)) (swap! (.-versions sa) assoc id (:version doc))
(store/put-doc! (.-store sa) (store/put-doc! (.-store sa)
{:id id :value (:value doc) :version (:version doc) {:id id :value (:value doc) :version (:version doc)
:updated (:updated doc) :deleted false :synced true})))))) :updated (:updated doc) :deleted false :synced true})))))]
(reset! (.-last_sync sa) max-ts) (reset! (.-last_sync sa) max-ts)
(store/set-meta! (.-store sa) (store/set-meta! (.-store sa)
(str "last-sync:" (.-group sa)) max-ts)) (str "last-sync:" (.-group sa)) max-ts)))))
true))))) (p/resolved nil)))
(defn- do-push! (defn- do-push!
"Push all unsynced local docs to the server." "Push all unsynced local docs to the server. Returns a promise."
[sa] [sa]
(go (if-let [opts (.-server_opts sa)]
(when-let [opts (.-server_opts sa)] (let [pending-ids @(.-pending sa)]
(let [pending-ids @(.-pending sa)] (if (seq pending-ids)
(when (seq pending-ids) (let [docs (mapv (fn [id]
(let [docs (mapv (fn [id] (let [v (get @(.-cache sa) id)]
(let [v (get @(.-cache sa) id)] (if (nil? v)
(if (nil? v) {:id id :deleted true
{:id id :deleted true :base-version (get @(.-versions sa) id 0)}
:base-version (get @(.-versions sa) id 0)} {:id id :value v
{:id id :value v :base-version (get @(.-versions sa) id 0)})))
:base-version (get @(.-versions sa) id 0)}))) pending-ids)]
pending-ids) (p/let [result (sync/push! opts docs)]
result (<! (sync/push! opts docs))]
(when (:ok result) (when (:ok result)
(doseq [r (:results result)] (p/all
(case (:status r) (for [r (:results result)]
:ok (case (:status r)
(<! (mark-synced! sa (:id r) (:version r))) :ok
(mark-synced! sa (:id r) (:version r))
:conflict :conflict
(do (do
(when (:value r) (when (:value r)
(swap! (.-cache sa) assoc (:id r) (:value r))) (swap! (.-cache sa) assoc (:id r) (:value r)))
(swap! (.-versions sa) assoc (:id r) (:current-version r)) (swap! (.-versions sa) assoc (:id r) (:current-version r))
(swap! (.-pending sa) disj (:id r)) (swap! (.-pending sa) disj (:id r))
(store/put-doc! (.-store sa) (store/put-doc! (.-store sa)
{:id (:id r) {:id (:id r)
:value (or (:value r) (get @(.-cache sa) (:id r))) :value (or (:value r) (get @(.-cache sa) (:id r)))
:version (:current-version r) :version (:current-version r)
:updated (now-ms) :updated (now-ms)
:deleted false :deleted false
:synced true})) :synced true}))
;; Unknown status ;; Unknown status
#?(:clj (println "Unknown push result:" (pr-str r)) (do #?(:clj (println "Unknown push result:" (pr-str r))
:cljs (js/console.warn "Unknown push result:" (pr-str r))))) :cljs (js/console.warn "Unknown push result:" (pr-str r)))
true))))))) (p/resolved nil))))))))
(p/resolved nil)))
(p/resolved nil)))
(defn- do-sync! (defn- do-sync!
"Run a full sync cycle: pull then push." "Run a full sync cycle: pull then push. Returns a promise."
[sa] [sa]
(go (if (sync/online?)
(when (sync/online?) (p/let [_ (do-pull! sa)]
(<! (do-pull! sa)) (do-push! sa))
(<! (do-push! sa))))) (p/resolved nil)))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; Sync loop ;; Push scheduling (replaces kick channel)
;; ---------------------------------------------------------------------------
(defn- schedule-push!
"Schedule a push after a short debounce. Coalesces rapid changes.
If a push is in flight, another will follow after it completes."
[sa]
(when-let [t @(.-push_timer sa)]
#?(:clj (future-cancel t)
:cljs (js/clearTimeout t)))
(reset! (.-push_timer sa)
#?(:clj (future
(Thread/sleep 50)
(when-not @(.-pushing? sa)
(reset! (.-pushing? sa) true)
(-> (do-push! sa)
(p/finally
(fn [_ _]
(reset! (.-pushing? sa) false)
(when (seq @(.-pending sa))
(schedule-push! sa)))))))
:cljs (js/setTimeout
(fn []
(when-not @(.-pushing? sa)
(reset! (.-pushing? sa) true)
(-> (do-push! sa)
(p/finally
(fn [_ _]
(reset! (.-pushing? sa) false)
(when (seq @(.-pending sa))
(schedule-push! sa)))))))
50))))
;; ---------------------------------------------------------------------------
;; Sync loop (timer-based)
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
(defn- start-sync-loop! (defn- start-sync-loop!
"Start the background sync loop." "Start the background sync loop using platform timers."
[sa] [sa]
(let [stop-ch (.-stop_ch sa) (let [interval (.-sync_interval sa)
kick-ch (.-kick_ch sa) cleanups (atom [])
interval (.-sync_interval sa) sync-fn (fn []
cleanups (atom [])] (when-not @(.-syncing? sa)
;; Periodic sync (fallback) + immediate push on kick (reset! (.-syncing? sa) true)
(go-loop [] (-> (do-sync! sa)
(let [[_ ch] (alts! [stop-ch kick-ch (timeout interval)])] (p/finally
(when-not (= ch stop-ch) (fn [_ _]
(if (= ch kick-ch) (reset! (.-syncing? sa) false))))))]
(<! (do-push! sa)) ;; Periodic sync
(<! (do-sync! sa))) #?(:clj
(recur)))) (let [^ScheduledExecutorService exec (Executors/newSingleThreadScheduledExecutor)]
(.scheduleAtFixedRate exec ^Runnable sync-fn
(long interval) (long interval) TimeUnit/MILLISECONDS)
(reset! (.-sync_timer sa) exec)
(swap! cleanups conj (fn [] (.shutdown exec))))
:cljs
(let [timer-id (js/setInterval sync-fn interval)]
(reset! (.-sync_timer sa) timer-id)
(swap! cleanups conj (fn [] (js/clearInterval timer-id)))))
;; Online/offline handler ;; Online/offline handler
(swap! cleanups conj (swap! cleanups conj
(sync/on-connectivity-change (sync/on-connectivity-change
(fn [] (go (<! (do-sync! sa)))) (fn [] (do-sync! sa))
(fn [] nil))) (fn [] nil)))
;; SSE — live pull on server push (CLJS only) ;; SSE — live pull on server push (CLJS only)
#?(:cljs #?(:cljs
(when-let [opts (.-server_opts sa)] (when-let [opts (.-server_opts sa)]
(swap! cleanups conj (swap! cleanups conj
(sync/listen-events opts (.-group sa) (sync/listen-events opts (.-group sa)
(fn [_group] (fn [_group]
(go (<! (do-pull! sa)))))))) (do-pull! sa))))))
(reset! (.-cleanup_fn sa) @cleanups))) (reset! (.-cleanup_fn sa) @cleanups)))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
@@ -315,30 +367,33 @@
pending (atom #{}) pending (atom #{})
server-opts (when server {:server server}) server-opts (when server {:server server})
last-sync (atom 0) last-sync (atom 0)
ready-ch (chan 1) ready-pr (p/deferred)
stop-ch (chan 1) sync-timer (atom nil)
kick-ch (chan (async/sliding-buffer 1)) push-timer (atom nil)
pushing? (atom false)
syncing? (atom false)
cleanup-fn (atom nil) cleanup-fn (atom nil)
meta-atom (atom nil) meta-atom (atom nil)
sa (SyncedAtom. group store cache-atom versions pending sa (SyncedAtom. group store cache-atom versions pending
server-opts last-sync ready-ch stop-ch kick-ch server-opts last-sync ready-pr sync-timer push-timer
cleanup-fn interval meta-atom)] pushing? syncing? cleanup-fn interval meta-atom)]
(go (-> (load-from-store! sa)
(<! (load-from-store! sa)) (p/then (fn [_]
(put! ready-ch true) (p/resolve! ready-pr true)
(async/close! ready-ch) (when server-opts
(when server-opts (-> (do-sync! sa)
(<! (do-sync! sa)) (p/then (fn [_] (start-sync-loop! sa)))))))
(start-sync-loop! sa))) (p/catch (fn [err]
(p/reject! ready-pr err))))
sa)) sa))
(defn ready? (defn ready?
"Returns a channel that yields true when the atom has finished loading from the store." "Returns a promise that yields true when the atom has finished loading from the store."
[sa] [sa]
(.-ready_ch sa)) (.-ready_pr sa))
(defn sync-now! (defn sync-now!
"Trigger an immediate sync cycle. Returns a channel." "Trigger an immediate sync cycle. Returns a promise."
[sa] [sa]
(do-sync! sa)) (do-sync! sa))
@@ -350,5 +405,9 @@
(defn destroy! (defn destroy!
"Stop the sync loop and clean up. Does not close the store." "Stop the sync loop and clean up. Does not close the store."
[sa] [sa]
(put! (.-stop_ch sa) :stop) ;; Cancel push debounce timer
(when-let [t @(.-push_timer sa)]
#?(:clj (future-cancel t)
:cljs (js/clearTimeout t)))
;; Run all cleanup fns (timer, connectivity, SSE)
(doseq [f @(.-cleanup_fn sa)] (f))) (doseq [f @(.-cleanup_fn sa)] (f)))

View File

@@ -1,23 +1,23 @@
(ns pocketbook.store (ns pocketbook.store
"Storage protocol for Pocketbook. "Storage protocol for Pocketbook.
All methods return core.async channels.") All methods return promesa promises.")
(defprotocol PStore (defprotocol PStore
(put-doc! [store doc] (put-doc! [store doc]
"Write a document to the store. doc is a map: "Write a document to the store. doc is a map:
{:id str, :value any, :version int, :updated int, :deleted bool, :synced bool} {:id str, :value any, :version int, :updated int, :deleted bool, :synced bool}
Returns a channel that closes on success.") Returns a promise that resolves on success.")
(docs-by-prefix [store prefix] (docs-by-prefix [store prefix]
"Get all documents whose id starts with prefix. "Get all documents whose id starts with prefix.
Returns a channel yielding a vector of doc maps.") Returns a promise yielding a vector of doc maps.")
(get-meta [store key] (get-meta [store key]
"Get a metadata value by key. "Get a metadata value by key.
Returns a channel yielding the value, or nil if not found.") Returns a promise yielding the value, or nil if not found.")
(set-meta! [store key value] (set-meta! [store key value]
"Set a metadata value. Returns a channel that closes on success.") "Set a metadata value. Returns a promise that resolves on success.")
(close-store! [store] (close-store! [store]
"Close the store and release resources.")) "Close the store and release resources."))

View File

@@ -2,7 +2,7 @@
"IndexedDB store implementing the PStore protocol." "IndexedDB store implementing the PStore protocol."
(:require [pocketbook.store :as store] (:require [pocketbook.store :as store]
[pocketbook.transit :as transit] [pocketbook.transit :as transit]
[cljs.core.async :as async :refer [chan put!]])) [promesa.core :as p]))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; IDB operations ;; IDB operations
@@ -21,70 +21,71 @@
(deftype IDBStore [db] (deftype IDBStore [db]
store/PStore store/PStore
(put-doc! [_ doc] (put-doc! [_ doc]
(let [ch (chan 1) (p/create
txn (tx db "docs" :readwrite) (fn [resolve reject]
store (.objectStore txn "docs") (let [txn (tx db "docs" :readwrite)
obj #js {:id (:id doc) store (.objectStore txn "docs")
:value (transit/encode (:value doc)) obj #js {:id (:id doc)
:version (:version doc 0) :value (transit/encode (:value doc))
:updated (:updated doc 0) :version (:version doc 0)
:deleted (boolean (:deleted doc false)) :updated (:updated doc 0)
:synced (boolean (:synced doc false))} :deleted (boolean (:deleted doc false))
req (.put store obj)] :synced (boolean (:synced doc false))}
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch))) req (.put store obj)]
(set! (.-onerror req) (fn [e] (js/console.error "IDB put error:" e) (async/close! ch))) (set! (.-onsuccess req) (fn [_] (resolve true)))
ch)) (set! (.-onerror req) (fn [e]
(js/console.error "IDB put error:" e)
(reject e)))))))
(docs-by-prefix [_ prefix] (docs-by-prefix [_ prefix]
(let [ch (chan 1) (p/create
txn (tx db "docs" :readonly) (fn [resolve reject]
store (.objectStore txn "docs") (let [txn (tx db "docs" :readonly)
range (.bound js/IDBKeyRange prefix (str prefix "\uffff")) store (.objectStore txn "docs")
req (.openCursor store range) range (.bound js/IDBKeyRange prefix (str prefix "\uffff"))
docs (atom [])] req (.openCursor store range)
(set! (.-onsuccess req) docs (atom [])]
(fn [e] (set! (.-onsuccess req)
(let [cursor (.-result (.-target e))] (fn [e]
(if cursor (let [cursor (.-result (.-target e))]
(let [val (.-value cursor)] (if cursor
(swap! docs conj (let [val (.-value cursor)]
{:id (.-id val) (swap! docs conj
:value (transit/decode (.-value val)) {:id (.-id val)
:version (.-version val) :value (transit/decode (.-value val))
:updated (.-updated val) :version (.-version val)
:deleted (.-deleted val) :updated (.-updated val)
:synced (.-synced val)}) :deleted (.-deleted val)
(.continue cursor)) :synced (.-synced val)})
(do (.continue cursor))
(put! ch @docs) (resolve @docs)))))
(async/close! ch)))))) (set! (.-onerror req)
(set! (.-onerror req) (fn [e]
(fn [e] (js/console.error "IDB cursor error:" e) (async/close! ch))) (js/console.error "IDB cursor error:" e)
ch)) (reject e)))))))
(get-meta [_ key] (get-meta [_ key]
(let [ch (chan 1) (p/create
txn (tx db "meta" :readonly) (fn [resolve reject]
store (.objectStore txn "meta") (let [txn (tx db "meta" :readonly)
req (.get store key)] store (.objectStore txn "meta")
(set! (.-onsuccess req) req (.get store key)]
(fn [e] (set! (.-onsuccess req)
(let [result (.-result (.-target e))] (fn [e]
(if result (let [result (.-result (.-target e))]
(do (put! ch (.-value result)) (resolve (when result (.-value result))))))
(async/close! ch)) (set! (.-onerror req)
(async/close! ch))))) (fn [_] (reject (js/Error. "IDB get-meta error"))))))))
(set! (.-onerror req) (fn [_] (async/close! ch)))
ch))
(set-meta! [_ key value] (set-meta! [_ key value]
(let [ch (chan 1) (p/create
txn (tx db "meta" :readwrite) (fn [resolve reject]
store (.objectStore txn "meta") (let [txn (tx db "meta" :readwrite)
req (.put store #js {:key key :value value})] store (.objectStore txn "meta")
(set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch))) req (.put store #js {:key key :value value})]
(set! (.-onerror req) (fn [_] (async/close! ch))) (set! (.-onsuccess req) (fn [_] (resolve true)))
ch)) (set! (.-onerror req)
(fn [_] (reject (js/Error. "IDB set-meta error"))))))))
(close-store! [_] (close-store! [_]
(when db (.close db)))) (when db (.close db))))
@@ -94,25 +95,24 @@
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
(defn open (defn open
"Open an IndexedDB store. Returns a channel yielding the IDBStore." "Open an IndexedDB store. Returns a promise yielding the IDBStore."
[db-name] [db-name]
(let [ch (chan 1) (p/create
req (.open js/indexedDB db-name 1)] (fn [resolve reject]
(set! (.-onupgradeneeded req) (let [req (.open js/indexedDB db-name 1)]
(fn [e] (set! (.-onupgradeneeded req)
(let [db (.-result (.-target e))] (fn [e]
(when-not (.contains (.-objectStoreNames db) "docs") (let [db (.-result (.-target e))]
(let [store (.createObjectStore db "docs" #js {:keyPath "id"})] (when-not (.contains (.-objectStoreNames db) "docs")
(.createIndex store "synced" "synced" #js {:unique false}) (let [store (.createObjectStore db "docs" #js {:keyPath "id"})]
(.createIndex store "updated" "updated" #js {:unique false}))) (.createIndex store "synced" "synced" #js {:unique false})
(when-not (.contains (.-objectStoreNames db) "meta") (.createIndex store "updated" "updated" #js {:unique false})))
(.createObjectStore db "meta" #js {:keyPath "key"}))))) (when-not (.contains (.-objectStoreNames db) "meta")
(set! (.-onsuccess req) (.createObjectStore db "meta" #js {:keyPath "key"})))))
(fn [e] (set! (.-onsuccess req)
(put! ch (IDBStore. (.-result (.-target e)))) (fn [e]
(async/close! ch))) (resolve (IDBStore. (.-result (.-target e))))))
(set! (.-onerror req) (set! (.-onerror req)
(fn [e] (fn [e]
(js/console.error "IDB open error:" e) (js/console.error "IDB open error:" e)
(async/close! ch))) (reject e)))))))
ch))

View File

@@ -1,44 +1,28 @@
(ns pocketbook.store.memory (ns pocketbook.store.memory
"In-memory store backed by atoms. Useful for testing and JVM clients." "In-memory store backed by atoms. Useful for testing and JVM clients."
(:require [pocketbook.store :as store] (:require [pocketbook.store :as store]
#?(:clj [clojure.core.async :as async :refer [chan put!]] [promesa.core :as p]
:cljs [cljs.core.async :as async :refer [chan put!]])
[clojure.string :as str])) [clojure.string :as str]))
(deftype MemoryStore [docs meta-store] (deftype MemoryStore [docs meta-store]
store/PStore store/PStore
(put-doc! [_ doc] (put-doc! [_ doc]
(let [ch (chan 1)] (swap! docs assoc (:id doc) doc)
(swap! docs assoc (:id doc) doc) (p/resolved true))
(put! ch true)
(async/close! ch)
ch))
(docs-by-prefix [_ prefix] (docs-by-prefix [_ prefix]
(let [ch (chan 1) (p/resolved
matching (->> @docs (->> @docs
vals vals
(filter #(str/starts-with? (:id %) prefix)) (filter #(str/starts-with? (:id %) prefix))
vec)] vec)))
(put! ch matching)
(async/close! ch)
ch))
(get-meta [_ key] (get-meta [_ key]
(let [ch (chan 1) (p/resolved (get @meta-store key)))
v (get @meta-store key)]
(if (some? v)
(put! ch v)
nil)
(async/close! ch)
ch))
(set-meta! [_ key value] (set-meta! [_ key value]
(let [ch (chan 1)] (swap! meta-store assoc key value)
(swap! meta-store assoc key value) (p/resolved true))
(put! ch true)
(async/close! ch)
ch))
(close-store! [_] (close-store! [_]
nil)) nil))

View File

@@ -2,8 +2,7 @@
"HTTP sync client — pull and push documents to/from the Pocketbook server." "HTTP sync client — pull and push documents to/from the Pocketbook server."
(:require [pocketbook.transit :as transit] (:require [pocketbook.transit :as transit]
[clojure.string :as str] [clojure.string :as str]
#?(:clj [clojure.core.async :as async :refer [chan put!]] [promesa.core :as p])
:cljs [cljs.core.async :as async :refer [chan put!]]))
#?(:clj (:import [java.net URI] #?(:clj (:import [java.net URI]
[java.net.http HttpClient HttpRequest HttpRequest$BodyPublishers [java.net.http HttpClient HttpRequest HttpRequest$BodyPublishers
HttpResponse$BodyHandlers]))) HttpResponse$BodyHandlers])))
@@ -40,28 +39,22 @@
#?(:cljs #?(:cljs
(defn- fetch-transit (defn- fetch-transit
"Make an HTTP request with Transit encoding (browser). "Make an HTTP request with Transit encoding (browser).
Returns a channel yielding {:ok bool :body decoded :status int}." Returns a promise yielding {:ok bool :body decoded :status int}."
[{:keys [url method body]}] [{:keys [url method body]}]
(let [ch (chan 1) (-> (js/fetch url
opts (clj->js (clj->js
(cond-> {:method (or method "GET") (cond-> {:method (or method "GET")
:headers {"Content-Type" "application/transit+json" :headers {"Content-Type" "application/transit+json"
"Accept" "application/transit+json"}} "Accept" "application/transit+json"}}
body (assoc :body (transit/encode body))))] body (assoc :body (transit/encode body)))))
(-> (js/fetch url opts) (.then (fn [resp]
(.then (fn [resp] (-> (.text resp)
(-> (.text resp) (.then (fn [text]
(.then (fn [text] (if (.-ok resp)
(if (.-ok resp) {:ok true :body (transit/decode text)}
(put! ch {:ok true :body (transit/decode text)}) {:ok false
(put! ch {:ok false :status (.-status resp)
:status (.-status resp) :error text})))))))))
:error text}))
(async/close! ch))))))
(.catch (fn [err]
(put! ch {:ok false :status 0 :error (str err)})
(async/close! ch))))
ch)))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; Pull ;; Pull
@@ -69,27 +62,24 @@
(defn pull! (defn pull!
"Pull documents from server updated since `since` for `group`. "Pull documents from server updated since `since` for `group`.
Returns a channel yielding {:ok true :docs [...]} or {:ok false :error str}." Returns a promise yielding {:ok true :docs [...]} or {:ok false :error str}."
[{:keys [server]} group since] [{:keys [server]} group since]
(let [url (str server "?group=" #?(:clj (java.net.URLEncoder/encode group "UTF-8") (let [url (str server "?group=" #?(:clj (java.net.URLEncoder/encode group "UTF-8")
:cljs (js/encodeURIComponent group)) :cljs (js/encodeURIComponent group))
"&since=" since)] "&since=" since)]
#?(:clj #?(:clj
(async/thread (p/vthread
(let [result (http-request {:url url :method :get})] (let [result (http-request {:url url :method :get})]
(if (:ok result) (if (:ok result)
{:ok true :docs (:body result)} {:ok true :docs (:body result)}
result))) result)))
:cljs :cljs
(let [ch (chan 1)] (p/then (fetch-transit {:url url :method "GET"})
(async/go (fn [result]
(let [result (async/<! (fetch-transit {:url url :method "GET"}))] (if (:ok result)
(if (:ok result) {:ok true :docs (:body result)}
(put! ch {:ok true :docs (:body result)}) result))))))
(put! ch result))
(async/close! ch)))
ch))))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; Push ;; Push
@@ -97,24 +87,21 @@
(defn push! (defn push!
"Push a batch of documents to the server. "Push a batch of documents to the server.
Returns a channel yielding {:ok true :results [...]} or {:ok false :error str}." Returns a promise yielding {:ok true :results [...]} or {:ok false :error str}."
[{:keys [server]} docs] [{:keys [server]} docs]
#?(:clj #?(:clj
(async/thread (p/vthread
(let [result (http-request {:url server :method :post :body docs})] (let [result (http-request {:url server :method :post :body docs})]
(if (:ok result) (if (:ok result)
{:ok true :results (:body result)} {:ok true :results (:body result)}
result))) result)))
:cljs :cljs
(let [ch (chan 1)] (p/then (fetch-transit {:url server :method "POST" :body docs})
(async/go (fn [result]
(let [result (async/<! (fetch-transit {:url server :method "POST" :body docs}))] (if (:ok result)
(if (:ok result) {:ok true :results (:body result)}
(put! ch {:ok true :results (:body result)}) result)))))
(put! ch result))
(async/close! ch)))
ch)))
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
;; SSE — live change notifications (CLJS only) ;; SSE — live change notifications (CLJS only)

13
tasks/todo.md Normal file
View File

@@ -0,0 +1,13 @@
# Replace core.async with promesa
## Tasks
- [x] 1. Add `funcool/promesa` to deps.edn, remove `core.async`
- [x] 2. Update `store.cljc` docstring (returns promises, not channels)
- [x] 3. Rewrite `store/memory.cljc` — return `(p/resolved val)`
- [x] 4. Rewrite `store/idb.cljs` — wrap IDB callbacks in `(p/create ...)`
- [x] 5. Rewrite `sync.cljc` — return promises from pull!/push!/fetch-transit
- [x] 6. Rewrite `core.cljc` — promises + timer-based sync loop
- [x] 7. Update `test/core_test.clj``deref` promises instead of `<!!`
- [x] 8. Update `example/todomvc/todomvc.cljs``p/let` instead of `go`/`<!`
- [x] 9. Run tests — 28 tests, 87 assertions, 0 failures

View File

@@ -1,6 +1,6 @@
(ns pocketbook.core-test (ns pocketbook.core-test
(:require [clojure.test :refer [deftest is testing use-fixtures]] (:require [clojure.test :refer [deftest is testing use-fixtures]]
[clojure.core.async :as async :refer [<!! go <! timeout]] [promesa.core :as p]
[pocketbook.core :as pb] [pocketbook.core :as pb]
[pocketbook.store :as store] [pocketbook.store :as store]
[pocketbook.store.memory :as memory] [pocketbook.store.memory :as memory]
@@ -38,11 +38,10 @@
;; Helpers ;; Helpers
;; --------------------------------------------------------------------------- ;; ---------------------------------------------------------------------------
(defn- <!!timeout (defn- await!
"Take from channel with timeout. Returns nil on timeout." "Deref a promise with timeout. Returns nil on timeout."
[ch ms] [promise ms]
(let [[v _] (<!! (go (async/alts! [ch (timeout ms)])))] (deref promise ms nil))
v))
(defn- wait-synced (defn- wait-synced
"Wait until the synced atom has no pending changes." "Wait until the synced atom has no pending changes."
@@ -62,7 +61,7 @@
(testing "SyncedAtom works without a server (local store only)" (testing "SyncedAtom works without a server (local store only)"
(let [store (memory/create) (let [store (memory/create)
sa (pb/synced-atom store "todo")] sa (pb/synced-atom store "todo")]
(<!!timeout (pb/ready? sa) 1000) (await! (pb/ready? sa) 1000)
(is (= {} @sa)) (is (= {} @sa))
(swap! sa assoc "todo:1" {:text "Buy milk"}) (swap! sa assoc "todo:1" {:text "Buy milk"})
@@ -81,13 +80,13 @@
(testing "Changes are persisted to the store" (testing "Changes are persisted to the store"
(let [store (memory/create) (let [store (memory/create)
sa (pb/synced-atom store "todo")] sa (pb/synced-atom store "todo")]
(<!!timeout (pb/ready? sa) 1000) (await! (pb/ready? sa) 1000)
(swap! sa assoc "todo:1" {:text "Buy milk"}) (swap! sa assoc "todo:1" {:text "Buy milk"})
(Thread/sleep 50) ;; let async store write complete (Thread/sleep 50) ;; let async store write complete
;; Read from store directly ;; Read from store directly
(let [docs (<!!timeout (store/docs-by-prefix store "todo:") 1000)] (let [docs (await! (store/docs-by-prefix store "todo:") 1000)]
(is (= 1 (count docs))) (is (= 1 (count docs)))
(is (= "todo:1" (:id (first docs)))) (is (= "todo:1" (:id (first docs))))
(is (= {:text "Buy milk"} (:value (first docs))))) (is (= {:text "Buy milk"} (:value (first docs)))))
@@ -98,13 +97,13 @@
(testing "SyncedAtom loads existing data from store on creation" (testing "SyncedAtom loads existing data from store on creation"
(let [store (memory/create)] (let [store (memory/create)]
;; Pre-populate the store ;; Pre-populate the store
(<!!timeout (store/put-doc! store (await! (store/put-doc! store
{:id "todo:1" :value {:text "Existing"} {:id "todo:1" :value {:text "Existing"}
:version 1 :updated 1000 :deleted false :synced true}) :version 1 :updated 1000 :deleted false :synced true})
1000) 1000)
(let [sa (pb/synced-atom store "todo")] (let [sa (pb/synced-atom store "todo")]
(<!!timeout (pb/ready? sa) 1000) (await! (pb/ready? sa) 1000)
(is (= {:text "Existing"} (get @sa "todo:1"))) (is (= {:text "Existing"} (get @sa "todo:1")))
(pb/destroy! sa))))) (pb/destroy! sa)))))
@@ -113,7 +112,7 @@
(let [store (memory/create) (let [store (memory/create)
sa (pb/synced-atom store "todo") sa (pb/synced-atom store "todo")
changes (atom [])] changes (atom [])]
(<!!timeout (pb/ready? sa) 1000) (await! (pb/ready? sa) 1000)
(add-watch sa :test (fn [_ _ old new] (add-watch sa :test (fn [_ _ old new]
(swap! changes conj {:old old :new new}))) (swap! changes conj {:old old :new new})))
@@ -133,7 +132,7 @@
(let [store (memory/create) (let [store (memory/create)
sa (pb/synced-atom store "todo" sa (pb/synced-atom store "todo"
{:server (server-url)})] {:server (server-url)})]
(<!!timeout (pb/ready? sa) 2000) (await! (pb/ready? sa) 2000)
(swap! sa assoc "todo:push1" {:text "Pushed!"}) (swap! sa assoc "todo:push1" {:text "Pushed!"})
(Thread/sleep 500) ;; let push complete (Thread/sleep 500) ;; let push complete
@@ -149,15 +148,15 @@
{:server (server-url) :interval 500}) {:server (server-url) :interval 500})
sa-b (pb/synced-atom store-b "todo" sa-b (pb/synced-atom store-b "todo"
{:server (server-url) :interval 500})] {:server (server-url) :interval 500})]
(<!!timeout (pb/ready? sa-a) 2000) (await! (pb/ready? sa-a) 2000)
(<!!timeout (pb/ready? sa-b) 2000) (await! (pb/ready? sa-b) 2000)
;; Client A writes ;; Client A writes
(swap! sa-a assoc "todo:sync1" {:text "From A"}) (swap! sa-a assoc "todo:sync1" {:text "From A"})
(Thread/sleep 500) ;; let A push (Thread/sleep 500) ;; let A push
;; Trigger a sync on B ;; Trigger a sync on B
(<!!timeout (pb/sync-now! sa-b) 2000) (await! (pb/sync-now! sa-b) 2000)
;; B should have A's data ;; B should have A's data
(is (= {:text "From A"} (get @sa-b "todo:sync1"))) (is (= {:text "From A"} (get @sa-b "todo:sync1")))
@@ -169,7 +168,7 @@
(testing "Standard atom operations work" (testing "Standard atom operations work"
(let [store (memory/create) (let [store (memory/create)
sa (pb/synced-atom store "note")] sa (pb/synced-atom store "note")]
(<!!timeout (pb/ready? sa) 1000) (await! (pb/ready? sa) 1000)
;; reset! ;; reset!
(reset! sa {"note:1" {:body "Hello"}}) (reset! sa {"note:1" {:body "Hello"}})