"""Sync tests for the tinyweb_forum package.

Each test builds one or more throwaway ``ForumDB`` instances in temporary
directories, feeds a hand-built sync payload to a peer's
``ForumHandlers.handle_sync_request`` and then inspects the receiving
database.  The module can be collected by pytest or run directly as a script.
"""

import contextlib
import json
import os
import sys
import tempfile
import time

# Make the package importable when the tests are run from a source checkout.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from tinyweb_forum.db import ForumDB
from tinyweb_forum.handlers import ForumHandlers

# Fixed timestamp shared by the tests so results never depend on the clock.
_NOW = "2026-06-05T12:00:00"


def make_handler(data_dir, site_name="test"):
    """Create a ``ForumDB`` in *data_dir* and handlers bound to it.

    Args:
        data_dir: Directory handed to ``ForumDB``.
        site_name: Forwarded to ``ForumHandlers`` as ``site_name``.

    Returns:
        A ``(db, handlers)`` tuple.
    """
    fdb = ForumDB(data_dir)
    # NOTE(review): the three positional ``None`` arguments are collaborators
    # these tests do not exercise -- confirm against ForumHandlers.__init__.
    handlers = ForumHandlers(fdb, None, None, None, site_name=site_name)
    return fdb, handlers


@contextlib.contextmanager
def _temp_dirs(count):
    """Yield *count* fresh temporary directory paths, removed on exit.

    An ``ExitStack`` is used so that every directory gets its cleanup
    attempt even when removing an earlier one raises, and so that nothing
    leaks if creating a later directory fails.
    """
    with contextlib.ExitStack() as stack:
        yield [
            stack.enter_context(tempfile.TemporaryDirectory())
            for _ in range(count)
        ]


def _sync_payload(
    from_hash,
    *,
    threads=(),
    posts=(),
    upvotes=(),
    my_blocks=(),
    retractions=(),
    query=None,
    known_peers=None,
):
    """Build the request dict passed to ``handle_sync_request``.

    Args:
        from_hash: Value for the ``"from_hash"`` key (the sending instance).
        threads: Thread rows to include.
        posts: Post rows to include.
        upvotes: Upvote dicts to include.
        my_blocks: Hashes placed under ``blocks["mine"]``.
        retractions: Retraction dicts to include.
        query: Value for ``"query"``; an empty dict when omitted.
        known_peers: When given, added under ``"known_peers"``; the key is
            left out entirely otherwise.

    Returns:
        A new dict; the sequences are copied into fresh lists.
    """
    payload = {
        "query": {} if query is None else query,
        "threads": list(threads),
        "posts": list(posts),
        "upvotes": list(upvotes),
        "from_hash": from_hash,
        "blocks": {"mine": list(my_blocks), "peers": []},
        "retractions": list(retractions),
    }
    if known_peers is not None:
        payload["known_peers"] = list(known_peers)
    return payload


def test_sync_thread():
    """A thread created on A is stored on B with its fields intact."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        thread_id = "t1"
        db_a.create_thread(
            thread_id, "Hello World", "https://example.com", "First post",
            "test,hello", "abc123", "", _NOW,
        )

        # Simulate sync A -> B
        hb.handle_sync_request(
            _sync_payload("abc123", threads=[dict(db_a.get_thread(thread_id))])
        )

        t = db_b.get_thread(thread_id)
        assert t is not None, "thread should exist in B"
        assert t["title"] == "Hello World"
        assert t["url"] == "https://example.com"
        assert t["tags"] == "test,hello"
        assert t["author_instance"] == "abc123"


def test_sync_reply():
    """A reply synced together with its thread is stored on B."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        thread_id = "t1"
        db_a.create_thread(thread_id, "Thread", "", "Body", "", "abc123", "", _NOW)
        post_id = "p1"
        db_a.create_post(post_id, thread_id, "", "A reply", "def456", "", _NOW)

        hb.handle_sync_request(
            _sync_payload(
                "abc123",
                threads=[dict(db_a.get_thread(thread_id))],
                posts=[dict(db_a.get_posts(thread_id)[0])],
            )
        )

        # Materialize once: iterating the result a second time would be
        # wrong if get_posts ever returned a one-shot iterable.
        posts = list(db_b.get_posts(thread_id))
        assert len(posts) == 1
        assert posts[0]["body"] == "A reply"


def test_sync_upvote():
    """An upvote in the payload is reflected in the thread score on B."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        thread_id = "t1"
        db_a.create_thread(thread_id, "Thread", "", "", "", "abc123", "", _NOW)

        hb.handle_sync_request(
            _sync_payload(
                "abc123",
                threads=[dict(db_a.get_thread(thread_id))],
                upvotes=[{"thread_id": thread_id, "instance_hash": "xyz789"}],
            )
        )

        t = db_b.get_thread(thread_id)
        assert t is not None
        assert t["score"] == 1


def test_sync_blocks_content():
    """Threads from blocked authors should not sync."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        # Block the author on the receiving side before anything arrives.
        db_b.set_setting("blocked_instances", "abc123")

        thread_id = "t1"
        db_a.create_thread(
            thread_id, "Blocked Thread", "", "", "", "abc123", "", _NOW
        )

        hb.handle_sync_request(
            _sync_payload("abc123", threads=[dict(db_a.get_thread(thread_id))])
        )

        t = db_b.get_thread(thread_id)
        assert t is None, "blocked author's thread should not sync"


def test_sync_retraction():
    """Retraction signal from author should retract content on peer."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        thread_id = "t1"
        db_a.create_thread(thread_id, "To Retract", "", "", "", "abc123", "", _NOW)

        # Sync thread to B first
        hb.handle_sync_request(
            _sync_payload("abc123", threads=[dict(db_a.get_thread(thread_id))])
        )
        assert db_b.get_thread(thread_id) is not None

        # Now retract in A and send only the retraction record to B.
        later = "2026-06-05T13:00:00"
        db_a.retract_thread(thread_id, "abc123", later)

        hb.handle_sync_request(
            _sync_payload(
                "abc123",
                retractions=[
                    {
                        "id": thread_id,
                        "type": "thread",
                        "author": "abc123",
                        "at": later,
                    }
                ],
            )
        )

        # The row is kept, but its title is replaced by a marker.
        t = db_b.get_thread(thread_id)
        assert t is not None
        assert t["title"] == "[retracted]"


def test_sync_bidirectional():
    """A and B sync each other's threads."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, ha = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        # A creates a thread
        db_a.create_thread("t_a", "From A", "", "", "", "aaa111", "", _NOW)
        # B creates a thread
        db_b.create_thread("t_b", "From B", "", "", "", "bbb222", "", _NOW)

        # A syncs B's content
        ha.handle_sync_request(
            _sync_payload("bbb222", threads=[dict(db_b.get_thread("t_b"))])
        )
        # B syncs A's content
        hb.handle_sync_request(
            _sync_payload("aaa111", threads=[dict(db_a.get_thread("t_a"))])
        )

        assert db_a.get_thread("t_b") is not None, "A should have B's thread"
        assert db_b.get_thread("t_a") is not None, "B should have A's thread"


def test_sync_block_gossip():
    """Peer blocks should be recorded and counted per reporting peer."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        # A has blocked "malicious"
        db_a.set_setting("blocked_instances", "malicious")

        # A syncs to B -- B should record A's blocks
        hb.handle_sync_request(_sync_payload("peer_a", my_blocks=["malicious"]))

        blocks_a = db_b.get_peer_block_list()
        assert "malicious" in blocks_a

        # Two more distinct peers report the same block, for three in total.
        for peer in ("peer_b", "peer_c"):
            hb.handle_sync_request(_sync_payload(peer, my_blocks=["malicious"]))

        # The auto-block step itself (_apply_peer_blocks, run from the sync
        # loop) is not exercised here; only the counts it would read are
        # checked against the expected threshold of 3.
        counts = db_b.get_peer_block_counts()
        assert counts.get("malicious", 0) >= 3


def test_sync_updated_content():
    """Updated thread should overwrite older version."""
    with _temp_dirs(2) as (dir_a, dir_b):
        db_a, _ = make_handler(dir_a)
        db_b, hb = make_handler(dir_b)

        thread_id = "t1"
        db_a.create_thread(thread_id, "Old Title", "", "", "", "abc123", "", _NOW)

        hb.handle_sync_request(
            _sync_payload("abc123", threads=[dict(db_a.get_thread(thread_id))])
        )
        assert db_b.get_thread(thread_id)["title"] == "Old Title"

        # Update in A
        later = "2026-06-05T14:00:00"
        db_a.update_thread(thread_id, "New Title", "", "", "", later)

        # Resync, this time with a "since" query carrying the first timestamp.
        hb.handle_sync_request(
            _sync_payload(
                "abc123",
                threads=[dict(db_a.get_thread(thread_id))],
                query={"since": [_NOW]},
            )
        )
        assert db_b.get_thread(thread_id)["title"] == "New Title"


def test_sync_peer_discovery():
    """Peers should discover each other through sync gossip."""
    with _temp_dirs(3) as (dir_a, dir_b, dir_c):
        db_a, _ = make_handler(dir_a)
        db_b, _ = make_handler(dir_b)
        db_c, hc = make_handler(dir_c)

        db_a.create_thread("t1", "From A", "", "", "", "aaa", "", _NOW)
        db_b.create_thread("t2", "From B", "", "", "", "bbb", "", _NOW)
        db_c.create_thread("t3", "From C", "", "", "", "ccc", "", _NOW)

        # Seed: A knows B, B knows C. C doesn't know anyone yet.
        db_a.add_known_peer("bbb")
        db_b.add_known_peer("ccc")

        # B sends a sync request to C and gossips about A via "known_peers".
        hc.handle_sync_request(
            _sync_payload(
                "bbb",
                threads=[dict(db_b.get_thread("t2"))],
                known_peers=["aaa"],  # B tells C about A
            )
        )

        # C should now know about B (from the request's from_hash) and
        # about A (from the gossiped known_peers list).
        known = [r["instance_hash"] for r in db_c.get_synced_instances()]
        assert "bbb" in known, "C should auto-discover B from from_hash"
        assert "aaa" in known, "C should discover A from known_peers gossip"


def test_announce_handler():
    """Announce handler should auto-discover peers."""
    with _temp_dirs(1) as (dir_a,):
        fdb = ForumDB(dir_a)

        from tinyweb_forum.sync import _ForumAnnounceHandler

        class MockIdentity:
            """Identity stand-in whose ``hash`` property returns bytes."""

            def __init__(self, h):
                self._hash_hex = h

            @property
            def hash(self):
                return bytes.fromhex(self._hash_hex)

        local_hex = "aa" * 16
        peer_hex = "bb" * 16

        handler = _ForumAnnounceHandler(fdb, MockIdentity(local_hex))

        # Announce from a peer
        handler.received_announce(
            destination_hash=bytes.fromhex(peer_hex),
            announced_identity=MockIdentity(peer_hex),
            app_data=b"tinyweb-forum",
        )

        known = [r["instance_hash"] for r in fdb.get_synced_instances()]
        assert peer_hex in known, "peer should be added to sync list"

        # Announce from ourselves should be ignored
        handler.received_announce(
            destination_hash=bytes.fromhex(local_hex),
            announced_identity=MockIdentity(local_hex),
            app_data=b"tinyweb-forum",
        )

        known = [r["instance_hash"] for r in fdb.get_synced_instances()]
        assert local_hex not in known, "own announce should be ignored"


if __name__ == "__main__":
    test_sync_thread()
    test_sync_reply()
    test_sync_upvote()
    test_sync_blocks_content()
    test_sync_retraction()
    test_sync_bidirectional()
    test_sync_block_gossip()
    test_sync_updated_content()
    test_sync_peer_discovery()
    test_announce_handler()
    print("all sync tests passed")