diff --git a/deps.edn b/deps.edn index 5da82c5..e849462 100644 --- a/deps.edn +++ b/deps.edn @@ -1,6 +1,6 @@ {:paths ["src"] :deps {org.clojure/clojure {:mvn/version "1.12.0"} - org.clojure/core.async {:mvn/version "1.7.701"} + funcool/promesa {:mvn/version "11.0.678"} http-kit/http-kit {:mvn/version "2.8.0"} com.cognitect/transit-clj {:mvn/version "1.0.333"} com.taoensso/nippy {:mvn/version "3.4.2"} @@ -18,12 +18,10 @@ ;; ClojureScript client build :cljs {:extra-paths ["example/todomvc"] :extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"} - com.cognitect/transit-cljs {:mvn/version "0.8.280"} - org.clojure/core.async {:mvn/version "1.7.701"}} + com.cognitect/transit-cljs {:mvn/version "0.8.280"}} :main-opts ["-m" "cljs.main" "-co" "build.edn" "-c"]} :cljs-dev {:extra-paths ["example/todomvc"] :extra-deps {org.clojure/clojurescript {:mvn/version "1.11.132"} - com.cognitect/transit-cljs {:mvn/version "0.8.280"} - org.clojure/core.async {:mvn/version "1.7.701"}} + com.cognitect/transit-cljs {:mvn/version "0.8.280"}} :main-opts ["-m" "cljs.main" "-co" "build.edn" "-w" "src:example/todomvc/pocketbook" "-c"]}}} diff --git a/example/todomvc/pocketbook/todomvc.cljs b/example/todomvc/pocketbook/todomvc.cljs index ff129a6..ec57b48 100644 --- a/example/todomvc/pocketbook/todomvc.cljs +++ b/example/todomvc/pocketbook/todomvc.cljs @@ -3,7 +3,7 @@ (:require [pocketbook.core :as pb] [pocketbook.store.idb :as idb] [pocketbook.hiccup :refer [html]] - [cljs.core.async :refer [go {\"todo:1\" {:text \"Buy milk\"}} " (:require [pocketbook.store :as store] [pocketbook.sync :as sync] [clojure.string :as str] - #?(:clj [clojure.core.async :as async :refer [go go-loop ! chan put! timeout alts!]] - :cljs [cljs.core.async :as async :refer [go go-loop ! chan put! timeout alts!]]))) + [promesa.core :as p]) + #?(:clj (:import [java.util.concurrent Executors ScheduledExecutorService TimeUnit]))) ;; --------------------------------------------------------------------------- ;; Internal helpers @@ -26,13 +26,19 @@ (defn- prefix-str [group] (str group ":")) +;; --------------------------------------------------------------------------- +;; Forward declarations +;; --------------------------------------------------------------------------- + +(declare schedule-push!) + ;; --------------------------------------------------------------------------- ;; Shared reset logic ;; --------------------------------------------------------------------------- (defn- do-reset!* "Shared reset implementation. Updates cache, tracks pending, writes to store, kicks sync." - [store cache versions pending kick-ch new-val] + [store cache versions pending sa new-val] (let [old @cache] (reset! cache new-val) (let [all-keys (into (set (keys old)) (keys new-val)) @@ -50,7 +56,7 @@ {:id k :value v :version (get @versions k 0) :updated (now-ms) :deleted false :synced false}))))) (when @changed? - (put! kick-ch :kick))) + (schedule-push! sa))) new-val)) ;; --------------------------------------------------------------------------- @@ -64,9 +70,11 @@ pending ;; atom containing #{id} — unsynced ids server-opts ;; {:server url} or nil last-sync ;; atom containing epoch ms - ready-ch ;; channel, closed when initial load complete - stop-ch ;; channel to signal stop - kick-ch ;; channel to trigger immediate push + ready-pr ;; promesa deferred, resolved when initial load complete + sync-timer ;; atom holding timer reference (for cleanup) + push-timer ;; atom holding debounce timer reference + pushing? ;; atom boolean — guard against overlapping pushes + syncing? ;; atom boolean — guard against overlapping syncs cleanup-fn ;; atom holding connectivity cleanup fn sync-interval ;; ms _meta] ;; metadata atom @@ -83,8 +91,8 @@ (setValidator [_ _vf] nil) clojure.lang.IAtom - (reset [_ newval] - (do-reset!* store cache versions pending kick-ch newval)) + (reset [this newval] + (do-reset!* store cache versions pending this newval)) (swap [this f] (.reset this (f @cache))) (swap [this f arg] @@ -105,8 +113,8 @@ (-deref [_] @cache) IReset - (-reset! [_ new-val] - (do-reset!* store cache versions pending kick-ch new-val)) + (-reset! [this new-val] + (do-reset!* store cache versions pending this new-val)) ISwap (-swap! [o f] @@ -142,31 +150,28 @@ (defn- load-from-store! "Load all docs for the group from the store into the atom. - Returns a channel that closes when done." + Returns a promise that resolves when done." [sa] - (let [ch (chan 1)] - (go - (let [prefix (prefix-str (.-group sa)) - docs ( (:version doc) (get @(.-versions sa) id 0)) - (if (:deleted doc) - (do - (swap! (.-cache sa) dissoc id) - (swap! (.-versions sa) assoc id (:version doc)) - (store/put-doc! (.-store sa) - {:id id :value nil :version (:version doc) - :updated (:updated doc) :deleted true :synced true})) - (do - (swap! (.-cache sa) assoc id (:value doc)) - (swap! (.-versions sa) assoc id (:version doc)) - (store/put-doc! (.-store sa) - {:id id :value (:value doc) :version (:version doc) - :updated (:updated doc) :deleted false :synced true})))))) + (if-let [opts (.-server_opts sa)] + (p/let [since @(.-last_sync sa) + result (sync/pull! opts (.-group sa) since)] + (when (:ok result) + (let [docs (:docs result) + max-ts (reduce max @(.-last_sync sa) + (map :updated docs))] + (p/let [_ (p/all + (for [doc docs + :let [id (:id doc)] + :when (> (:version doc) (get @(.-versions sa) id 0))] + (if (:deleted doc) + (do + (swap! (.-cache sa) dissoc id) + (swap! (.-versions sa) assoc id (:version doc)) + (store/put-doc! (.-store sa) + {:id id :value nil :version (:version doc) + :updated (:updated doc) :deleted true :synced true})) + (do + (swap! (.-cache sa) assoc id (:value doc)) + (swap! (.-versions sa) assoc id (:version doc)) + (store/put-doc! (.-store sa) + {:id id :value (:value doc) :version (:version doc) + :updated (:updated doc) :deleted false :synced true})))))] (reset! (.-last_sync sa) max-ts) (store/set-meta! (.-store sa) - (str "last-sync:" (.-group sa)) max-ts)) - true))))) + (str "last-sync:" (.-group sa)) max-ts))))) + (p/resolved nil))) (defn- do-push! - "Push all unsynced local docs to the server." + "Push all unsynced local docs to the server. Returns a promise." [sa] - (go - (when-let [opts (.-server_opts sa)] - (let [pending-ids @(.-pending sa)] - (when (seq pending-ids) - (let [docs (mapv (fn [id] - (let [v (get @(.-cache sa) id)] - (if (nil? v) - {:id id :deleted true - :base-version (get @(.-versions sa) id 0)} - {:id id :value v - :base-version (get @(.-versions sa) id 0)}))) - pending-ids) - result ( (do-push! sa) + (p/finally + (fn [_ _] + (reset! (.-pushing? sa) false) + (when (seq @(.-pending sa)) + (schedule-push! sa))))))) + :cljs (js/setTimeout + (fn [] + (when-not @(.-pushing? sa) + (reset! (.-pushing? sa) true) + (-> (do-push! sa) + (p/finally + (fn [_ _] + (reset! (.-pushing? sa) false) + (when (seq @(.-pending sa)) + (schedule-push! sa))))))) + 50)))) + +;; --------------------------------------------------------------------------- +;; Sync loop (timer-based) ;; --------------------------------------------------------------------------- (defn- start-sync-loop! - "Start the background sync loop." + "Start the background sync loop using platform timers." [sa] - (let [stop-ch (.-stop_ch sa) - kick-ch (.-kick_ch sa) - interval (.-sync_interval sa) - cleanups (atom [])] - ;; Periodic sync (fallback) + immediate push on kick - (go-loop [] - (let [[_ ch] (alts! [stop-ch kick-ch (timeout interval)])] - (when-not (= ch stop-ch) - (if (= ch kick-ch) - ( (do-sync! sa) + (p/finally + (fn [_ _] + (reset! (.-syncing? sa) false))))))] + ;; Periodic sync + #?(:clj + (let [^ScheduledExecutorService exec (Executors/newSingleThreadScheduledExecutor)] + (.scheduleAtFixedRate exec ^Runnable sync-fn + (long interval) (long interval) TimeUnit/MILLISECONDS) + (reset! (.-sync_timer sa) exec) + (swap! cleanups conj (fn [] (.shutdown exec)))) + :cljs + (let [timer-id (js/setInterval sync-fn interval)] + (reset! (.-sync_timer sa) timer-id) + (swap! cleanups conj (fn [] (js/clearInterval timer-id))))) + ;; Online/offline handler (swap! cleanups conj (sync/on-connectivity-change - (fn [] (go ( (load-from-store! sa) + (p/then (fn [_] + (p/resolve! ready-pr true) + (when server-opts + (-> (do-sync! sa) + (p/then (fn [_] (start-sync-loop! sa))))))) + (p/catch (fn [err] + (p/reject! ready-pr err)))) sa)) (defn ready? - "Returns a channel that yields true when the atom has finished loading from the store." + "Returns a promise that yields true when the atom has finished loading from the store." [sa] - (.-ready_ch sa)) + (.-ready_pr sa)) (defn sync-now! - "Trigger an immediate sync cycle. Returns a channel." + "Trigger an immediate sync cycle. Returns a promise." [sa] (do-sync! sa)) @@ -350,5 +405,9 @@ (defn destroy! "Stop the sync loop and clean up. Does not close the store." [sa] - (put! (.-stop_ch sa) :stop) + ;; Cancel push debounce timer + (when-let [t @(.-push_timer sa)] + #?(:clj (future-cancel t) + :cljs (js/clearTimeout t))) + ;; Run all cleanup fns (timer, connectivity, SSE) (doseq [f @(.-cleanup_fn sa)] (f))) diff --git a/src/pocketbook/store.cljc b/src/pocketbook/store.cljc index 883f17f..aa60002 100644 --- a/src/pocketbook/store.cljc +++ b/src/pocketbook/store.cljc @@ -1,23 +1,23 @@ (ns pocketbook.store "Storage protocol for Pocketbook. - All methods return core.async channels.") + All methods return promesa promises.") (defprotocol PStore (put-doc! [store doc] "Write a document to the store. doc is a map: {:id str, :value any, :version int, :updated int, :deleted bool, :synced bool} - Returns a channel that closes on success.") + Returns a promise that resolves on success.") (docs-by-prefix [store prefix] "Get all documents whose id starts with prefix. - Returns a channel yielding a vector of doc maps.") + Returns a promise yielding a vector of doc maps.") (get-meta [store key] "Get a metadata value by key. - Returns a channel yielding the value, or nil if not found.") + Returns a promise yielding the value, or nil if not found.") (set-meta! [store key value] - "Set a metadata value. Returns a channel that closes on success.") + "Set a metadata value. Returns a promise that resolves on success.") (close-store! [store] "Close the store and release resources.")) diff --git a/src/pocketbook/store/idb.cljs b/src/pocketbook/store/idb.cljs index 6e390e4..ef0a9bf 100644 --- a/src/pocketbook/store/idb.cljs +++ b/src/pocketbook/store/idb.cljs @@ -2,7 +2,7 @@ "IndexedDB store implementing the PStore protocol." (:require [pocketbook.store :as store] [pocketbook.transit :as transit] - [cljs.core.async :as async :refer [chan put!]])) + [promesa.core :as p])) ;; --------------------------------------------------------------------------- ;; IDB operations @@ -21,70 +21,71 @@ (deftype IDBStore [db] store/PStore (put-doc! [_ doc] - (let [ch (chan 1) - txn (tx db "docs" :readwrite) - store (.objectStore txn "docs") - obj #js {:id (:id doc) - :value (transit/encode (:value doc)) - :version (:version doc 0) - :updated (:updated doc 0) - :deleted (boolean (:deleted doc false)) - :synced (boolean (:synced doc false))} - req (.put store obj)] - (set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch))) - (set! (.-onerror req) (fn [e] (js/console.error "IDB put error:" e) (async/close! ch))) - ch)) + (p/create + (fn [resolve reject] + (let [txn (tx db "docs" :readwrite) + store (.objectStore txn "docs") + obj #js {:id (:id doc) + :value (transit/encode (:value doc)) + :version (:version doc 0) + :updated (:updated doc 0) + :deleted (boolean (:deleted doc false)) + :synced (boolean (:synced doc false))} + req (.put store obj)] + (set! (.-onsuccess req) (fn [_] (resolve true))) + (set! (.-onerror req) (fn [e] + (js/console.error "IDB put error:" e) + (reject e))))))) (docs-by-prefix [_ prefix] - (let [ch (chan 1) - txn (tx db "docs" :readonly) - store (.objectStore txn "docs") - range (.bound js/IDBKeyRange prefix (str prefix "\uffff")) - req (.openCursor store range) - docs (atom [])] - (set! (.-onsuccess req) - (fn [e] - (let [cursor (.-result (.-target e))] - (if cursor - (let [val (.-value cursor)] - (swap! docs conj - {:id (.-id val) - :value (transit/decode (.-value val)) - :version (.-version val) - :updated (.-updated val) - :deleted (.-deleted val) - :synced (.-synced val)}) - (.continue cursor)) - (do - (put! ch @docs) - (async/close! ch)))))) - (set! (.-onerror req) - (fn [e] (js/console.error "IDB cursor error:" e) (async/close! ch))) - ch)) + (p/create + (fn [resolve reject] + (let [txn (tx db "docs" :readonly) + store (.objectStore txn "docs") + range (.bound js/IDBKeyRange prefix (str prefix "\uffff")) + req (.openCursor store range) + docs (atom [])] + (set! (.-onsuccess req) + (fn [e] + (let [cursor (.-result (.-target e))] + (if cursor + (let [val (.-value cursor)] + (swap! docs conj + {:id (.-id val) + :value (transit/decode (.-value val)) + :version (.-version val) + :updated (.-updated val) + :deleted (.-deleted val) + :synced (.-synced val)}) + (.continue cursor)) + (resolve @docs))))) + (set! (.-onerror req) + (fn [e] + (js/console.error "IDB cursor error:" e) + (reject e))))))) (get-meta [_ key] - (let [ch (chan 1) - txn (tx db "meta" :readonly) - store (.objectStore txn "meta") - req (.get store key)] - (set! (.-onsuccess req) - (fn [e] - (let [result (.-result (.-target e))] - (if result - (do (put! ch (.-value result)) - (async/close! ch)) - (async/close! ch))))) - (set! (.-onerror req) (fn [_] (async/close! ch))) - ch)) + (p/create + (fn [resolve reject] + (let [txn (tx db "meta" :readonly) + store (.objectStore txn "meta") + req (.get store key)] + (set! (.-onsuccess req) + (fn [e] + (let [result (.-result (.-target e))] + (resolve (when result (.-value result)))))) + (set! (.-onerror req) + (fn [_] (reject (js/Error. "IDB get-meta error")))))))) (set-meta! [_ key value] - (let [ch (chan 1) - txn (tx db "meta" :readwrite) - store (.objectStore txn "meta") - req (.put store #js {:key key :value value})] - (set! (.-onsuccess req) (fn [_] (put! ch true) (async/close! ch))) - (set! (.-onerror req) (fn [_] (async/close! ch))) - ch)) + (p/create + (fn [resolve reject] + (let [txn (tx db "meta" :readwrite) + store (.objectStore txn "meta") + req (.put store #js {:key key :value value})] + (set! (.-onsuccess req) (fn [_] (resolve true))) + (set! (.-onerror req) + (fn [_] (reject (js/Error. "IDB set-meta error")))))))) (close-store! [_] (when db (.close db)))) @@ -94,25 +95,24 @@ ;; --------------------------------------------------------------------------- (defn open - "Open an IndexedDB store. Returns a channel yielding the IDBStore." + "Open an IndexedDB store. Returns a promise yielding the IDBStore." [db-name] - (let [ch (chan 1) - req (.open js/indexedDB db-name 1)] - (set! (.-onupgradeneeded req) - (fn [e] - (let [db (.-result (.-target e))] - (when-not (.contains (.-objectStoreNames db) "docs") - (let [store (.createObjectStore db "docs" #js {:keyPath "id"})] - (.createIndex store "synced" "synced" #js {:unique false}) - (.createIndex store "updated" "updated" #js {:unique false}))) - (when-not (.contains (.-objectStoreNames db) "meta") - (.createObjectStore db "meta" #js {:keyPath "key"}))))) - (set! (.-onsuccess req) - (fn [e] - (put! ch (IDBStore. (.-result (.-target e)))) - (async/close! ch))) - (set! (.-onerror req) - (fn [e] - (js/console.error "IDB open error:" e) - (async/close! ch))) - ch)) + (p/create + (fn [resolve reject] + (let [req (.open js/indexedDB db-name 1)] + (set! (.-onupgradeneeded req) + (fn [e] + (let [db (.-result (.-target e))] + (when-not (.contains (.-objectStoreNames db) "docs") + (let [store (.createObjectStore db "docs" #js {:keyPath "id"})] + (.createIndex store "synced" "synced" #js {:unique false}) + (.createIndex store "updated" "updated" #js {:unique false}))) + (when-not (.contains (.-objectStoreNames db) "meta") + (.createObjectStore db "meta" #js {:keyPath "key"}))))) + (set! (.-onsuccess req) + (fn [e] + (resolve (IDBStore. (.-result (.-target e)))))) + (set! (.-onerror req) + (fn [e] + (js/console.error "IDB open error:" e) + (reject e))))))) diff --git a/src/pocketbook/store/memory.cljc b/src/pocketbook/store/memory.cljc index 1adf828..9e54919 100644 --- a/src/pocketbook/store/memory.cljc +++ b/src/pocketbook/store/memory.cljc @@ -1,44 +1,28 @@ (ns pocketbook.store.memory "In-memory store backed by atoms. Useful for testing and JVM clients." (:require [pocketbook.store :as store] - #?(:clj [clojure.core.async :as async :refer [chan put!]] - :cljs [cljs.core.async :as async :refer [chan put!]]) + [promesa.core :as p] [clojure.string :as str])) (deftype MemoryStore [docs meta-store] store/PStore (put-doc! [_ doc] - (let [ch (chan 1)] - (swap! docs assoc (:id doc) doc) - (put! ch true) - (async/close! ch) - ch)) + (swap! docs assoc (:id doc) doc) + (p/resolved true)) (docs-by-prefix [_ prefix] - (let [ch (chan 1) - matching (->> @docs - vals - (filter #(str/starts-with? (:id %) prefix)) - vec)] - (put! ch matching) - (async/close! ch) - ch)) + (p/resolved + (->> @docs + vals + (filter #(str/starts-with? (:id %) prefix)) + vec))) (get-meta [_ key] - (let [ch (chan 1) - v (get @meta-store key)] - (if (some? v) - (put! ch v) - nil) - (async/close! ch) - ch)) + (p/resolved (get @meta-store key))) (set-meta! [_ key value] - (let [ch (chan 1)] - (swap! meta-store assoc key value) - (put! ch true) - (async/close! ch) - ch)) + (swap! meta-store assoc key value) + (p/resolved true)) (close-store! [_] nil)) diff --git a/src/pocketbook/sync.cljc b/src/pocketbook/sync.cljc index 05b8041..71b98e8 100644 --- a/src/pocketbook/sync.cljc +++ b/src/pocketbook/sync.cljc @@ -2,8 +2,7 @@ "HTTP sync client — pull and push documents to/from the Pocketbook server." (:require [pocketbook.transit :as transit] [clojure.string :as str] - #?(:clj [clojure.core.async :as async :refer [chan put!]] - :cljs [cljs.core.async :as async :refer [chan put!]])) + [promesa.core :as p]) #?(:clj (:import [java.net URI] [java.net.http HttpClient HttpRequest HttpRequest$BodyPublishers HttpResponse$BodyHandlers]))) @@ -40,28 +39,22 @@ #?(:cljs (defn- fetch-transit "Make an HTTP request with Transit encoding (browser). - Returns a channel yielding {:ok bool :body decoded :status int}." + Returns a promise yielding {:ok bool :body decoded :status int}." [{:keys [url method body]}] - (let [ch (chan 1) - opts (clj->js - (cond-> {:method (or method "GET") - :headers {"Content-Type" "application/transit+json" - "Accept" "application/transit+json"}} - body (assoc :body (transit/encode body))))] - (-> (js/fetch url opts) - (.then (fn [resp] - (-> (.text resp) - (.then (fn [text] - (if (.-ok resp) - (put! ch {:ok true :body (transit/decode text)}) - (put! ch {:ok false - :status (.-status resp) - :error text})) - (async/close! ch)))))) - (.catch (fn [err] - (put! ch {:ok false :status 0 :error (str err)}) - (async/close! ch)))) - ch))) + (-> (js/fetch url + (clj->js + (cond-> {:method (or method "GET") + :headers {"Content-Type" "application/transit+json" + "Accept" "application/transit+json"}} + body (assoc :body (transit/encode body))))) + (.then (fn [resp] + (-> (.text resp) + (.then (fn [text] + (if (.-ok resp) + {:ok true :body (transit/decode text)} + {:ok false + :status (.-status resp) + :error text}))))))))) ;; --------------------------------------------------------------------------- ;; Pull @@ -69,27 +62,24 @@ (defn pull! "Pull documents from server updated since `since` for `group`. - Returns a channel yielding {:ok true :docs [...]} or {:ok false :error str}." + Returns a promise yielding {:ok true :docs [...]} or {:ok false :error str}." [{:keys [server]} group since] (let [url (str server "?group=" #?(:clj (java.net.URLEncoder/encode group "UTF-8") :cljs (js/encodeURIComponent group)) "&since=" since)] #?(:clj - (async/thread + (p/vthread (let [result (http-request {:url url :method :get})] (if (:ok result) {:ok true :docs (:body result)} result))) :cljs - (let [ch (chan 1)] - (async/go - (let [result (async/