feat: add Connection API and make SyncedAtom a promise

Add `connect` to manage store + shared config, and `synced-atom` now
accepts a Connection or raw store (backwards compatible).

SyncedAtom implements IPromiseFactory so it works directly with p/let:
  (def conn (pb/connect {:db "my-app" :server "..."}))
  (def !todos (pb/synced-atom conn "todo"))
  (p/let [todos !todos] (swap! todos assoc "k" v))

- Connection record holds store-promise + default opts (server, interval)
- `connect` accepts :store (CLJ/CLJS) or :db (CLJS, opens IndexedDB)
- `ready?` now resolves to the SyncedAtom itself instead of `true`
- Add `close!` to close a Connection's store
- Store field is now an atom (set when store-promise resolves)
- Simplify TodoMVC example: no more !conn atom or double-deref
This commit is contained in:
Florian Schroedl
2026-04-18 16:51:09 +02:00
parent 267d52e4ce
commit 78b897cc25
3 changed files with 189 additions and 54 deletions

View File

@@ -1,7 +1,6 @@
(ns atomsync.todomvc
"TodoMVC built on Atomsync — offline-first, synced, Clojure-native."
(:require [atomsync.core :as pb]
[atomsync.store.idb :as idb]
[atomsync.hiccup :refer [html]]
[promesa.core :as p]
[clojure.string :as str]))
@@ -10,8 +9,10 @@
;; State
;; ---------------------------------------------------------------------------
(defonce !conn (atom nil))
(defonce !todos (atom nil)) ;; the SyncedAtom
(defonce conn (pb/connect {:db "atomsync-todomvc"
:server "http://localhost:8090/sync"
:interval 15000}))
(defonce !todos (pb/synced-atom conn "todo"))
(defonce !filter (atom :all)) ;; :all | :active | :completed
(defonce !editing (atom nil)) ;; id of todo being edited, or nil
@@ -22,7 +23,7 @@
(defn- todos-list
"Return todos as sorted vec of [id doc] pairs."
[]
(->> @@!todos
(->> @!todos
(sort-by (fn [[_ doc]] (:created doc)))
vec))
@@ -50,33 +51,33 @@
(let [text (str/trim text)]
(when (seq text)
(let [id (str "todo:" (random-uuid))]
(swap! @!todos assoc id
(swap! !todos assoc id
{:text text
:completed false
:created (.now js/Date)})))))
(defn- toggle-todo! [id]
(swap! @!todos update-in [id :completed] not))
(swap! !todos update-in [id :completed] not))
(defn- toggle-all! []
(let [target (not (all-completed?))]
(swap! @!todos
(swap! !todos
(fn [m]
(reduce-kv (fn [acc k v] (assoc acc k (assoc v :completed target)))
{} m)))))
(defn- destroy-todo! [id]
(swap! @!todos dissoc id))
(swap! !todos dissoc id))
(defn- edit-todo! [id new-text]
(let [text (str/trim new-text)]
(if (seq text)
(swap! @!todos assoc-in [id :text] text)
(swap! !todos assoc-in [id :text] text)
(destroy-todo! id))
(reset! !editing nil)))
(defn- clear-completed! []
(swap! @!todos
(swap! !todos
(fn [m]
(into {} (remove (fn [[_ v]] (:completed v))) m))))
@@ -119,7 +120,7 @@
"Clear completed"])]))
(defn- render-sync-status []
(let [pending (when @!todos (pb/pending-count @!todos))
(let [pending (pb/pending-count !todos)
online? (.-onLine js/navigator)]
[:div.sync-bar
[:span {:class (str "sync-dot " (if online? "online" "offline"))}]
@@ -232,22 +233,16 @@
;; ---------------------------------------------------------------------------
(defn ^:export init []
(p/let [store (idb/open "atomsync-todomvc")]
(let [todos (pb/synced-atom store "todo"
{:server "http://localhost:8090/sync"
:interval 15000})]
(reset! !conn store)
(reset! !todos todos)
;; Render + bind immediately (empty or stale is fine)
(render!)
(bind-events!)
;; Re-render on any data change (fires when IDB loads + server syncs)
(add-watch todos :render (fn [_ _ _ _] (render!)))
(add-watch !filter :render (fn [_ _ _ _] (render!)))
(.addEventListener js/window "online" (fn [_] (render!)))
(.addEventListener js/window "offline" (fn [_] (render!)))
;; Wait for IDB — watch triggers render automatically
(p/let [_ (pb/ready? todos)]
(js/console.log "🔶 Atomsync TodoMVC loaded —" (count @todos) "todos")))))
;; Render + bind immediately (empty or stale is fine)
(render!)
(bind-events!)
;; Re-render on any data change (fires when IDB loads + server syncs)
(add-watch !todos :render (fn [_ _ _ _] (render!)))
(add-watch !filter :render (fn [_ _ _ _] (render!)))
(.addEventListener js/window "online" (fn [_] (render!)))
(.addEventListener js/window "offline" (fn [_] (render!)))
;; Wait for IDB — watch triggers render automatically
(p/let [todos !todos]
(js/console.log "🔶 Atomsync TodoMVC loaded —" (count @todos) "todos")))
(init)

View File

@@ -2,19 +2,42 @@
"Atomsync: a Clojure-native synced atom.
Usage:
(def store @(idb/open \"my-app\")) ;; or (memory/create)
(def todos (pb/synced-atom store \"todo\"
{:server \"http://localhost:8090/sync\"}))
(.then (ready? todos)
(fn [_] (swap! todos assoc \"todo:1\" {:text \"Buy milk\"})))
(def conn (connect {:db \"my-app\"
:server \"http://localhost:8090/sync\"}))
(def todos (synced-atom conn \"todo\"))
(p/let [todos todos]
(swap! todos assoc \"todo:1\" {:text \"Buy milk\"}))
@todos ;=> {\"todo:1\" {:text \"Buy milk\"}}
"
(:require [atomsync.store :as store]
[atomsync.sync :as sync]
[clojure.string :as str]
[promesa.core :as p])
[promesa.core :as p]
[promesa.protocols :as pt]
#?(:cljs [atomsync.store.idb :as idb]))
#?(:clj (:import [java.util.concurrent Executors ScheduledExecutorService TimeUnit])))
;; ---------------------------------------------------------------------------
;; Connection
;; ---------------------------------------------------------------------------
(defrecord Connection [store-promise opts])
(defn connect
"Create a connection for synced atoms.
Options:
:db — database name (CLJS: opens IndexedDB)
:store — pre-opened store or promise of a store
:server — server URL for sync
:interval — sync interval in ms (default 30000)"
[{:keys [db store server interval] :or {interval 30000}}]
(let [store-pr (cond
(some? store) (p/promise store)
#?@(:cljs [(some? db) (idb/open db)])
:else (throw (ex-info "connect requires :store or :db" {})))]
(->Connection store-pr {:server server :interval interval})))
;; ---------------------------------------------------------------------------
;; Internal helpers
;; ---------------------------------------------------------------------------
@@ -64,7 +87,7 @@
;; ---------------------------------------------------------------------------
(deftype SyncedAtom [group ;; string prefix, e.g. "todo"
store ;; PStore implementation
store ;; atom containing PStore implementation
cache ;; atom containing {id -> value}
versions ;; atom containing {id -> version}
pending ;; atom containing #{id} — unsynced ids
@@ -92,7 +115,7 @@
clojure.lang.IAtom
(reset [this newval]
(do-reset!* store cache versions pending this newval))
(do-reset!* @store cache versions pending this newval))
(swap [this f]
(.reset this (f @cache)))
(swap [this f arg]
@@ -104,7 +127,10 @@
(compareAndSet [this old new]
(if (= @cache old)
(do (.reset this new) true)
false))]
false))
pt/IPromiseFactory
(-promise [_] ready-pr)]
:cljs
[IAtom
@@ -114,7 +140,7 @@
IReset
(-reset! [this new-val]
(do-reset!* store cache versions pending this new-val))
(do-reset!* @store cache versions pending this new-val))
ISwap
(-swap! [o f]
@@ -140,6 +166,9 @@
IWithMeta
(-with-meta [_ m] (reset! _meta m))
pt/IPromiseFactory
(-promise [_] ready-pr)
IPrintWithWriter
(-pr-writer [_ writer _opts]
(-write writer (str "#<SyncedAtom[" group "] " (count @cache) " docs>")))]))
@@ -153,7 +182,7 @@
Returns a promise that resolves when done."
[sa]
(p/let [prefix (prefix-str (.-group sa))
docs (store/docs-by-prefix (.-store sa) prefix)
docs (store/docs-by-prefix @(.-store sa) prefix)
state (into {}
(comp
(remove :deleted)
@@ -164,7 +193,7 @@
docs)
_ (do (reset! (.-cache sa) state)
(reset! (.-versions sa) vers))
ls (store/get-meta (.-store sa)
ls (store/get-meta @(.-store sa)
(str "last-sync:" (.-group sa)))]
(reset! (.-last_sync sa) (or ls 0))
true))
@@ -176,7 +205,7 @@
(swap! (.-versions sa) assoc id version)
(swap! (.-pending sa) disj id)
(let [value (get @(.-cache sa) id)]
(store/put-doc! (.-store sa)
(store/put-doc! @(.-store sa)
{:id id
:value value
:version version
@@ -206,17 +235,17 @@
(do
(swap! (.-cache sa) dissoc id)
(swap! (.-versions sa) assoc id (:version doc))
(store/put-doc! (.-store sa)
(store/put-doc! @(.-store sa)
{:id id :value nil :version (:version doc)
:updated (:updated doc) :deleted true :synced true}))
(do
(swap! (.-cache sa) assoc id (:value doc))
(swap! (.-versions sa) assoc id (:version doc))
(store/put-doc! (.-store sa)
(store/put-doc! @(.-store sa)
{:id id :value (:value doc) :version (:version doc)
:updated (:updated doc) :deleted false :synced true})))))]
(reset! (.-last_sync sa) max-ts)
(store/set-meta! (.-store sa)
(store/set-meta! @(.-store sa)
(str "last-sync:" (.-group sa)) max-ts)))))
(p/resolved nil)))
@@ -248,7 +277,7 @@
(swap! (.-cache sa) assoc (:id r) (:value r)))
(swap! (.-versions sa) assoc (:id r) (:current-version r))
(swap! (.-pending sa) disj (:id r))
(store/put-doc! (.-store sa)
(store/put-doc! @(.-store sa)
{:id (:id r)
:value (or (:value r) (get @(.-cache sa) (:id r)))
:version (:current-version r)
@@ -356,13 +385,22 @@
(defn synced-atom
"Create a synced atom for a document group.
Options:
Accepts a Connection (from `connect`) or a raw store for backwards compat.
Options (per-atom overrides, merged with connection defaults):
:server — server URL (e.g. \"http://localhost:8090/sync\")
:cache — custom atom to use (e.g. reagent/atom). Default: cljs.core/atom
:interval — sync interval in ms (default 30000)"
[store group & [{:keys [server cache interval]
:or {interval 30000}}]]
(let [cache-atom (or cache (atom {}))
:interval — sync interval in ms (default 30000)
Returns a SyncedAtom that implements IPromiseFactory, so it works with p/let:
(p/let [sa (synced-atom conn \"todo\")] ...)"
[conn-or-store group & [opts]]
(let [[store-pr conn-opts] (if (instance? Connection conn-or-store)
[(:store-promise conn-or-store) (:opts conn-or-store)]
[(p/resolved conn-or-store) {}])
{:keys [server cache interval] :or {interval 30000}} (merge conn-opts opts)
store-atom (atom nil)
cache-atom (or cache (atom {}))
versions (atom {})
pending (atom #{})
server-opts (when server {:server server})
@@ -374,12 +412,14 @@
syncing? (atom false)
cleanup-fn (atom nil)
meta-atom (atom nil)
sa (SyncedAtom. group store cache-atom versions pending
sa (SyncedAtom. group store-atom cache-atom versions pending
server-opts last-sync ready-pr sync-timer push-timer
pushing? syncing? cleanup-fn interval meta-atom)]
(-> (load-from-store! sa)
(-> (p/let [store store-pr]
(reset! store-atom store)
(load-from-store! sa))
(p/then (fn [_]
(p/resolve! ready-pr true)
(p/resolve! ready-pr sa)
(when server-opts
(-> (do-sync! sa)
(p/then (fn [_] (start-sync-loop! sa)))))))
@@ -388,7 +428,8 @@
sa))
(defn ready?
"Returns a promise that yields true when the atom has finished loading from the store."
"Returns a promise that resolves to the SyncedAtom when initial load is complete.
The SyncedAtom itself implements IPromiseFactory, so (p/let [sa atom] ...) also works."
[sa]
(.-ready_pr sa))
@@ -411,3 +452,9 @@
:cljs (js/clearTimeout t)))
;; Run all cleanup fns (timer, connectivity, SSE)
(doseq [f @(.-cleanup_fn sa)] (f)))
(defn close!
"Close the underlying store of a Connection."
[conn]
(p/let [store (:store-promise conn)]
(store/close-store! store)))

View File

@@ -57,6 +57,99 @@
;; Tests
;; ---------------------------------------------------------------------------
;; ---------------------------------------------------------------------------
;; Connection API tests
;; ---------------------------------------------------------------------------
(deftest connect-with-store
(testing "connect accepts a :store option"
(let [store (memory/create)
conn (pb/connect {:store store})]
(is (instance? atomsync.core.Connection conn))
(is (some? (:store-promise conn))))))
(deftest connect-synced-atom-local
(testing "synced-atom works with a Connection (no server)"
(let [conn (pb/connect {:store (memory/create)})
sa (pb/synced-atom conn "todo")]
(await! (pb/ready? sa) 1000)
(is (= {} @sa))
(swap! sa assoc "todo:1" {:text "Buy milk"})
(is (= {:text "Buy milk"} (get @sa "todo:1")))
(pb/destroy! sa))))
(deftest connect-synced-atom-with-server
(testing "synced-atom with Connection inherits server opts"
(let [conn (pb/connect {:store (memory/create)
:server (server-url)
:interval 60000})
sa (pb/synced-atom conn "todo")]
(await! (pb/ready? sa) 2000)
(swap! sa assoc "todo:c1" {:text "Via conn"})
(Thread/sleep 500)
(is (zero? (pb/pending-count sa)))
(pb/destroy! sa))))
(deftest connect-shared-across-atoms
(testing "Multiple synced atoms share a Connection"
(let [conn (pb/connect {:store (memory/create)})
todos (pb/synced-atom conn "todo")
notes (pb/synced-atom conn "note")]
(await! (pb/ready? todos) 1000)
(await! (pb/ready? notes) 1000)
(swap! todos assoc "todo:1" {:text "Do thing"})
(swap! notes assoc "note:1" {:text "A note"})
(is (= 1 (count @todos)))
(is (= 1 (count @notes)))
(is (nil? (get @todos "note:1")) "Groups are isolated")
(pb/destroy! todos)
(pb/destroy! notes))))
(deftest synced-atom-promise-protocol
(testing "SyncedAtom works with p/let directly"
(let [conn (pb/connect {:store (memory/create)})
sa (pb/synced-atom conn "todo")]
;; p/let on the SA itself should resolve to the SA
(let [result (await! (p/let [resolved-sa sa]
(swap! resolved-sa assoc "todo:1" {:text "Hi"})
resolved-sa)
1000)]
(is (= sa result) "p/let resolves to the SyncedAtom itself")
(is (= {:text "Hi"} (get @sa "todo:1"))))
(pb/destroy! sa))))
(deftest close-connection
(testing "close! closes the store"
(let [store (memory/create)
conn (pb/connect {:store store})]
;; Should not throw
(await! (pb/close! conn) 1000))))
(deftest per-atom-overrides
(testing "synced-atom can override connection defaults"
(let [conn (pb/connect {:store (memory/create)
:server "http://localhost:9999/sync"
:interval 60000})
;; Override: no server for this particular atom
sa (pb/synced-atom conn "todo" {:server nil})]
(await! (pb/ready? sa) 1000)
(swap! sa assoc "todo:1" {:text "Local only"})
(is (= {:text "Local only"} (get @sa "todo:1")))
(pb/destroy! sa))))
;; ---------------------------------------------------------------------------
;; Original tests (raw store API — backwards compat)
;; ---------------------------------------------------------------------------
(deftest synced-atom-local-only
(testing "SyncedAtom works without a server (local store only)"
(let [store (memory/create)