# tinyweb-forum/tinyweb_forum/sync.py
#
# 195 lines
# 7.8 KiB
# Python

import json
import threading
import time
import RNS
# Reticulum application name used for both the inbound destination this node
# exposes and the outbound destinations it builds to reach peers.
FORUM_APP = "tinyweb-forum"
# Number of one-second sleeps the background loop performs between sync rounds.
SYNC_INTERVAL = 300 # 5 minutes
# Seconds allowed for a "/forum" request: passed as the request timeout and
# also used as the upper bound when polling the request receipt.
REQUEST_TIMEOUT = 60
class ForumSync:
    """Synchronise the local forum database with peer instances over Reticulum.

    ``start()`` exposes a ``/forum`` request handler on an inbound destination
    and launches a daemon thread that, once per ``SYNC_INTERVAL`` seconds,
    syncs with every instance returned by ``fdb.get_synced_instances()``.

    Args:
        fdb: Forum database facade; only the methods called in this class are
            required (``get_synced_instances``, ``get_new_content``,
            ``merge_thread`` and so on).
        identity: Local RNS identity, or a falsy value when there is none (the
            hash ``"local"`` is then used in sync payloads).
        reticulum: Reticulum instance; stored but not used by this class.
        handlers_ref: Zero-argument callable returning the object that
            implements ``handle_sync`` / ``handle_sync_request``.
    """

    # Seconds to wait for a path to a peer and for link establishment.
    PATH_TIMEOUT = 15
    LINK_TIMEOUT = 15
    # Number of peer block reports after which an instance is auto-blocked.
    AUTO_BLOCK_THRESHOLD = 3

    def __init__(self, fdb, identity, reticulum, handlers_ref):
        self.fdb = fdb
        self.identity = identity
        self.reticulum = reticulum
        self.handlers_ref = handlers_ref
        self.destination = None
        self._running = False
        self._thread = None

    def start(self):
        """Register the inbound destination, announce it and start syncing.

        Calling ``start()`` while already running is a no-op, and a restart
        after ``stop()`` reuses the existing destination and any still-alive
        worker thread instead of creating duplicates.
        """
        if self._running:
            return
        if self.destination is None:
            self.destination = RNS.Destination(
                self.identity,
                RNS.Destination.IN,
                RNS.Destination.SINGLE,
                FORUM_APP,
            )
            self.destination.register_request_handler(
                "/forum",
                response_generator=self._rns_handler,
                allow=RNS.Destination.ALLOW_ALL,
            )
        self.destination.announce()
        self._running = True
        if self._thread is None or not self._thread.is_alive():
            self._thread = threading.Thread(target=self._sync_loop, daemon=True)
            self._thread.start()

    def stop(self):
        """Ask the background loop to exit; it notices within about a second
        when idle, or after the current peer sync finishes."""
        self._running = False

    def _rns_handler(self, path, data, request_id, link_id, remote_identity, requested_at):
        """Request handler for ``/forum``: tag the payload with the caller's
        identity hash (when identified) and pass it to the handlers object."""
        if remote_identity:
            # A request may carry no payload at all; the original code raised
            # TypeError on ``None["peer_hash"] = ...`` in that case.
            if data is None:
                data = {}
            if isinstance(data, dict):
                data["peer_hash"] = remote_identity.hash.hex()
        # NOTE(review): for unidentified callers a client-supplied "peer_hash"
        # is passed through unchanged - confirm the handler does not trust it.
        return self.handlers_ref().handle_sync(data)

    def _sync_loop(self):
        """Background worker: sync with every known instance, then sleep."""
        while self._running:
            try:
                for inst in self.fdb.get_synced_instances():
                    if not self._running:
                        break
                    peer = None
                    try:
                        peer = inst["instance_hash"]
                        self._sync_with(peer)
                    except Exception as e:
                        # One failing peer must not stop the round.
                        print(f"[forum] sync error with {str(peer)[:16]}: {e}")
            except Exception as e:
                # Keep the worker alive, but do not hide the failure.
                print(f"[forum] sync loop error: {e}")
            # Sleep in one-second slices so stop() takes effect promptly.
            for _ in range(SYNC_INTERVAL):
                if not self._running:
                    return
                time.sleep(1)

    @staticmethod
    def _wait_for(condition, timeout, interval):
        """Poll ``condition()`` every ``interval`` seconds until it is truthy
        or ``timeout`` seconds have passed; return whether it became truthy."""
        deadline = time.monotonic() + timeout
        while not condition():
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
        return True

    def _my_hash(self):
        """Return this instance's identity hash as hex, or ``"local"``."""
        return self.identity.hash.hex() if self.identity else "local"

    def _split_setting(self, key):
        """Read a comma-separated setting as a list of non-empty, stripped items."""
        raw = self.fdb.get_setting(key, "") or ""
        return [item.strip() for item in raw.split(",") if item.strip()]

    def _last_sync_marker(self, instance_hash):
        """Return the stored ``last_sync`` for ``instance_hash`` with the space
        separator replaced by ``T``, or ``""`` when there is none."""
        for row in self.fdb.get_synced_instances():
            if row["instance_hash"] == instance_hash:
                return (row["last_sync"] or "").replace(" ", "T")
        return ""

    def _build_request(self, instance_hash, since, my_hash):
        """Assemble the ``/forum`` request payload sent to ``instance_hash``."""
        threads, posts, upvotes = [], [], []
        if since:
            new_threads, new_posts, upvoted = self.fdb.get_new_content(since)
            threads = [dict(r) for r in new_threads]
            posts = [dict(r) for r in new_posts]
            # NOTE(review): outgoing upvotes are tagged with the *peer's* hash,
            # not ours - kept as in the original; confirm this is intended.
            upvotes = [{"thread_id": tid, "instance_hash": instance_hash} for tid in upvoted]
        retracted = [
            {"id": cid, "type": ct, "author": ai, "at": ra}
            for cid, ct, ai, ra in self.fdb.get_raw_retractions()
        ]
        known_peers = [
            h for h in self.fdb.get_all_known_hashes()
            if h != instance_hash and h != my_hash
        ]
        return {
            "query": {"since": [since]} if since else {},
            "threads": threads,
            "posts": posts,
            "upvotes": upvotes,
            "from_hash": my_hash,
            "blocks": {
                "mine": self._split_setting("blocked_instances"),
                "peers": self.fdb.get_peer_block_list(),
            },
            "retractions": retracted,
            "known_peers": known_peers,
        }

    @staticmethod
    def _parse_body(resp):
        """Decode the JSON ``body`` of a response dict; return ``{}`` when it
        is missing, not decodable, or not a JSON object."""
        try:
            data = json.loads(resp["body"])
        except (ValueError, KeyError, TypeError):
            return {}
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _as_list(value):
        """Return ``value`` if it is a list, else an empty list."""
        return value if isinstance(value, list) else []

    def _merge_response(self, data, instance_hash, my_hash):
        """Merge a peer's decoded response into the local database.

        The payload comes from an untrusted peer, so entries with an
        unexpected shape are skipped instead of raising part-way through.
        """
        my_blocks = set(self._split_setting("blocked_instances"))

        for t in self._as_list(data.get("threads")):
            if isinstance(t, dict) and t.get("author_instance", "") not in my_blocks:
                self.fdb.merge_thread(t)
        for p in self._as_list(data.get("posts")):
            if isinstance(p, dict) and p.get("author_instance", "") not in my_blocks:
                self.fdb.merge_post(p)
        for tid in self._as_list(data.get("upvote_threads")):
            self.fdb.merge_upvote(tid, instance_hash)

        # Gossip blocks from peer: both its own blocks and those it relays are
        # recorded as reports coming from this peer.
        peer_blocks = data.get("blocks")
        if isinstance(peer_blocks, dict) and instance_hash:
            for key in ("mine", "peers"):
                for h in self._as_list(peer_blocks.get(key)):
                    if isinstance(h, str) and h and h not in my_blocks:
                        self.fdb.record_peer_block(instance_hash, h)
        self._apply_peer_blocks()

        # Merge incoming retractions (all four fields are required).
        for r in self._as_list(data.get("retractions")):
            if (isinstance(r, dict) and r.get("id") and r.get("type")
                    and r.get("author") and r.get("at")):
                self.fdb.merge_retraction(r["id"], r["type"], r["author"], r["at"])

        # Discover new peers from gossip.
        for peer_hash in self._as_list(data.get("known_peers")):
            if (isinstance(peer_hash, str) and peer_hash
                    and peer_hash != my_hash and peer_hash != instance_hash):
                self.fdb.add_known_peer(peer_hash)

    def _sync_with(self, instance_hash):
        """Run one push/pull exchange with the peer ``instance_hash`` (hex).

        Returns silently when the peer is unreachable or the exchange does not
        complete; ``last_sync`` is only advanced after a status-200 response.

        Raises:
            ValueError: if ``instance_hash`` is not valid hex.
        """
        dest_hash = bytes.fromhex(instance_hash)
        if not RNS.Transport.has_path(dest_hash):
            RNS.Transport.request_path(dest_hash)
            if not self._wait_for(lambda: RNS.Transport.has_path(dest_hash),
                                  self.PATH_TIMEOUT, 0.5):
                return
        server_identity = RNS.Identity.recall(dest_hash)
        if server_identity is None:
            return
        destination = RNS.Destination(
            server_identity,
            RNS.Destination.OUT,
            RNS.Destination.SINGLE,
            FORUM_APP,
        )
        link = RNS.Link(destination)
        try:
            # Waiting inside the try block guarantees the link is torn down
            # even when it never becomes active.
            self._wait_for(lambda: link.status != RNS.Link.PENDING,
                           self.LINK_TIMEOUT, 0.25)
            if link.status != RNS.Link.ACTIVE:
                return

            # Stamp the sync *before* snapshotting local content, so anything
            # created while the request is in flight is picked up next round.
            # NOTE(review): this is local time, as in the original - confirm it
            # matches the timezone of the database timestamps.
            sync_started = time.strftime("%Y-%m-%dT%H:%M:%S")
            since = self._last_sync_marker(instance_hash)
            my_hash = self._my_hash()
            request_data = self._build_request(instance_hash, since, my_hash)

            receipt = link.request("/forum", data=request_data, timeout=REQUEST_TIMEOUT)
            if not receipt:
                # Link.request() returns False when the request was not sent.
                return
            # DELIVERED only means the request reached the peer; the response
            # is available once the receipt is READY.
            finished = (RNS.RequestReceipt.READY, RNS.RequestReceipt.FAILED)
            self._wait_for(lambda: receipt.get_status() in finished,
                           REQUEST_TIMEOUT, 0.1)
            if receipt.get_status() != RNS.RequestReceipt.READY:
                return

            resp = receipt.get_response()
            if not (isinstance(resp, dict) and resp.get("status") == 200):
                return
            self._merge_response(self._parse_body(resp), instance_hash, my_hash)
            self.fdb.set_last_sync(instance_hash, sync_started)
        finally:
            link.teardown()

    def _apply_peer_blocks(self):
        """Auto-block instances reported by ``AUTO_BLOCK_THRESHOLD`` or more peers.

        Newly blocked hashes are added to both the ``blocked_instances`` and
        ``auto_blocked_instances`` settings. This node's own hash is never
        auto-blocked, and the lists are written sorted so the stored value is
        deterministic.
        """
        counts = self.fdb.get_peer_block_counts()
        blocked = set(self._split_setting("blocked_instances"))
        auto_blocked = set(self._split_setting("auto_blocked_instances"))
        my_hash = self._my_hash()
        newly_blocked = {
            h for h, count in counts.items()
            if count >= self.AUTO_BLOCK_THRESHOLD
            and h != my_hash
            and h not in blocked
            and h not in auto_blocked
        }
        if not newly_blocked:
            return
        blocked |= newly_blocked
        auto_blocked |= newly_blocked
        self.fdb.set_setting("blocked_instances", ",".join(sorted(blocked)))
        self.fdb.set_setting("auto_blocked_instances", ",".join(sorted(auto_blocked)))

    def handle_sync(self, data):
        """Forward an incoming sync payload to the handlers object's
        ``handle_sync_request`` and return its result."""
        return self.handlers_ref().handle_sync_request(data)