add sync tests: thread, reply, upvote, block, retraction, bidirectional, gossip, update

This commit is contained in:
lichenblankie 2026-06-05 00:34:16 +00:00
parent a2d029097d
commit d201fa0bc9

346
tests/test_sync.py Normal file
View file

@ -0,0 +1,346 @@
import json
import os
import tempfile
import time
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from tinyweb_forum.db import ForumDB
from tinyweb_forum.handlers import ForumHandlers
def make_handler(data_dir, site_name="test"):
fdb = ForumDB(data_dir)
handlers = ForumHandlers(fdb, None, None, None, site_name=site_name)
return fdb, handlers
def test_sync_thread():
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
thread_id = "t1"
db_a.create_thread(thread_id, "Hello World", "https://example.com",
"First post", "test,hello",
"abc123", "", now)
# Simulate sync A -> B
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [],
"upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
t = db_b.get_thread(thread_id)
assert t is not None, "thread should exist in B"
assert t["title"] == "Hello World"
assert t["url"] == "https://example.com"
assert t["tags"] == "test,hello"
assert t["author_instance"] == "abc123"
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_reply():
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
thread_id = "t1"
db_a.create_thread(thread_id, "Thread", "", "Body", "",
"abc123", "", now)
post_id = "p1"
db_a.create_post(post_id, thread_id, "", "A reply", "def456", "", now)
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [dict(db_a.get_posts(thread_id)[0])],
"upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
posts = db_b.get_posts(thread_id)
assert len(list(posts)) == 1
assert list(posts)[0]["body"] == "A reply"
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_upvote():
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
thread_id = "t1"
db_a.create_thread(thread_id, "Thread", "", "", "",
"abc123", "", now)
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [],
"upvotes": [{"thread_id": thread_id, "instance_hash": "xyz789"}],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
t = db_b.get_thread(thread_id)
assert t is not None
assert t["score"] == 1
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_blocks_content():
"""Threads from blocked authors should not sync."""
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
# Block the author
db_b.set_setting("blocked_instances", "abc123")
now = "2026-06-05T12:00:00"
thread_id = "t1"
db_a.create_thread(thread_id, "Blocked Thread", "", "", "",
"abc123", "", now)
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [],
"upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
t = db_b.get_thread(thread_id)
assert t is None, "blocked author's thread should not sync"
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_retraction():
"""Retraction signal from author should retract content on peer."""
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
thread_id = "t1"
db_a.create_thread(thread_id, "To Retract", "", "", "",
"abc123", "", now)
# Sync thread to B first
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [], "upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
assert db_b.get_thread(thread_id) is not None
# Now retract in A and sync
later = "2026-06-05T13:00:00"
db_a.retract_thread(thread_id, "abc123", later)
hb.handle_sync_request({
"query": {},
"threads": [], "posts": [], "upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [{"id": thread_id, "type": "thread",
"author": "abc123", "at": later}],
})
t = db_b.get_thread(thread_id)
assert t is not None
assert t["title"] == "[retracted]"
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_bidirectional():
"""A and B sync each other's threads."""
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
# A creates a thread
db_a.create_thread("t_a", "From A", "", "", "",
"aaa111", "", now)
# B creates a thread
db_b.create_thread("t_b", "From B", "", "", "",
"bbb222", "", now)
# A syncs B's content
ha.handle_sync_request({
"query": {},
"threads": [dict(db_b.get_thread("t_b"))],
"posts": [], "upvotes": [],
"from_hash": "bbb222",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
# B syncs A's content
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread("t_a"))],
"posts": [], "upvotes": [],
"from_hash": "aaa111",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
assert db_a.get_thread("t_b") is not None, "A should have B's thread"
assert db_b.get_thread("t_a") is not None, "B should have A's thread"
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_block_gossip():
"""Peer blocks should be recorded and auto-block at threshold."""
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
# A has blocked "malicious"
db_a.set_setting("blocked_instances", "malicious")
# A syncs to B — B should record A's blocks
hb.handle_sync_request({
"query": {},
"threads": [], "posts": [], "upvotes": [],
"from_hash": "peer_a",
"blocks": {"mine": ["malicious"], "peers": []},
"retractions": [],
})
blocks_a = db_b.get_peer_block_list()
assert "malicious" in blocks_a
# Need 2 more peers to reach threshold 3
hb.handle_sync_request({
"query": {},
"threads": [], "posts": [], "upvotes": [],
"from_hash": "peer_b",
"blocks": {"mine": ["malicious"], "peers": []},
"retractions": [],
})
hb.handle_sync_request({
"query": {},
"threads": [], "posts": [], "upvotes": [],
"from_hash": "peer_c",
"blocks": {"mine": ["malicious"], "peers": []},
"retractions": [],
})
# _apply_peer_blocks would be called during sync loop
from tinyweb_forum.sync import ForumSync
# Mock just the _apply_peer_blocks logic
counts = db_b.get_peer_block_counts()
assert counts.get("malicious", 0) >= 3
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
def test_sync_updated_content():
"""Updated thread should overwrite older version."""
dir_a = tempfile.mkdtemp()
dir_b = tempfile.mkdtemp()
try:
db_a, ha = make_handler(dir_a)
db_b, hb = make_handler(dir_b)
now = "2026-06-05T12:00:00"
thread_id = "t1"
db_a.create_thread(thread_id, "Old Title", "", "", "",
"abc123", "", now)
hb.handle_sync_request({
"query": {},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [], "upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
assert db_b.get_thread(thread_id)["title"] == "Old Title"
# Update in A
later = "2026-06-05T14:00:00"
db_a.update_thread(thread_id, "New Title", "", "", "", later)
# Resync
hb.handle_sync_request({
"query": {"since": [now]},
"threads": [dict(db_a.get_thread(thread_id))],
"posts": [], "upvotes": [],
"from_hash": "abc123",
"blocks": {"mine": [], "peers": []},
"retractions": [],
})
assert db_b.get_thread(thread_id)["title"] == "New Title"
finally:
import shutil
shutil.rmtree(dir_a)
shutil.rmtree(dir_b)
if __name__ == "__main__":
test_sync_thread()
test_sync_reply()
test_sync_upvote()
test_sync_blocks_content()
test_sync_retraction()
test_sync_bidirectional()
test_sync_block_gossip()
test_sync_updated_content()
print("all sync tests passed")