From 78b897cc2502d21fa225df75cbfced661da9c9d8 Mon Sep 17 00:00:00 2001 From: Florian Schroedl Date: Sat, 18 Apr 2026 16:51:09 +0200 Subject: [PATCH] feat: add Connection API and make SyncedAtom a promise Add `connect` to manage store + shared config, and `synced-atom` now accepts a Connection or raw store (backwards compatible). SyncedAtom implements IPromiseFactory so it works directly with p/let: (def conn (pb/connect {:db "my-app" :server "..."})) (def !todos (pb/synced-atom conn "todo")) (p/let [todos !todos] (swap! todos assoc "k" v)) - Connection record holds store-promise + default opts (server, interval) - `connect` accepts :store (CLJ/CLJS) or :db (CLJS, opens IndexedDB) - `ready?` now resolves to the SyncedAtom itself instead of `true` - Add `close!` to close a Connection's store - Store field is now an atom (set when store-promise resolves) - Simplify TodoMVC example: no more !conn atom or double-deref --- example/todomvc/atomsync/todomvc.cljs | 51 +++++++------- src/atomsync/core.cljc | 99 ++++++++++++++++++++------- test/atomsync/core_test.clj | 93 +++++++++++++++++++++++++ 3 files changed, 189 insertions(+), 54 deletions(-) diff --git a/example/todomvc/atomsync/todomvc.cljs b/example/todomvc/atomsync/todomvc.cljs index 97c6ce5..9798c17 100644 --- a/example/todomvc/atomsync/todomvc.cljs +++ b/example/todomvc/atomsync/todomvc.cljs @@ -1,7 +1,6 @@ (ns atomsync.todomvc "TodoMVC built on Atomsync — offline-first, synced, Clojure-native." (:require [atomsync.core :as pb] - [atomsync.store.idb :as idb] [atomsync.hiccup :refer [html]] [promesa.core :as p] [clojure.string :as str])) @@ -10,8 +9,10 @@ ;; State ;; --------------------------------------------------------------------------- -(defonce !conn (atom nil)) -(defonce !todos (atom nil)) ;; the SyncedAtom +(defonce conn (pb/connect {:db "atomsync-todomvc" + :server "http://localhost:8090/sync" + :interval 15000})) +(defonce !todos (pb/synced-atom conn "todo")) (defonce !filter (atom :all)) ;; :all | :active | :completed (defonce !editing (atom nil)) ;; id of todo being edited, or nil @@ -22,7 +23,7 @@ (defn- todos-list "Return todos as sorted vec of [id doc] pairs." [] - (->> @@!todos + (->> @!todos (sort-by (fn [[_ doc]] (:created doc))) vec)) @@ -50,33 +51,33 @@ (let [text (str/trim text)] (when (seq text) (let [id (str "todo:" (random-uuid))] - (swap! @!todos assoc id + (swap! !todos assoc id {:text text :completed false :created (.now js/Date)}))))) (defn- toggle-todo! [id] - (swap! @!todos update-in [id :completed] not)) + (swap! !todos update-in [id :completed] not)) (defn- toggle-all! [] (let [target (not (all-completed?))] - (swap! @!todos + (swap! !todos (fn [m] (reduce-kv (fn [acc k v] (assoc acc k (assoc v :completed target))) {} m))))) (defn- destroy-todo! [id] - (swap! @!todos dissoc id)) + (swap! !todos dissoc id)) (defn- edit-todo! [id new-text] (let [text (str/trim new-text)] (if (seq text) - (swap! @!todos assoc-in [id :text] text) + (swap! !todos assoc-in [id :text] text) (destroy-todo! id)) (reset! !editing nil))) (defn- clear-completed! [] - (swap! @!todos + (swap! !todos (fn [m] (into {} (remove (fn [[_ v]] (:completed v))) m)))) @@ -119,7 +120,7 @@ "Clear completed"])])) (defn- render-sync-status [] - (let [pending (when @!todos (pb/pending-count @!todos)) + (let [pending (pb/pending-count !todos) online? (.-onLine js/navigator)] [:div.sync-bar [:span {:class (str "sync-dot " (if online? "online" "offline"))}] @@ -232,22 +233,16 @@ ;; --------------------------------------------------------------------------- (defn ^:export init [] - (p/let [store (idb/open "atomsync-todomvc")] - (let [todos (pb/synced-atom store "todo" - {:server "http://localhost:8090/sync" - :interval 15000})] - (reset! !conn store) - (reset! !todos todos) - ;; Render + bind immediately (empty or stale is fine) - (render!) - (bind-events!) - ;; Re-render on any data change (fires when IDB loads + server syncs) - (add-watch todos :render (fn [_ _ _ _] (render!))) - (add-watch !filter :render (fn [_ _ _ _] (render!))) - (.addEventListener js/window "online" (fn [_] (render!))) - (.addEventListener js/window "offline" (fn [_] (render!))) - ;; Wait for IDB — watch triggers render automatically - (p/let [_ (pb/ready? todos)] - (js/console.log "🔶 Atomsync TodoMVC loaded —" (count @todos) "todos"))))) + ;; Render + bind immediately (empty or stale is fine) + (render!) + (bind-events!) + ;; Re-render on any data change (fires when IDB loads + server syncs) + (add-watch !todos :render (fn [_ _ _ _] (render!))) + (add-watch !filter :render (fn [_ _ _ _] (render!))) + (.addEventListener js/window "online" (fn [_] (render!))) + (.addEventListener js/window "offline" (fn [_] (render!))) + ;; Wait for IDB — watch triggers render automatically + (p/let [todos !todos] + (js/console.log "🔶 Atomsync TodoMVC loaded —" (count @todos) "todos"))) (init) diff --git a/src/atomsync/core.cljc b/src/atomsync/core.cljc index 8240747..9c4157e 100644 --- a/src/atomsync/core.cljc +++ b/src/atomsync/core.cljc @@ -2,19 +2,42 @@ "Atomsync: a Clojure-native synced atom. Usage: - (def store @(idb/open \"my-app\")) ;; or (memory/create) - (def todos (pb/synced-atom store \"todo\" - {:server \"http://localhost:8090/sync\"})) - (.then (ready? todos) - (fn [_] (swap! todos assoc \"todo:1\" {:text \"Buy milk\"}))) + (def conn (connect {:db \"my-app\" + :server \"http://localhost:8090/sync\"})) + (def todos (synced-atom conn \"todo\")) + (p/let [todos todos] + (swap! todos assoc \"todo:1\" {:text \"Buy milk\"})) @todos ;=> {\"todo:1\" {:text \"Buy milk\"}} " (:require [atomsync.store :as store] [atomsync.sync :as sync] [clojure.string :as str] - [promesa.core :as p]) + [promesa.core :as p] + [promesa.protocols :as pt] + #?(:cljs [atomsync.store.idb :as idb])) #?(:clj (:import [java.util.concurrent Executors ScheduledExecutorService TimeUnit]))) +;; --------------------------------------------------------------------------- +;; Connection +;; --------------------------------------------------------------------------- + +(defrecord Connection [store-promise opts]) + +(defn connect + "Create a connection for synced atoms. + + Options: + :db — database name (CLJS: opens IndexedDB) + :store — pre-opened store or promise of a store + :server — server URL for sync + :interval — sync interval in ms (default 30000)" + [{:keys [db store server interval] :or {interval 30000}}] + (let [store-pr (cond + (some? store) (p/promise store) + #?@(:cljs [(some? db) (idb/open db)]) + :else (throw (ex-info "connect requires :store or :db" {})))] + (->Connection store-pr {:server server :interval interval}))) + ;; --------------------------------------------------------------------------- ;; Internal helpers ;; --------------------------------------------------------------------------- @@ -64,7 +87,7 @@ ;; --------------------------------------------------------------------------- (deftype SyncedAtom [group ;; string prefix, e.g. "todo" - store ;; PStore implementation + store ;; atom containing PStore implementation cache ;; atom containing {id -> value} versions ;; atom containing {id -> version} pending ;; atom containing #{id} — unsynced ids @@ -92,7 +115,7 @@ clojure.lang.IAtom (reset [this newval] - (do-reset!* store cache versions pending this newval)) + (do-reset!* @store cache versions pending this newval)) (swap [this f] (.reset this (f @cache))) (swap [this f arg] @@ -104,7 +127,10 @@ (compareAndSet [this old new] (if (= @cache old) (do (.reset this new) true) - false))] + false)) + + pt/IPromiseFactory + (-promise [_] ready-pr)] :cljs [IAtom @@ -114,7 +140,7 @@ IReset (-reset! [this new-val] - (do-reset!* store cache versions pending this new-val)) + (do-reset!* @store cache versions pending this new-val)) ISwap (-swap! [o f] @@ -140,6 +166,9 @@ IWithMeta (-with-meta [_ m] (reset! _meta m)) + pt/IPromiseFactory + (-promise [_] ready-pr) + IPrintWithWriter (-pr-writer [_ writer _opts] (-write writer (str "#")))])) @@ -153,7 +182,7 @@ Returns a promise that resolves when done." [sa] (p/let [prefix (prefix-str (.-group sa)) - docs (store/docs-by-prefix (.-store sa) prefix) + docs (store/docs-by-prefix @(.-store sa) prefix) state (into {} (comp (remove :deleted) @@ -164,7 +193,7 @@ docs) _ (do (reset! (.-cache sa) state) (reset! (.-versions sa) vers)) - ls (store/get-meta (.-store sa) + ls (store/get-meta @(.-store sa) (str "last-sync:" (.-group sa)))] (reset! (.-last_sync sa) (or ls 0)) true)) @@ -176,7 +205,7 @@ (swap! (.-versions sa) assoc id version) (swap! (.-pending sa) disj id) (let [value (get @(.-cache sa) id)] - (store/put-doc! (.-store sa) + (store/put-doc! @(.-store sa) {:id id :value value :version version @@ -206,17 +235,17 @@ (do (swap! (.-cache sa) dissoc id) (swap! (.-versions sa) assoc id (:version doc)) - (store/put-doc! (.-store sa) + (store/put-doc! @(.-store sa) {:id id :value nil :version (:version doc) :updated (:updated doc) :deleted true :synced true})) (do (swap! (.-cache sa) assoc id (:value doc)) (swap! (.-versions sa) assoc id (:version doc)) - (store/put-doc! (.-store sa) + (store/put-doc! @(.-store sa) {:id id :value (:value doc) :version (:version doc) :updated (:updated doc) :deleted false :synced true})))))] (reset! (.-last_sync sa) max-ts) - (store/set-meta! (.-store sa) + (store/set-meta! @(.-store sa) (str "last-sync:" (.-group sa)) max-ts))))) (p/resolved nil))) @@ -248,7 +277,7 @@ (swap! (.-cache sa) assoc (:id r) (:value r))) (swap! (.-versions sa) assoc (:id r) (:current-version r)) (swap! (.-pending sa) disj (:id r)) - (store/put-doc! (.-store sa) + (store/put-doc! @(.-store sa) {:id (:id r) :value (or (:value r) (get @(.-cache sa) (:id r))) :version (:current-version r) @@ -356,13 +385,22 @@ (defn synced-atom "Create a synced atom for a document group. - Options: + Accepts a Connection (from `connect`) or a raw store for backwards compat. + + Options (per-atom overrides, merged with connection defaults): :server — server URL (e.g. \"http://localhost:8090/sync\") :cache — custom atom to use (e.g. reagent/atom). Default: cljs.core/atom - :interval — sync interval in ms (default 30000)" - [store group & [{:keys [server cache interval] - :or {interval 30000}}]] - (let [cache-atom (or cache (atom {})) + :interval — sync interval in ms (default 30000) + + Returns a SyncedAtom that implements IPromiseFactory, so it works with p/let: + (p/let [sa (synced-atom conn \"todo\")] ...)" + [conn-or-store group & [opts]] + (let [[store-pr conn-opts] (if (instance? Connection conn-or-store) + [(:store-promise conn-or-store) (:opts conn-or-store)] + [(p/resolved conn-or-store) {}]) + {:keys [server cache interval] :or {interval 30000}} (merge conn-opts opts) + store-atom (atom nil) + cache-atom (or cache (atom {})) versions (atom {}) pending (atom #{}) server-opts (when server {:server server}) @@ -374,12 +412,14 @@ syncing? (atom false) cleanup-fn (atom nil) meta-atom (atom nil) - sa (SyncedAtom. group store cache-atom versions pending + sa (SyncedAtom. group store-atom cache-atom versions pending server-opts last-sync ready-pr sync-timer push-timer pushing? syncing? cleanup-fn interval meta-atom)] - (-> (load-from-store! sa) + (-> (p/let [store store-pr] + (reset! store-atom store) + (load-from-store! sa)) (p/then (fn [_] - (p/resolve! ready-pr true) + (p/resolve! ready-pr sa) (when server-opts (-> (do-sync! sa) (p/then (fn [_] (start-sync-loop! sa))))))) @@ -388,7 +428,8 @@ sa)) (defn ready? - "Returns a promise that yields true when the atom has finished loading from the store." + "Returns a promise that resolves to the SyncedAtom when initial load is complete. + The SyncedAtom itself implements IPromiseFactory, so (p/let [sa atom] ...) also works." [sa] (.-ready_pr sa)) @@ -411,3 +452,9 @@ :cljs (js/clearTimeout t))) ;; Run all cleanup fns (timer, connectivity, SSE) (doseq [f @(.-cleanup_fn sa)] (f))) + +(defn close! + "Close the underlying store of a Connection." + [conn] + (p/let [store (:store-promise conn)] + (store/close-store! store))) diff --git a/test/atomsync/core_test.clj b/test/atomsync/core_test.clj index 8997170..9229945 100644 --- a/test/atomsync/core_test.clj +++ b/test/atomsync/core_test.clj @@ -57,6 +57,99 @@ ;; Tests ;; --------------------------------------------------------------------------- +;; --------------------------------------------------------------------------- +;; Connection API tests +;; --------------------------------------------------------------------------- + +(deftest connect-with-store + (testing "connect accepts a :store option" + (let [store (memory/create) + conn (pb/connect {:store store})] + (is (instance? atomsync.core.Connection conn)) + (is (some? (:store-promise conn)))))) + +(deftest connect-synced-atom-local + (testing "synced-atom works with a Connection (no server)" + (let [conn (pb/connect {:store (memory/create)}) + sa (pb/synced-atom conn "todo")] + (await! (pb/ready? sa) 1000) + (is (= {} @sa)) + + (swap! sa assoc "todo:1" {:text "Buy milk"}) + (is (= {:text "Buy milk"} (get @sa "todo:1"))) + + (pb/destroy! sa)))) + +(deftest connect-synced-atom-with-server + (testing "synced-atom with Connection inherits server opts" + (let [conn (pb/connect {:store (memory/create) + :server (server-url) + :interval 60000}) + sa (pb/synced-atom conn "todo")] + (await! (pb/ready? sa) 2000) + + (swap! sa assoc "todo:c1" {:text "Via conn"}) + (Thread/sleep 500) + + (is (zero? (pb/pending-count sa))) + (pb/destroy! sa)))) + +(deftest connect-shared-across-atoms + (testing "Multiple synced atoms share a Connection" + (let [conn (pb/connect {:store (memory/create)}) + todos (pb/synced-atom conn "todo") + notes (pb/synced-atom conn "note")] + (await! (pb/ready? todos) 1000) + (await! (pb/ready? notes) 1000) + + (swap! todos assoc "todo:1" {:text "Do thing"}) + (swap! notes assoc "note:1" {:text "A note"}) + + (is (= 1 (count @todos))) + (is (= 1 (count @notes))) + (is (nil? (get @todos "note:1")) "Groups are isolated") + + (pb/destroy! todos) + (pb/destroy! notes)))) + +(deftest synced-atom-promise-protocol + (testing "SyncedAtom works with p/let directly" + (let [conn (pb/connect {:store (memory/create)}) + sa (pb/synced-atom conn "todo")] + ;; p/let on the SA itself should resolve to the SA + (let [result (await! (p/let [resolved-sa sa] + (swap! resolved-sa assoc "todo:1" {:text "Hi"}) + resolved-sa) + 1000)] + (is (= sa result) "p/let resolves to the SyncedAtom itself") + (is (= {:text "Hi"} (get @sa "todo:1")))) + (pb/destroy! sa)))) + +(deftest close-connection + (testing "close! closes the store" + (let [store (memory/create) + conn (pb/connect {:store store})] + ;; Should not throw + (await! (pb/close! conn) 1000)))) + +(deftest per-atom-overrides + (testing "synced-atom can override connection defaults" + (let [conn (pb/connect {:store (memory/create) + :server "http://localhost:9999/sync" + :interval 60000}) + ;; Override: no server for this particular atom + sa (pb/synced-atom conn "todo" {:server nil})] + (await! (pb/ready? sa) 1000) + + (swap! sa assoc "todo:1" {:text "Local only"}) + (is (= {:text "Local only"} (get @sa "todo:1"))) + + (pb/destroy! sa)))) + +;; --------------------------------------------------------------------------- +;; Original tests (raw store API — backwards compat) +;; --------------------------------------------------------------------------- + (deftest synced-atom-local-only (testing "SyncedAtom works without a server (local store only)" (let [store (memory/create)