test: add multi-client sync flow tests
Add 8 end-to-end flow tests exercising full client-server sync scenarios: - two-client convergence on same key - delete propagation across clients - update propagation across clients - store persistence across client restarts - fresh client pulling from server - batch sync of 50 documents - pending changes surviving server restart - group isolation between different document groups
This commit is contained in:
@@ -182,3 +182,258 @@
|
||||
(is (true? (:edited (get @sa "note:1"))))
|
||||
|
||||
(pb/destroy! sa))))
|
||||
|
||||
;; ---------------------------------------------------------------------------
|
||||
;; Flow tests — multi-client sync scenarios
|
||||
;; ---------------------------------------------------------------------------
|
||||
|
||||
(deftest two-client-convergence
|
||||
(testing "Two clients writing the same key converge after sync"
|
||||
(let [store-a (memory/create)
|
||||
store-b (memory/create)
|
||||
sa-a (pb/synced-atom store-a "todo"
|
||||
{:server (server-url) :interval 60000})
|
||||
sa-b (pb/synced-atom store-b "todo"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
|
||||
;; A writes and pushes first
|
||||
(swap! sa-a assoc "todo:x" {:text "From A"})
|
||||
(wait-synced sa-a 2000)
|
||||
|
||||
;; B writes same key (hasn't pulled A's version yet, base-version 0)
|
||||
(swap! sa-b assoc "todo:x" {:text "From B"})
|
||||
|
||||
;; Sync both — pull runs before push, so B picks up A's version
|
||||
(await! (pb/sync-now! sa-b) 3000)
|
||||
(await! (pb/sync-now! sa-a) 3000)
|
||||
(await! (pb/sync-now! sa-b) 3000)
|
||||
|
||||
(is (= (get @sa-a "todo:x") (get @sa-b "todo:x"))
|
||||
"Both clients must converge to the same value")
|
||||
|
||||
(pb/destroy! sa-a)
|
||||
(pb/destroy! sa-b))))
|
||||
|
||||
(deftest delete-propagation
|
||||
(testing "Deleting a doc on A removes it on B after sync"
|
||||
(let [store-a (memory/create)
|
||||
store-b (memory/create)
|
||||
sa-a (pb/synced-atom store-a "todo"
|
||||
{:server (server-url) :interval 60000})
|
||||
sa-b (pb/synced-atom store-b "todo"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
|
||||
;; A creates a doc and pushes
|
||||
(swap! sa-a assoc "todo:del1" {:text "Delete me"})
|
||||
(wait-synced sa-a 2000)
|
||||
|
||||
;; B syncs — should see the doc
|
||||
(await! (pb/sync-now! sa-b) 2000)
|
||||
(is (= {:text "Delete me"} (get @sa-b "todo:del1"))
|
||||
"B should see A's doc after sync")
|
||||
|
||||
;; A deletes
|
||||
(swap! sa-a dissoc "todo:del1")
|
||||
(wait-synced sa-a 2000)
|
||||
|
||||
;; B syncs — doc should be gone
|
||||
(await! (pb/sync-now! sa-b) 2000)
|
||||
(is (nil? (get @sa-b "todo:del1"))
|
||||
"Deleted doc must disappear on B after sync")
|
||||
|
||||
(pb/destroy! sa-a)
|
||||
(pb/destroy! sa-b))))
|
||||
|
||||
(deftest update-propagation
|
||||
(testing "Updating a doc on A is visible on B after sync"
|
||||
(let [store-a (memory/create)
|
||||
store-b (memory/create)
|
||||
sa-a (pb/synced-atom store-a "todo"
|
||||
{:server (server-url) :interval 60000})
|
||||
sa-b (pb/synced-atom store-b "todo"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
|
||||
;; A creates
|
||||
(swap! sa-a assoc "todo:u1" {:text "v1"})
|
||||
(wait-synced sa-a 2000)
|
||||
|
||||
;; B sees v1
|
||||
(await! (pb/sync-now! sa-b) 2000)
|
||||
(is (= {:text "v1"} (get @sa-b "todo:u1")))
|
||||
|
||||
;; A updates
|
||||
(swap! sa-a assoc "todo:u1" {:text "v2" :edited true})
|
||||
(wait-synced sa-a 2000)
|
||||
|
||||
;; B sees v2
|
||||
(await! (pb/sync-now! sa-b) 2000)
|
||||
(is (= {:text "v2" :edited true} (get @sa-b "todo:u1"))
|
||||
"B should see A's updated value")
|
||||
|
||||
(pb/destroy! sa-a)
|
||||
(pb/destroy! sa-b))))
|
||||
|
||||
(deftest store-persistence-across-clients
|
||||
(testing "New client on the same store loads data written by previous client"
|
||||
(let [store (memory/create)
|
||||
sa-a (pb/synced-atom store "todo"
|
||||
{:server (server-url)})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
|
||||
(swap! sa-a assoc "todo:p1" {:text "Persisted"})
|
||||
(swap! sa-a assoc "todo:p2" {:text "Also persisted"})
|
||||
(wait-synced sa-a 2000)
|
||||
(pb/destroy! sa-a)
|
||||
|
||||
;; New client on same store — no server needed, loads from store
|
||||
(let [sa-b (pb/synced-atom store "todo")]
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
(is (= {:text "Persisted"} (get @sa-b "todo:p1")))
|
||||
(is (= {:text "Also persisted"} (get @sa-b "todo:p2")))
|
||||
(pb/destroy! sa-b)))))
|
||||
|
||||
(deftest fresh-client-pulls-from-server
|
||||
(testing "A fresh client with an empty store pulls all data from the server"
|
||||
(let [store-a (memory/create)
|
||||
sa-a (pb/synced-atom store-a "todo"
|
||||
{:server (server-url)})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
|
||||
(swap! sa-a assoc "todo:s1" {:text "Server 1"})
|
||||
(swap! sa-a assoc "todo:s2" {:text "Server 2"})
|
||||
(swap! sa-a assoc "todo:s3" {:text "Server 3"})
|
||||
(wait-synced sa-a 3000)
|
||||
(pb/destroy! sa-a)
|
||||
|
||||
;; Brand new client, empty store
|
||||
(let [store-b (memory/create)
|
||||
sa-b (pb/synced-atom store-b "todo"
|
||||
{:server (server-url)})]
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
;; Initial sync runs in constructor; give it time + explicit sync
|
||||
(await! (pb/sync-now! sa-b) 2000)
|
||||
|
||||
(is (= 3 (count @sa-b)))
|
||||
(is (= {:text "Server 1"} (get @sa-b "todo:s1")))
|
||||
(is (= {:text "Server 2"} (get @sa-b "todo:s2")))
|
||||
(is (= {:text "Server 3"} (get @sa-b "todo:s3")))
|
||||
(pb/destroy! sa-b)))))
|
||||
|
||||
(deftest batch-sync-many-docs
|
||||
(testing "50 documents written on A all arrive on B"
|
||||
(let [store-a (memory/create)
|
||||
store-b (memory/create)
|
||||
sa-a (pb/synced-atom store-a "item"
|
||||
{:server (server-url) :interval 60000})
|
||||
sa-b (pb/synced-atom store-b "item"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
|
||||
;; Write 50 docs on A
|
||||
(doseq [i (range 50)]
|
||||
(swap! sa-a assoc (str "item:" i) {:n i}))
|
||||
|
||||
;; Wait for push to complete
|
||||
(wait-synced sa-a 5000)
|
||||
(is (zero? (pb/pending-count sa-a))
|
||||
"A should have pushed all 50 docs")
|
||||
|
||||
;; B pulls
|
||||
(await! (pb/sync-now! sa-b) 5000)
|
||||
|
||||
(is (= 50 (count @sa-b))
|
||||
"B should have all 50 docs")
|
||||
(doseq [i (range 50)]
|
||||
(is (= {:n i} (get @sa-b (str "item:" i)))))
|
||||
|
||||
(pb/destroy! sa-a)
|
||||
(pb/destroy! sa-b))))
|
||||
|
||||
(deftest server-restart-recovery
|
||||
(testing "Pending changes sync after server comes back up"
|
||||
(let [port *port*
|
||||
db-path (:db-path (:config *server*))
|
||||
store-a (memory/create)
|
||||
sa-a (pb/synced-atom store-a "todo"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-a) 2000)
|
||||
|
||||
;; Write while server is up
|
||||
(swap! sa-a assoc "todo:before" {:text "Before restart"})
|
||||
(wait-synced sa-a 2000)
|
||||
|
||||
;; Stop the server
|
||||
(server/stop! *server*)
|
||||
(Thread/sleep 200)
|
||||
|
||||
;; Write while server is down — push will fail silently
|
||||
(swap! sa-a assoc "todo:during" {:text "During downtime"})
|
||||
(Thread/sleep 500)
|
||||
(is (pos? (pb/pending-count sa-a))
|
||||
"Should have pending changes while server is down")
|
||||
|
||||
;; Restart server on same port with same DB
|
||||
(let [srv2 (server/start! {:port port :db-path db-path})]
|
||||
(Thread/sleep 200)
|
||||
(try
|
||||
;; Manual sync pushes the pending changes
|
||||
(await! (pb/sync-now! sa-a) 3000)
|
||||
(wait-synced sa-a 3000)
|
||||
(is (zero? (pb/pending-count sa-a))
|
||||
"Pending changes should be flushed after server restart")
|
||||
|
||||
;; Verify via a fresh client
|
||||
(let [store-b (memory/create)
|
||||
sa-b (pb/synced-atom store-b "todo"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-b) 2000)
|
||||
(await! (pb/sync-now! sa-b) 2000)
|
||||
|
||||
(is (= {:text "Before restart"} (get @sa-b "todo:before")))
|
||||
(is (= {:text "During downtime"} (get @sa-b "todo:during")))
|
||||
(pb/destroy! sa-b))
|
||||
|
||||
(pb/destroy! sa-a)
|
||||
(finally
|
||||
(server/stop! srv2)))))))
|
||||
|
||||
(deftest group-isolation
|
||||
(testing "Documents in one group are invisible to a client on another group"
|
||||
(let [store-todos (memory/create)
|
||||
store-notes (memory/create)
|
||||
sa-todos (pb/synced-atom store-todos "todo"
|
||||
{:server (server-url) :interval 60000})
|
||||
sa-notes (pb/synced-atom store-notes "note"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-todos) 2000)
|
||||
(await! (pb/ready? sa-notes) 2000)
|
||||
|
||||
;; Write to both groups
|
||||
(swap! sa-todos assoc "todo:1" {:text "Buy milk"})
|
||||
(swap! sa-notes assoc "note:1" {:text "Meeting notes"})
|
||||
(wait-synced sa-todos 2000)
|
||||
(wait-synced sa-notes 2000)
|
||||
|
||||
;; Fresh client on "todo" group should only see todos
|
||||
(let [store-fresh (memory/create)
|
||||
sa-fresh (pb/synced-atom store-fresh "todo"
|
||||
{:server (server-url) :interval 60000})]
|
||||
(await! (pb/ready? sa-fresh) 2000)
|
||||
(await! (pb/sync-now! sa-fresh) 2000)
|
||||
|
||||
(is (= 1 (count @sa-fresh)))
|
||||
(is (= {:text "Buy milk"} (get @sa-fresh "todo:1")))
|
||||
(is (nil? (get @sa-fresh "note:1"))
|
||||
"Notes should not leak into the todo group")
|
||||
|
||||
(pb/destroy! sa-fresh))
|
||||
|
||||
(pb/destroy! sa-todos)
|
||||
(pb/destroy! sa-notes))))
|
||||
|
||||
Reference in New Issue
Block a user