From 59f9cacdb05a24946706b5122723b33cd95bac30 Mon Sep 17 00:00:00 2001 From: Florian Schroedl Date: Thu, 16 Apr 2026 20:36:30 +0200 Subject: [PATCH] test: add multi-client sync flow tests Add 8 end-to-end flow tests exercising full client-server sync scenarios: - two-client convergence on same key - delete propagation across clients - update propagation across clients - store persistence across client restarts - fresh client pulling from server - batch sync of 50 documents - pending changes surviving server restart - group isolation between different document groups --- test/atomsync/core_test.clj | 255 ++++++++++++++++++++++++++++++++++++ 1 file changed, 255 insertions(+) diff --git a/test/atomsync/core_test.clj b/test/atomsync/core_test.clj index 9b44b62..8997170 100644 --- a/test/atomsync/core_test.clj +++ b/test/atomsync/core_test.clj @@ -182,3 +182,258 @@ (is (true? (:edited (get @sa "note:1")))) (pb/destroy! sa)))) + +;; --------------------------------------------------------------------------- +;; Flow tests — multi-client sync scenarios +;; --------------------------------------------------------------------------- + +(deftest two-client-convergence + (testing "Two clients writing the same key converge after sync" + (let [store-a (memory/create) + store-b (memory/create) + sa-a (pb/synced-atom store-a "todo" + {:server (server-url) :interval 60000}) + sa-b (pb/synced-atom store-b "todo" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-a) 2000) + (await! (pb/ready? sa-b) 2000) + + ;; A writes and pushes first + (swap! sa-a assoc "todo:x" {:text "From A"}) + (wait-synced sa-a 2000) + + ;; B writes same key (hasn't pulled A's version yet, base-version 0) + (swap! sa-b assoc "todo:x" {:text "From B"}) + + ;; Sync both — pull runs before push, so B picks up A's version + (await! (pb/sync-now! sa-b) 3000) + (await! (pb/sync-now! sa-a) 3000) + (await! (pb/sync-now! sa-b) 3000) + + (is (= (get @sa-a "todo:x") (get @sa-b "todo:x")) + "Both clients must converge to the same value") + + (pb/destroy! sa-a) + (pb/destroy! sa-b)))) + +(deftest delete-propagation + (testing "Deleting a doc on A removes it on B after sync" + (let [store-a (memory/create) + store-b (memory/create) + sa-a (pb/synced-atom store-a "todo" + {:server (server-url) :interval 60000}) + sa-b (pb/synced-atom store-b "todo" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-a) 2000) + (await! (pb/ready? sa-b) 2000) + + ;; A creates a doc and pushes + (swap! sa-a assoc "todo:del1" {:text "Delete me"}) + (wait-synced sa-a 2000) + + ;; B syncs — should see the doc + (await! (pb/sync-now! sa-b) 2000) + (is (= {:text "Delete me"} (get @sa-b "todo:del1")) + "B should see A's doc after sync") + + ;; A deletes + (swap! sa-a dissoc "todo:del1") + (wait-synced sa-a 2000) + + ;; B syncs — doc should be gone + (await! (pb/sync-now! sa-b) 2000) + (is (nil? (get @sa-b "todo:del1")) + "Deleted doc must disappear on B after sync") + + (pb/destroy! sa-a) + (pb/destroy! sa-b)))) + +(deftest update-propagation + (testing "Updating a doc on A is visible on B after sync" + (let [store-a (memory/create) + store-b (memory/create) + sa-a (pb/synced-atom store-a "todo" + {:server (server-url) :interval 60000}) + sa-b (pb/synced-atom store-b "todo" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-a) 2000) + (await! (pb/ready? sa-b) 2000) + + ;; A creates + (swap! sa-a assoc "todo:u1" {:text "v1"}) + (wait-synced sa-a 2000) + + ;; B sees v1 + (await! (pb/sync-now! sa-b) 2000) + (is (= {:text "v1"} (get @sa-b "todo:u1"))) + + ;; A updates + (swap! sa-a assoc "todo:u1" {:text "v2" :edited true}) + (wait-synced sa-a 2000) + + ;; B sees v2 + (await! (pb/sync-now! sa-b) 2000) + (is (= {:text "v2" :edited true} (get @sa-b "todo:u1")) + "B should see A's updated value") + + (pb/destroy! sa-a) + (pb/destroy! sa-b)))) + +(deftest store-persistence-across-clients + (testing "New client on the same store loads data written by previous client" + (let [store (memory/create) + sa-a (pb/synced-atom store "todo" + {:server (server-url)})] + (await! (pb/ready? sa-a) 2000) + + (swap! sa-a assoc "todo:p1" {:text "Persisted"}) + (swap! sa-a assoc "todo:p2" {:text "Also persisted"}) + (wait-synced sa-a 2000) + (pb/destroy! sa-a) + + ;; New client on same store — no server needed, loads from store + (let [sa-b (pb/synced-atom store "todo")] + (await! (pb/ready? sa-b) 2000) + (is (= {:text "Persisted"} (get @sa-b "todo:p1"))) + (is (= {:text "Also persisted"} (get @sa-b "todo:p2"))) + (pb/destroy! sa-b))))) + +(deftest fresh-client-pulls-from-server + (testing "A fresh client with an empty store pulls all data from the server" + (let [store-a (memory/create) + sa-a (pb/synced-atom store-a "todo" + {:server (server-url)})] + (await! (pb/ready? sa-a) 2000) + + (swap! sa-a assoc "todo:s1" {:text "Server 1"}) + (swap! sa-a assoc "todo:s2" {:text "Server 2"}) + (swap! sa-a assoc "todo:s3" {:text "Server 3"}) + (wait-synced sa-a 3000) + (pb/destroy! sa-a) + + ;; Brand new client, empty store + (let [store-b (memory/create) + sa-b (pb/synced-atom store-b "todo" + {:server (server-url)})] + (await! (pb/ready? sa-b) 2000) + ;; Initial sync runs in constructor; give it time + explicit sync + (await! (pb/sync-now! sa-b) 2000) + + (is (= 3 (count @sa-b))) + (is (= {:text "Server 1"} (get @sa-b "todo:s1"))) + (is (= {:text "Server 2"} (get @sa-b "todo:s2"))) + (is (= {:text "Server 3"} (get @sa-b "todo:s3"))) + (pb/destroy! sa-b))))) + +(deftest batch-sync-many-docs + (testing "50 documents written on A all arrive on B" + (let [store-a (memory/create) + store-b (memory/create) + sa-a (pb/synced-atom store-a "item" + {:server (server-url) :interval 60000}) + sa-b (pb/synced-atom store-b "item" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-a) 2000) + (await! (pb/ready? sa-b) 2000) + + ;; Write 50 docs on A + (doseq [i (range 50)] + (swap! sa-a assoc (str "item:" i) {:n i})) + + ;; Wait for push to complete + (wait-synced sa-a 5000) + (is (zero? (pb/pending-count sa-a)) + "A should have pushed all 50 docs") + + ;; B pulls + (await! (pb/sync-now! sa-b) 5000) + + (is (= 50 (count @sa-b)) + "B should have all 50 docs") + (doseq [i (range 50)] + (is (= {:n i} (get @sa-b (str "item:" i))))) + + (pb/destroy! sa-a) + (pb/destroy! sa-b)))) + +(deftest server-restart-recovery + (testing "Pending changes sync after server comes back up" + (let [port *port* + db-path (:db-path (:config *server*)) + store-a (memory/create) + sa-a (pb/synced-atom store-a "todo" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-a) 2000) + + ;; Write while server is up + (swap! sa-a assoc "todo:before" {:text "Before restart"}) + (wait-synced sa-a 2000) + + ;; Stop the server + (server/stop! *server*) + (Thread/sleep 200) + + ;; Write while server is down — push will fail silently + (swap! sa-a assoc "todo:during" {:text "During downtime"}) + (Thread/sleep 500) + (is (pos? (pb/pending-count sa-a)) + "Should have pending changes while server is down") + + ;; Restart server on same port with same DB + (let [srv2 (server/start! {:port port :db-path db-path})] + (Thread/sleep 200) + (try + ;; Manual sync pushes the pending changes + (await! (pb/sync-now! sa-a) 3000) + (wait-synced sa-a 3000) + (is (zero? (pb/pending-count sa-a)) + "Pending changes should be flushed after server restart") + + ;; Verify via a fresh client + (let [store-b (memory/create) + sa-b (pb/synced-atom store-b "todo" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-b) 2000) + (await! (pb/sync-now! sa-b) 2000) + + (is (= {:text "Before restart"} (get @sa-b "todo:before"))) + (is (= {:text "During downtime"} (get @sa-b "todo:during"))) + (pb/destroy! sa-b)) + + (pb/destroy! sa-a) + (finally + (server/stop! srv2))))))) + +(deftest group-isolation + (testing "Documents in one group are invisible to a client on another group" + (let [store-todos (memory/create) + store-notes (memory/create) + sa-todos (pb/synced-atom store-todos "todo" + {:server (server-url) :interval 60000}) + sa-notes (pb/synced-atom store-notes "note" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-todos) 2000) + (await! (pb/ready? sa-notes) 2000) + + ;; Write to both groups + (swap! sa-todos assoc "todo:1" {:text "Buy milk"}) + (swap! sa-notes assoc "note:1" {:text "Meeting notes"}) + (wait-synced sa-todos 2000) + (wait-synced sa-notes 2000) + + ;; Fresh client on "todo" group should only see todos + (let [store-fresh (memory/create) + sa-fresh (pb/synced-atom store-fresh "todo" + {:server (server-url) :interval 60000})] + (await! (pb/ready? sa-fresh) 2000) + (await! (pb/sync-now! sa-fresh) 2000) + + (is (= 1 (count @sa-fresh))) + (is (= {:text "Buy milk"} (get @sa-fresh "todo:1"))) + (is (nil? (get @sa-fresh "note:1")) + "Notes should not leak into the todo group") + + (pb/destroy! sa-fresh)) + + (pb/destroy! sa-todos) + (pb/destroy! sa-notes))))