diff --git a/bb.edn b/bb.edn index e9bf3f7..e48548a 100644 --- a/bb.edn +++ b/bb.edn @@ -14,7 +14,7 @@ test {:doc "Run all server tests" :task (let [expr (str "(require 'pocketbook.db-test 'pocketbook.transit-test" - " 'pocketbook.server-test)" + " 'pocketbook.server-test 'pocketbook.core-test)" " (let [r (clojure.test/run-all-tests #\"pocketbook\\..*\")]" " (System/exit (if (and (zero? (:fail r)) (zero? (:error r))) 0 1)))")] (shell "clj" "-M:dev" "-e" expr))} diff --git a/deps.edn b/deps.edn index 48726ec..78ac008 100644 --- a/deps.edn +++ b/deps.edn @@ -1,5 +1,6 @@ {:paths ["src"] :deps {org.clojure/clojure {:mvn/version "1.12.0"} + org.clojure/core.async {:mvn/version "1.7.701"} http-kit/http-kit {:mvn/version "2.8.0"} com.cognitect/transit-clj {:mvn/version "1.0.333"} com.taoensso/nippy {:mvn/version "3.4.2"} diff --git a/example/todomvc/pocketbook/todomvc.cljs b/example/todomvc/pocketbook/todomvc.cljs index aa240e2..e8b65e8 100644 --- a/example/todomvc/pocketbook/todomvc.cljs +++ b/example/todomvc/pocketbook/todomvc.cljs @@ -1,6 +1,7 @@ (ns pocketbook.todomvc "TodoMVC built on Pocketbook — offline-first, synced, Clojure-native." (:require [pocketbook.core :as pb] + [pocketbook.store.idb :as idb] [cljs.core.async :refer [go {\"todo:1\" {:text \"Buy milk\"}} " - (:require [pocketbook.idb :as idb] + (:require [pocketbook.store :as store] [pocketbook.sync :as sync] [clojure.string :as str] - [cljs.core.async :as async :refer [go go-loop ! chan put! timeout alts!]])) + #?(:clj [clojure.core.async :as async :refer [go go-loop ! chan put! timeout alts!]] + :cljs [cljs.core.async :as async :refer [go go-loop ! chan put! timeout alts!]]))) ;; --------------------------------------------------------------------------- -;; Connection (IDB handle) +;; Internal helpers ;; --------------------------------------------------------------------------- -(defn open - "Open a Pocketbook connection (IndexedDB database). - Returns a channel yielding the connection map." - [db-name] - (let [ch (chan 1)] - (go - (let [db (! ch {:db db :db-name db-name}) - (async/close! ch))) - ch)) +(defn- now-ms [] + #?(:clj (System/currentTimeMillis) + :cljs (.now js/Date))) -(defn shutdown! - "Close a Pocketbook connection." - [{:keys [db atoms]}] - ;; Stop all sync loops - (doseq [[_ sa] @(or atoms (atom {}))] - (when-let [stop (:stop-fn sa)] - (stop))) - (idb/close-db! db)) +(defn- prefix-str [group] + (str group ":")) ;; --------------------------------------------------------------------------- -;; Synced Atom — implements IAtom semantics +;; Shared reset logic +;; --------------------------------------------------------------------------- + +(defn- do-reset!* + "Shared reset implementation. Updates cache, tracks pending, writes to store, kicks sync." + [store cache versions pending kick-ch new-val] + (let [old @cache] + (reset! cache new-val) + (let [all-keys (into (set (keys old)) (keys new-val)) + changed? (volatile! false)] + (doseq [k all-keys] + (when (not= (get old k) (get new-val k)) + (vreset! changed? true) + (swap! pending conj k) + (let [v (get new-val k)] + (if (nil? v) + (store/put-doc! store + {:id k :value nil :version (get @versions k 0) + :updated (now-ms) :deleted true :synced false}) + (store/put-doc! store + {:id k :value v :version (get @versions k 0) + :updated (now-ms) :deleted false :synced false}))))) + (when @changed? + (put! kick-ch :kick))) + new-val)) + +;; --------------------------------------------------------------------------- +;; Synced Atom ;; --------------------------------------------------------------------------- (deftype SyncedAtom [group ;; string prefix, e.g. "todo" - conn ;; {:db idb, ...} + store ;; PStore implementation cache ;; atom containing {id -> value} versions ;; atom containing {id -> version} pending ;; atom containing #{id} — unsynced ids @@ -56,93 +71,83 @@ sync-interval ;; ms _meta] ;; metadata atom - IAtom + #?@(:clj + [clojure.lang.IDeref + (deref [_] @cache) - IDeref - (-deref [_] - @cache) + clojure.lang.IRef + (addWatch [this key f] (add-watch cache key f) this) + (removeWatch [this key] (remove-watch cache key) this) + (getWatches [_] (.getWatches ^clojure.lang.IRef cache)) + (getValidator [_] nil) + (setValidator [_ _vf] nil) - IReset - (-reset! [_ new-val] - ;; Replace the entire cache (all docs in group) - (let [old @cache] - (reset! cache new-val) - ;; Track which docs changed/added/removed - (let [all-keys (into (set (keys old)) (keys new-val)) - changed? (volatile! false)] - (doseq [k all-keys] - (when (not= (get old k) (get new-val k)) - (vreset! changed? true) - (swap! pending conj k) - ;; Write to IDB - (let [v (get new-val k)] - (if (nil? v) - ;; Doc was dissoc'd — mark deleted - (idb/put-doc! (:db conn) - {:id k :value nil :version (get @versions k 0) - :updated (.now js/Date) :deleted true :synced false}) - (idb/put-doc! (:db conn) - {:id k :value v :version (get @versions k 0) - :updated (.now js/Date) :deleted false :synced false}))))) - ;; Kick the sync loop to push immediately - (when @changed? - (put! kick-ch :kick))) - new-val)) + clojure.lang.IAtom + (reset [_ newval] + (do-reset!* store cache versions pending kick-ch newval)) + (swap [this f] + (.reset this (f @cache))) + (swap [this f arg] + (.reset this (f @cache arg))) + (swap [this f arg1 arg2] + (.reset this (f @cache arg1 arg2))) + (swap [this f x y args] + (.reset this (apply f @cache x y args))) + (compareAndSet [this old new] + (if (= @cache old) + (do (.reset this new) true) + false))] - ISwap - (-swap! [o f] - (-reset! o (f @cache))) - (-swap! [o f a] - (-reset! o (f @cache a))) - (-swap! [o f a b] - (-reset! o (f @cache a b))) - (-swap! [o f a b xs] - (-reset! o (apply f @cache a b xs))) + :cljs + [IAtom - IWatchable - (-add-watch [_ key f] - (add-watch cache key f)) - (-remove-watch [_ key] - (remove-watch cache key)) - (-notify-watches [_ old new] - ;; Delegated to the inner atom - nil) + IDeref + (-deref [_] @cache) - IMeta - (-meta [_] @_meta) + IReset + (-reset! [_ new-val] + (do-reset!* store cache versions pending kick-ch new-val)) - IWithMeta - (-with-meta [_ m] (reset! _meta m)) + ISwap + (-swap! [o f] + (-reset! o (f @cache))) + (-swap! [o f a] + (-reset! o (f @cache a))) + (-swap! [o f a b] + (-reset! o (f @cache a b))) + (-swap! [o f a b xs] + (-reset! o (apply f @cache a b xs))) - IPrintWithWriter - (-pr-writer [_ writer opts] - (-write writer (str "#")))) + IWatchable + (-add-watch [_ key f] + (add-watch cache key f)) + (-remove-watch [_ key] + (remove-watch cache key)) + (-notify-watches [_ _old _new] + nil) + + IMeta + (-meta [_] @_meta) + + IWithMeta + (-with-meta [_ m] (reset! _meta m)) + + IPrintWithWriter + (-pr-writer [_ writer _opts] + (-write writer (str "#")))])) ;; --------------------------------------------------------------------------- -;; Internal helpers +;; Store ↔ Atom sync ;; --------------------------------------------------------------------------- -(defn- prefix-str [group] - (str group ":")) - -(defn- now-ms [] - (.now js/Date)) - -(defn- doc-in-group? [group id] - (str/starts-with? id (prefix-str group))) - -;; --------------------------------------------------------------------------- -;; IDB ↔ Atom sync -;; --------------------------------------------------------------------------- - -(defn- load-from-idb! - "Load all docs for the group from IndexedDB into the atom. +(defn- load-from-store! + "Load all docs for the group from the store into the atom. Returns a channel that closes when done." [sa] (let [ch (chan 1)] (go (let [prefix (prefix-str (.-group sa)) - docs ( local version (when (> (:version doc) (get @(.-versions sa) id 0)) (if (:deleted doc) (do (swap! (.-cache sa) dissoc id) (swap! (.-versions sa) assoc id (:version doc)) - (idb/put-doc! (:db (.-conn sa)) + (store/put-doc! (.-store sa) {:id id :value nil :version (:version doc) :updated (:updated doc) :deleted true :synced true})) (do (swap! (.-cache sa) assoc id (:value doc)) (swap! (.-versions sa) assoc id (:version doc)) - (idb/put-doc! (:db (.-conn sa)) + (store/put-doc! (.-store sa) {:id id :value (:value doc) :version (:version doc) :updated (:updated doc) :deleted false :synced true})))))) - ;; Update last-sync (reset! (.-last_sync sa) max-ts) - (idb/set-meta! (:db (.-conn sa)) + (store/set-meta! (.-store sa) (str "last-sync:" (.-group sa)) max-ts)) true))))) @@ -248,13 +238,12 @@ (> @docs + vals + (filter #(str/starts-with? (:id %) prefix)) + vec)] + (put! ch matching) + (async/close! ch) + ch)) + + (get-meta [_ key] + (let [ch (chan 1) + v (get @meta-store key)] + (if (some? v) + (put! ch v) + nil) + (async/close! ch) + ch)) + + (set-meta! [_ key value] + (let [ch (chan 1)] + (swap! meta-store assoc key value) + (put! ch true) + (async/close! ch) + ch)) + + (close-store! [_] + nil)) + +(defn create + "Create a new in-memory store." + [] + (MemoryStore. (atom {}) (atom {}))) diff --git a/src/pocketbook/sync.cljc b/src/pocketbook/sync.cljc new file mode 100644 index 0000000..05b8041 --- /dev/null +++ b/src/pocketbook/sync.cljc @@ -0,0 +1,160 @@ +(ns pocketbook.sync + "HTTP sync client — pull and push documents to/from the Pocketbook server." + (:require [pocketbook.transit :as transit] + [clojure.string :as str] + #?(:clj [clojure.core.async :as async :refer [chan put!]] + :cljs [cljs.core.async :as async :refer [chan put!]])) + #?(:clj (:import [java.net URI] + [java.net.http HttpClient HttpRequest HttpRequest$BodyPublishers + HttpResponse$BodyHandlers]))) + +;; --------------------------------------------------------------------------- +;; HTTP helpers +;; --------------------------------------------------------------------------- + +#?(:clj + (def ^:private http-client (HttpClient/newHttpClient))) + +#?(:clj + (defn- http-request + "Make an HTTP request with Transit encoding (JVM). + Returns {:ok bool :body decoded-value :status int}." + [{:keys [url method body]}] + (try + (let [builder (-> (HttpRequest/newBuilder) + (.uri (URI. url)) + (.header "Content-Type" "application/transit+json") + (.header "Accept" "application/transit+json")) + req (case method + :get (.build (.GET builder)) + :post (.build (.POST builder + (HttpRequest$BodyPublishers/ofString + (transit/encode body))))) + resp (.send http-client req (HttpResponse$BodyHandlers/ofString))] + (if (<= 200 (.statusCode resp) 299) + {:ok true :body (transit/decode (.body resp))} + {:ok false :status (.statusCode resp) :error (.body resp)})) + (catch Exception e + {:ok false :status 0 :error (str e)})))) + +#?(:cljs + (defn- fetch-transit + "Make an HTTP request with Transit encoding (browser). + Returns a channel yielding {:ok bool :body decoded :status int}." + [{:keys [url method body]}] + (let [ch (chan 1) + opts (clj->js + (cond-> {:method (or method "GET") + :headers {"Content-Type" "application/transit+json" + "Accept" "application/transit+json"}} + body (assoc :body (transit/encode body))))] + (-> (js/fetch url opts) + (.then (fn [resp] + (-> (.text resp) + (.then (fn [text] + (if (.-ok resp) + (put! ch {:ok true :body (transit/decode text)}) + (put! ch {:ok false + :status (.-status resp) + :error text})) + (async/close! ch)))))) + (.catch (fn [err] + (put! ch {:ok false :status 0 :error (str err)}) + (async/close! ch)))) + ch))) + +;; --------------------------------------------------------------------------- +;; Pull +;; --------------------------------------------------------------------------- + +(defn pull! + "Pull documents from server updated since `since` for `group`. + Returns a channel yielding {:ok true :docs [...]} or {:ok false :error str}." + [{:keys [server]} group since] + (let [url (str server "?group=" #?(:clj (java.net.URLEncoder/encode group "UTF-8") + :cljs (js/encodeURIComponent group)) + "&since=" since)] + #?(:clj + (async/thread + (let [result (http-request {:url url :method :get})] + (if (:ok result) + {:ok true :docs (:body result)} + result))) + + :cljs + (let [ch (chan 1)] + (async/go + (let [result (async/ server + (str/replace #"/sync$" "") + (str/replace #"/$" "")) + url (str base-url "/events?group=" (js/encodeURIComponent group)) + es (js/EventSource. url)] + (set! (.-onmessage es) + (fn [e] + (on-change (.-data e)))) + (set! (.-onerror es) + (fn [_e] nil)) + (fn [] + (.close es))))) + +;; --------------------------------------------------------------------------- +;; Online detection +;; --------------------------------------------------------------------------- + +(defn online? [] + #?(:clj true + :cljs (.-onLine js/navigator))) + +(defn on-connectivity-change + "Register callbacks for online/offline events. Returns a cleanup fn. + On JVM, this is a no-op (always online)." + [on-online on-offline] + #?(:clj (fn []) + :cljs (let [online-handler (fn [_] (on-online)) + offline-handler (fn [_] (on-offline))] + (.addEventListener js/window "online" online-handler) + (.addEventListener js/window "offline" offline-handler) + (fn [] + (.removeEventListener js/window "online" online-handler) + (.removeEventListener js/window "offline" offline-handler))))) diff --git a/src/pocketbook/sync.cljs b/src/pocketbook/sync.cljs deleted file mode 100644 index 06538d3..0000000 --- a/src/pocketbook/sync.cljs +++ /dev/null @@ -1,137 +0,0 @@ -(ns pocketbook.sync - "HTTP sync client — pull and push documents to/from the Pocketbook server." - (:require [cognitect.transit :as t] - [clojure.string :as str] - [cljs.core.async :as async :refer [chan put!]])) - -;; --------------------------------------------------------------------------- -;; Transit over HTTP -;; --------------------------------------------------------------------------- - -(def ^:private writer (t/writer :json)) -(def ^:private reader (t/reader :json)) - -(defn- encode [v] - (t/write writer v)) - -(defn- decode [s] - (when (and s (not= s "")) - (t/read reader s))) - -;; --------------------------------------------------------------------------- -;; HTTP helpers -;; --------------------------------------------------------------------------- - -(defn- fetch-transit - "Make an HTTP request with Transit encoding. Returns a channel - yielding {:ok true :body } or {:ok false :status N :error str}." - [{:keys [url method body headers]}] - (let [ch (chan 1) - opts (clj->js - (cond-> {:method (or method "GET") - :headers (merge {"Content-Type" "application/transit+json" - "Accept" "application/transit+json"} - headers)} - body (assoc :body (encode body))))] - (-> (js/fetch url opts) - (.then (fn [resp] - (-> (.text resp) - (.then (fn [text] - (if (.-ok resp) - (put! ch {:ok true :body (decode text)}) - (put! ch {:ok false - :status (.-status resp) - :error text})) - (async/close! ch)))))) - (.catch (fn [err] - (put! ch {:ok false :status 0 :error (str err)}) - (async/close! ch)))) - ch)) - -;; --------------------------------------------------------------------------- -;; Pull -;; --------------------------------------------------------------------------- - -(defn pull! - "Pull documents from server updated since `since` for `group`. - Returns a channel yielding {:ok true :docs [...]} or {:ok false :error str}." - [{:keys [server]} group since] - (let [ch (chan 1) - url (str server "?group=" (js/encodeURIComponent group) - "&since=" since)] - (async/go - (let [result (async/ server - (str/replace #"/sync$" "") - (str/replace #"/$" "")) - url (str base-url "/events?group=" (js/encodeURIComponent group)) - es (js/EventSource. url)] - (set! (.-onmessage es) - (fn [e] - (let [data (.-data e)] - ;; "connected" fires on initial connect AND every reconnect - ;; Always trigger a sync — picks up missed changes after reconnect - (on-change data)))) - (set! (.-onerror es) - (fn [_e] - ;; EventSource auto-reconnects; nothing to do - nil)) - ;; Return cleanup - (fn [] - (.close es)))) - -;; --------------------------------------------------------------------------- -;; Online detection -;; --------------------------------------------------------------------------- - -(defn online? [] - (.-onLine js/navigator)) - -(defn on-connectivity-change - "Register callbacks for online/offline events. Returns a cleanup fn." - [on-online on-offline] - (let [online-handler (fn [_] (on-online)) - offline-handler (fn [_] (on-offline))] - (.addEventListener js/window "online" online-handler) - (.addEventListener js/window "offline" offline-handler) - ;; Return cleanup function - (fn [] - (.removeEventListener js/window "online" online-handler) - (.removeEventListener js/window "offline" offline-handler)))) diff --git a/src/pocketbook/transit.clj b/src/pocketbook/transit.clj deleted file mode 100644 index b19a570..0000000 --- a/src/pocketbook/transit.clj +++ /dev/null @@ -1,32 +0,0 @@ -(ns pocketbook.transit - "Transit encoding/decoding helpers for the HTTP wire format." - (:require [cognitect.transit :as t]) - (:import [java.io ByteArrayInputStream ByteArrayOutputStream])) - -(defn encode - "Encode a Clojure value to a Transit+JSON byte array." - [v] - (let [out (ByteArrayOutputStream. 4096) - w (t/writer out :json)] - (t/write w v) - (.toByteArray out))) - -(defn encode-str - "Encode a Clojure value to a Transit+JSON string." - [v] - (let [out (ByteArrayOutputStream. 4096) - w (t/writer out :json)] - (t/write w v) - (.toString out "UTF-8"))) - -(defn decode - "Decode a Transit+JSON byte array or input stream to a Clojure value." - [input] - (let [in (cond - (instance? ByteArrayInputStream input) input - (instance? java.io.InputStream input) input - (bytes? input) (ByteArrayInputStream. input) - (string? input) (ByteArrayInputStream. (.getBytes ^String input "UTF-8")) - :else (throw (ex-info "Cannot decode, unsupported input type" - {:type (type input)})))] - (t/read (t/reader in :json)))) diff --git a/src/pocketbook/transit.cljc b/src/pocketbook/transit.cljc new file mode 100644 index 0000000..1968051 --- /dev/null +++ b/src/pocketbook/transit.cljc @@ -0,0 +1,31 @@ +(ns pocketbook.transit + "Transit encoding/decoding helpers for the HTTP wire format." + (:require [cognitect.transit :as t]) + #?(:clj (:import [java.io ByteArrayInputStream ByteArrayOutputStream]))) + +(defn encode + "Encode a Clojure value to a Transit+JSON string." + [v] + #?(:clj (let [out (ByteArrayOutputStream. 4096) + w (t/writer out :json)] + (t/write w v) + (.toString out "UTF-8")) + :cljs (t/write (t/writer :json) v))) + +(defn decode + "Decode Transit+JSON to a Clojure value. + Accepts a string on both platforms; on CLJ also accepts byte arrays and InputStreams." + [input] + #?(:clj (cond + (nil? input) nil + (string? input) + (when (not= input "") + (t/read (t/reader (ByteArrayInputStream. (.getBytes ^String input "UTF-8")) :json))) + (instance? java.io.InputStream input) + (t/read (t/reader input :json)) + (bytes? input) + (t/read (t/reader (ByteArrayInputStream. input) :json)) + :else (throw (ex-info "Cannot decode, unsupported input type" + {:type (type input)}))) + :cljs (when (and input (not= input "")) + (t/read (t/reader :json) input)))) diff --git a/test/pocketbook/core_test.clj b/test/pocketbook/core_test.clj new file mode 100644 index 0000000..775abec --- /dev/null +++ b/test/pocketbook/core_test.clj @@ -0,0 +1,185 @@ +(ns pocketbook.core-test + (:require [clojure.test :refer [deftest is testing use-fixtures]] + [clojure.core.async :as async :refer [ (HttpRequest/newBuilder) - (.uri (URI. (url path))) - (.header "Content-Type" "application/transit+json") - (.header "Accept" "application/transit+json") - (.POST (HttpRequest$BodyPublishers/ofByteArray bytes)) - (.build)) - resp (.send client req (HttpResponse$BodyHandlers/ofByteArray))] + (let [encoded (t/encode body) + req (-> (HttpRequest/newBuilder) + (.uri (URI. (url path))) + (.header "Content-Type" "application/transit+json") + (.header "Accept" "application/transit+json") + (.POST (HttpRequest$BodyPublishers/ofString encoded)) + (.build)) + resp (.send client req (HttpResponse$BodyHandlers/ofString))] {:status (.statusCode resp) :body (t/decode (.body resp))})) diff --git a/test/pocketbook/transit_test.clj b/test/pocketbook/transit_test.clj index f98e32a..59e62f4 100644 --- a/test/pocketbook/transit_test.clj +++ b/test/pocketbook/transit_test.clj @@ -30,8 +30,8 @@ :version 5}]] (is (= data (t/decode (t/encode data)))))) -(deftest encode-str-roundtrip +(deftest encode-returns-string (let [v {:hello "world" :nums [1 2 3]} - s (t/encode-str v)] + s (t/encode v)] (is (string? s)) (is (= v (t/decode s)))))