feat: instant sync — push on write, pull on reconnect, render from cache

- Add kick-ch to SyncedAtom: swap!/reset! trigger immediate push
  instead of waiting for the 15s sync loop interval
- SSE: pull on reconnect ("connected" message), not just on new events,
  so coming back online picks up missed server changes
- TodoMVC: render and bind events before IDB load completes; the watch
  re-renders automatically when cached data arrives
This commit is contained in:
Florian Schroedl
2026-04-16 19:09:08 +02:00
parent 38a15b7a34
commit 06d0fa5e05
3 changed files with 23 additions and 15 deletions

View File

@@ -247,18 +247,16 @@
:interval 15000})] :interval 15000})]
(reset! !conn conn) (reset! !conn conn)
(reset! !todos todos) (reset! !todos todos)
;; Wait for IDB load ;; Render + bind immediately (empty or stale is fine)
(<! (pb/ready? todos))
;; Initial render
(render!) (render!)
(bind-events!) (bind-events!)
;; Re-render on any data change ;; Re-render on any data change (fires when IDB loads + server syncs)
(add-watch todos :render (fn [_ _ _ _] (render!))) (add-watch todos :render (fn [_ _ _ _] (render!)))
;; Re-render on filter change
(add-watch !filter :render (fn [_ _ _ _] (render!))) (add-watch !filter :render (fn [_ _ _ _] (render!)))
;; Online/offline status updates
(.addEventListener js/window "online" (fn [_] (render!))) (.addEventListener js/window "online" (fn [_] (render!)))
(.addEventListener js/window "offline" (fn [_] (render!))) (.addEventListener js/window "offline" (fn [_] (render!)))
;; Wait for IDB — watch triggers render automatically
(<! (pb/ready? todos))
(js/console.log "🔶 Pocketbook TodoMVC loaded —" (count @todos) "todos")))) (js/console.log "🔶 Pocketbook TodoMVC loaded —" (count @todos) "todos"))))
(init) (init)

View File

@@ -51,6 +51,7 @@
last-sync ;; atom containing epoch ms last-sync ;; atom containing epoch ms
ready-ch ;; channel, closed when initial load complete ready-ch ;; channel, closed when initial load complete
stop-ch ;; channel to signal stop stop-ch ;; channel to signal stop
kick-ch ;; channel to trigger immediate push
cleanup-fn ;; atom holding connectivity cleanup fn cleanup-fn ;; atom holding connectivity cleanup fn
sync-interval ;; ms sync-interval ;; ms
_meta] ;; metadata atom _meta] ;; metadata atom
@@ -67,9 +68,11 @@
(let [old @cache] (let [old @cache]
(reset! cache new-val) (reset! cache new-val)
;; Track which docs changed/added/removed ;; Track which docs changed/added/removed
(let [all-keys (into (set (keys old)) (keys new-val))] (let [all-keys (into (set (keys old)) (keys new-val))
changed? (volatile! false)]
(doseq [k all-keys] (doseq [k all-keys]
(when (not= (get old k) (get new-val k)) (when (not= (get old k) (get new-val k))
(vreset! changed? true)
(swap! pending conj k) (swap! pending conj k)
;; Write to IDB ;; Write to IDB
(let [v (get new-val k)] (let [v (get new-val k)]
@@ -80,7 +83,10 @@
:updated (.now js/Date) :deleted true :synced false}) :updated (.now js/Date) :deleted true :synced false})
(idb/put-doc! (:db conn) (idb/put-doc! (:db conn)
{:id k :value v :version (get @versions k 0) {:id k :value v :version (get @versions k 0)
:updated (.now js/Date) :deleted false :synced false})))))) :updated (.now js/Date) :deleted false :synced false})))))
;; Kick the sync loop to push immediately
(when @changed?
(put! kick-ch :kick)))
new-val)) new-val))
ISwap ISwap
@@ -276,13 +282,16 @@
"Start the background sync loop. Returns a stop function." "Start the background sync loop. Returns a stop function."
[sa] [sa]
(let [stop-ch (.-stop_ch sa) (let [stop-ch (.-stop_ch sa)
kick-ch (.-kick_ch sa)
interval (.-sync_interval sa) interval (.-sync_interval sa)
cleanups (atom [])] cleanups (atom [])]
;; Periodic sync (fallback) ;; Periodic sync (fallback) + immediate push on kick
(go-loop [] (go-loop []
(let [[_ ch] (alts! [stop-ch (timeout interval)])] (let [[_ ch] (alts! [stop-ch kick-ch (timeout interval)])]
(when-not (= ch stop-ch) (when-not (= ch stop-ch)
(<! (do-sync! sa)) (if (= ch kick-ch)
(<! (do-push! sa)) ;; kick = local write, just push
(<! (do-sync! sa))) ;; timeout = full pull+push
(recur)))) (recur))))
;; Online/offline handler ;; Online/offline handler
(swap! cleanups conj (swap! cleanups conj
@@ -322,10 +331,11 @@
last-sync (atom 0) last-sync (atom 0)
ready-ch (chan 1) ready-ch (chan 1)
stop-ch (chan 1) stop-ch (chan 1)
kick-ch (chan (async/sliding-buffer 1))
cleanup-fn (atom nil) cleanup-fn (atom nil)
meta-atom (atom nil) meta-atom (atom nil)
sa (SyncedAtom. group conn cache-atom versions pending sa (SyncedAtom. group conn cache-atom versions pending
server-opts last-sync ready-ch stop-ch server-opts last-sync ready-ch stop-ch kick-ch
cleanup-fn interval meta-atom)] cleanup-fn interval meta-atom)]
;; Load from IDB, then start sync ;; Load from IDB, then start sync
(go (go

View File

@@ -108,9 +108,9 @@
(set! (.-onmessage es) (set! (.-onmessage es)
(fn [e] (fn [e]
(let [data (.-data e)] (let [data (.-data e)]
;; Skip initial "connected" message ;; "connected" fires on initial connect AND every reconnect
(when (not= data "connected") ;; Always trigger a sync — picks up missed changes after reconnect
(on-change data))))) (on-change data))))
(set! (.-onerror es) (set! (.-onerror es)
(fn [_e] (fn [_e]
;; EventSource auto-reconnects; nothing to do ;; EventSource auto-reconnects; nothing to do