tinyweb-forum/tinyweb_forum/sync.py

146 lines
4.9 KiB
Python

import json
import threading
import time
import RNS
FORUM_APP = "tinyweb-forum"
SYNC_INTERVAL = 300 # 5 minutes
REQUEST_TIMEOUT = 60
class ForumSync:
def __init__(self, fdb, identity, reticulum, handlers_ref):
self.fdb = fdb
self.identity = identity
self.reticulum = reticulum
self.handlers_ref = handlers_ref
self.destination = None
self._running = False
self._thread = None
def start(self):
self.destination = RNS.Destination(
self.identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
FORUM_APP,
)
self.destination.register_request_handler(
"/forum",
response_generator=self._rns_handler,
allow=RNS.Destination.ALLOW_ALL,
)
self.destination.announce()
self._running = True
self._thread = threading.Thread(target=self._sync_loop, daemon=True)
self._thread.start()
def stop(self):
self._running = False
def _rns_handler(self, path, data, request_id, link_id, remote_identity, requested_at):
return self.handlers_ref().handle_sync(data)
def _sync_loop(self):
while self._running:
try:
instances = self.fdb.get_synced_instances()
for inst in instances:
if not self._running:
break
try:
self._sync_with(inst["instance_hash"])
except Exception as e:
print(f"[forum] sync error with {inst['instance_hash'][:16]}: {e}")
except Exception:
pass
for _ in range(SYNC_INTERVAL):
if not self._running:
return
time.sleep(1)
def _sync_with(self, instance_hash):
dest_hash = bytes.fromhex(instance_hash)
if not RNS.Transport.has_path(dest_hash):
RNS.Transport.request_path(dest_hash)
elapsed = 0
while not RNS.Transport.has_path(dest_hash) and elapsed < 15:
time.sleep(0.5)
elapsed += 0.5
if not RNS.Transport.has_path(dest_hash):
return
server_identity = RNS.Identity.recall(dest_hash)
if server_identity is None:
return
destination = RNS.Destination(
server_identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
FORUM_APP,
)
link = RNS.Link(destination)
elapsed = 0
while link.status == RNS.Link.PENDING and elapsed < 15:
time.sleep(0.25)
elapsed += 0.25
if link.status != RNS.Link.ACTIVE:
return
try:
inst = self.fdb.get_synced_instances()
last_sync = ""
for s in inst:
if s["instance_hash"] == instance_hash:
last_sync = s["last_sync"] or ""
break
since = last_sync.replace(" ", "T") if last_sync else ""
threads, posts = [], []
upvotes = []
if since:
ts, ps, uv = self.fdb.get_new_content(since)
threads = [dict(r) for r in ts]
posts = [dict(r) for r in ps]
upvotes = [{"thread_id": tid, "instance_hash": instance_hash} for tid in uv]
request_data = {
"query": {"since": [since]} if since else {},
"threads": threads,
"posts": posts,
"upvotes": upvotes,
}
receipt = link.request("/forum", data=request_data, timeout=REQUEST_TIMEOUT)
elapsed = 0
done = (RNS.RequestReceipt.READY, RNS.RequestReceipt.DELIVERED, RNS.RequestReceipt.FAILED)
while receipt.get_status() not in done and elapsed < REQUEST_TIMEOUT:
time.sleep(0.1)
elapsed += 0.1
if receipt.get_status() in (RNS.RequestReceipt.READY, RNS.RequestReceipt.DELIVERED):
resp = receipt.get_response()
if isinstance(resp, dict) and resp.get("status") == 200:
try:
data = json.loads(resp["body"])
except (json.JSONDecodeError, KeyError):
data = {}
for t in data.get("threads", []):
self.fdb.merge_thread(t)
for p in data.get("posts", []):
self.fdb.merge_post(p)
for tid in data.get("upvote_threads", []):
self.fdb.merge_upvote(tid, instance_hash)
now = time.strftime("%Y-%m-%dT%H:%M:%S")
self.fdb.set_last_sync(instance_hash, now)
else:
pass
finally:
link.teardown()
def handle_sync(self, data):
return self.handlers_ref().handle_sync_request(data)