From a8aabb3427968261f973e0598d1bc9bc8a44747a Mon Sep 17 00:00:00 2001 From: lichenblankie Date: Thu, 4 Jun 2026 08:23:51 +0000 Subject: [PATCH] initial: decentralized link-sharing forum for TinyWeb --- .gitignore | 5 + README.md | 25 ++ pyproject.toml | 19 ++ tinyweb_forum/__init__.py | 34 +++ tinyweb_forum/db.py | 321 +++++++++++++++++++++++ tinyweb_forum/handlers.py | 538 ++++++++++++++++++++++++++++++++++++++ tinyweb_forum/sync.py | 146 +++++++++++ 7 files changed, 1088 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pyproject.toml create mode 100644 tinyweb_forum/__init__.py create mode 100644 tinyweb_forum/db.py create mode 100644 tinyweb_forum/handlers.py create mode 100644 tinyweb_forum/sync.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a376320 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.pyc +*.pyo +dist/ +*.egg-info/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..4508429 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# tinyweb-forum + +A decentralized link-sharing forum for [TinyWeb](https://github.com/derickfay/tinyweb). Share URLs and discuss them with other TinyWeb instances over the Reticulum mesh. + +## Install + +```bash +pip install tinyweb-forum +``` + +Enable the forum in TinyWeb's customize page (`/style`). + +## Development + +```bash +git clone https://github.com/derickfay/tinyweb-forum +pip install -e . +``` + +## How it works + +- Each TinyWeb instance stores forum threads and posts in its own `forum.db` +- Instances sync content with each other over RNS +- Moderation is per-instance: block instances, mute threads, keyword filters +- No global server, no algorithms, no tracking diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..19a0bb7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.backends._legacy:_Backend" + +[project] +name = "tinyweb-forum" +version = "0.1.0" +description = "Decentralized link-sharing forum for TinyWeb" +license = {text = "MIT"} +requires-python = ">=3.9" +dependencies = [ + "rns", +] + +[project.optional-dependencies] +dev = [] + +[tool.setuptools.packages.find] +include = ["tinyweb_forum*"] diff --git a/tinyweb_forum/__init__.py b/tinyweb_forum/__init__.py new file mode 100644 index 0000000..ffdbc9c --- /dev/null +++ b/tinyweb_forum/__init__.py @@ -0,0 +1,34 @@ +from tinyweb_forum.db import ForumDB +from tinyweb_forum.handlers import ForumHandlers +from tinyweb_forum.sync import ForumSync + +FORUM_ENABLED_KEY = "forum_enabled" + + +class ForumPlugin: + def __init__(self, data_dir, identity, reticulum, site_name="me"): + self.fdb = ForumDB(data_dir) + self.handlers = ForumHandlers( + self.fdb, None, identity, reticulum, site_name=site_name + ) + self.sync = ForumSync(self.fdb, identity, reticulum, lambda: self.handlers) + self.handlers.sync = self.sync + self.identity = identity + self.reticulum = reticulum + self._started = False + + def is_enabled(self): + return self.fdb.get_setting(FORUM_ENABLED_KEY, "0") == "1" + + def enable(self): + self.fdb.set_setting(FORUM_ENABLED_KEY, "1") + if not self._started: + self.sync.start() + self._started = True + + def disable(self): + self.fdb.set_setting(FORUM_ENABLED_KEY, "0") + # Keep sync running so we still receive content — disable just hides UI + + def handle(self, method, path, query, body, cookies=None): + return self.handlers.handle(method, path, query, body, cookies) diff --git a/tinyweb_forum/db.py b/tinyweb_forum/db.py new file mode 100644 index 0000000..a38ad5c --- /dev/null +++ b/tinyweb_forum/db.py @@ -0,0 +1,321 @@ +import sqlite3 +import os +import threading + +FORUM_DB = "forum.db" + + +class ForumDB: + def __init__(self, data_dir): + self.path = os.path.join(data_dir, FORUM_DB) + self._pool = [] + self._pool_lock = threading.Lock() + self._POOL_SIZE = 8 + self.init_db() + + def init_db(self): + os.makedirs(os.path.dirname(self.path), exist_ok=True) + db = sqlite3.connect(self.path) + db.execute( + "CREATE TABLE IF NOT EXISTS threads (" + " id TEXT PRIMARY KEY," + " title TEXT NOT NULL," + " url TEXT DEFAULT ''," + " body TEXT DEFAULT ''," + " tags TEXT DEFAULT ''," + " author_instance TEXT NOT NULL," + " author_name TEXT DEFAULT ''," + " created_at TEXT NOT NULL," + " updated_at TEXT NOT NULL," + " score INTEGER DEFAULT 0" + ")" + ) + db.execute( + "CREATE TABLE IF NOT EXISTS posts (" + " id TEXT PRIMARY KEY," + " thread_id TEXT NOT NULL," + " parent_id TEXT DEFAULT ''," + " body TEXT NOT NULL," + " author_instance TEXT NOT NULL," + " author_name TEXT DEFAULT ''," + " created_at TEXT NOT NULL," + " FOREIGN KEY (thread_id) REFERENCES threads(id)" + ")" + ) + db.execute( + "CREATE TABLE IF NOT EXISTS upvotes (" + " thread_id TEXT NOT NULL," + " instance_hash TEXT NOT NULL," + " PRIMARY KEY (thread_id, instance_hash)" + ")" + ) + db.execute( + "CREATE TABLE IF NOT EXISTS synced_instances (" + " instance_hash TEXT PRIMARY KEY," + " name TEXT DEFAULT ''," + " last_sync TEXT DEFAULT ''" + ")" + ) + db.execute( + "CREATE TABLE IF NOT EXISTS settings (" + " key TEXT PRIMARY KEY," + " value TEXT" + ")" + ) + db.execute("CREATE INDEX IF NOT EXISTS idx_posts_thread ON posts(thread_id)") + db.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)") + db.execute("CREATE INDEX IF NOT EXISTS idx_threads_updated ON threads(updated_at)") + db.execute("CREATE INDEX IF NOT EXISTS idx_threads_created ON threads(created_at)") + db.commit() + db.close() + + def get_db(self): + with self._pool_lock: + if self._pool: + db = self._pool.pop() + try: + db.execute("SELECT 1") + return db + except Exception: + pass + db = sqlite3.connect(self.path, timeout=10) + db.execute("PRAGMA journal_mode=WAL") + db.row_factory = sqlite3.Row + return db + + def return_db(self, db): + try: + db.rollback() + except Exception: + try: + db.close() + except Exception: + pass + return + with self._pool_lock: + if len(self._pool) < self._POOL_SIZE: + self._pool.append(db) + else: + db.close() + + def get_setting(self, key, default=""): + db = self.get_db() + try: + row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone() + return row["value"] if row else default + finally: + self.return_db(db) + + def set_setting(self, key, value): + db = self.get_db() + try: + db.execute( + "INSERT INTO settings (key, value) VALUES (?, ?) " + "ON CONFLICT(key) DO UPDATE SET value=excluded.value", + (key, value), + ) + db.commit() + finally: + self.return_db(db) + + def get_thread(self, thread_id): + db = self.get_db() + try: + return db.execute( + "SELECT * FROM threads WHERE id = ?", (thread_id,) + ).fetchone() + finally: + self.return_db(db) + + def get_posts(self, thread_id): + db = self.get_db() + try: + return db.execute( + "SELECT * FROM posts WHERE thread_id = ? ORDER BY created_at ASC", + (thread_id,), + ).fetchall() + finally: + self.return_db(db) + + def get_threads(self, page=1, per_page=20, tag="", search=""): + db = self.get_db() + try: + offset = (page - 1) * per_page + params = [] + where = [] + if tag: + where.append("t.tags LIKE ?") + params.append(f"%{tag}%") + if search: + where.append("(t.title LIKE ? OR t.body LIKE ?)") + params.extend([f"%{search}%", f"%{search}%"]) + where_clause = (" WHERE " + " AND ".join(where)) if where else "" + total = db.execute( + f"SELECT count(*) FROM threads t{where_clause}", params + ).fetchone()[0] + rows = db.execute( + f"SELECT t.*, (SELECT count(*) FROM posts p WHERE p.thread_id = t.id) AS reply_count " + f"FROM threads t{where_clause} ORDER BY t.updated_at DESC LIMIT ? OFFSET ?", + params + [per_page, offset], + ).fetchall() + return rows, total + finally: + self.return_db(db) + + def create_thread(self, thread_id, title, url, body, tags, author_instance, author_name, now): + db = self.get_db() + try: + db.execute( + "INSERT INTO threads (id, title, url, body, tags, author_instance, author_name, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + (thread_id, title, url, body, tags, author_instance, author_name, now, now), + ) + db.commit() + finally: + self.return_db(db) + + def create_post(self, post_id, thread_id, parent_id, body, author_instance, author_name, now): + db = self.get_db() + try: + db.execute( + "INSERT INTO posts (id, thread_id, parent_id, body, author_instance, author_name, created_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (post_id, thread_id, parent_id, body, author_instance, author_name, now), + ) + db.execute("UPDATE threads SET updated_at = ? WHERE id = ?", (now, thread_id)) + db.commit() + finally: + self.return_db(db) + + def toggle_upvote(self, thread_id, instance_hash): + db = self.get_db() + try: + row = db.execute( + "SELECT 1 FROM upvotes WHERE thread_id = ? AND instance_hash = ?", + (thread_id, instance_hash), + ).fetchone() + if row: + db.execute( + "DELETE FROM upvotes WHERE thread_id = ? AND instance_hash = ?", + (thread_id, instance_hash), + ) + db.execute("UPDATE threads SET score = score - 1 WHERE id = ?", (thread_id,)) + delta = -1 + else: + db.execute( + "INSERT INTO upvotes (thread_id, instance_hash) VALUES (?, ?)", + (thread_id, instance_hash), + ) + db.execute("UPDATE threads SET score = score + 1 WHERE id = ?", (thread_id,)) + delta = 1 + db.commit() + return delta + finally: + self.return_db(db) + + def get_synced_instances(self): + db = self.get_db() + try: + return db.execute("SELECT * FROM synced_instances").fetchall() + finally: + self.return_db(db) + + def upsert_synced_instance(self, instance_hash, name=""): + db = self.get_db() + try: + db.execute( + "INSERT INTO synced_instances (instance_hash, name) VALUES (?, ?) " + "ON CONFLICT(instance_hash) DO UPDATE SET name=excluded.name", + (instance_hash, name), + ) + db.commit() + finally: + self.return_db(db) + + def remove_synced_instance(self, instance_hash): + db = self.get_db() + try: + db.execute("DELETE FROM synced_instances WHERE instance_hash = ?", (instance_hash,)) + db.commit() + finally: + self.return_db(db) + + def update_last_sync(self, instance_hash, now): + db = self.get_db() + try: + db.execute( + "UPDATE synced_instances SET last_sync = ? WHERE instance_hash = ?", + (now, instance_hash), + ) + db.commit() + finally: + self.return_db(db) + + def set_last_sync(self, instance_hash, timestamp): + self.update_last_sync(instance_hash, timestamp) + + def get_new_content(self, since): + db = self.get_db() + try: + threads = db.execute( + "SELECT * FROM threads WHERE updated_at > ? ORDER BY updated_at ASC", + (since,), + ).fetchall() + posts = db.execute( + "SELECT * FROM posts WHERE created_at > ? ORDER BY created_at ASC", + (since,), + ).fetchall() + upvote_threads = db.execute( + "SELECT thread_id FROM upvotes u " + "WHERE NOT EXISTS (SELECT 1 FROM threads t WHERE t.id = u.thread_id AND t.updated_at > ?)", + (since,), + ).fetchall() + return threads, posts, [r["thread_id"] for r in upvote_threads] + finally: + self.return_db(db) + + def merge_thread(self, thread): + db = self.get_db() + try: + existing = db.execute("SELECT updated_at FROM threads WHERE id = ?", (thread["id"],)).fetchone() + if existing and existing["updated_at"] >= thread["updated_at"]: + return + db.execute( + "INSERT INTO threads (id, title, url, body, tags, author_instance, author_name, created_at, updated_at, score) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + "ON CONFLICT(id) DO UPDATE SET " + "title=excluded.title, url=excluded.url, body=excluded.body, " + "tags=excluded.tags, updated_at=excluded.updated_at", + (thread["id"], thread["title"], thread["url"], thread["body"], + thread["tags"], thread["author_instance"], thread["author_name"], + thread["created_at"], thread["updated_at"], thread["score"]), + ) + db.commit() + finally: + self.return_db(db) + + def merge_post(self, post): + db = self.get_db() + try: + db.execute( + "INSERT OR IGNORE INTO posts (id, thread_id, parent_id, body, author_instance, author_name, created_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (post["id"], post["thread_id"], post["parent_id"], post["body"], + post["author_instance"], post["author_name"], post["created_at"]), + ) + db.commit() + finally: + self.return_db(db) + + def merge_upvote(self, thread_id, instance_hash): + db = self.get_db() + try: + db.execute( + "INSERT OR IGNORE INTO upvotes (thread_id, instance_hash) VALUES (?, ?)", + (thread_id, instance_hash), + ) + if db.total_changes: + db.execute("UPDATE threads SET score = score + 1 WHERE id = ?", (thread_id,)) + db.commit() + finally: + self.return_db(db) diff --git a/tinyweb_forum/handlers.py b/tinyweb_forum/handlers.py new file mode 100644 index 0000000..c44bfa8 --- /dev/null +++ b/tinyweb_forum/handlers.py @@ -0,0 +1,538 @@ +import json +import secrets +import threading +from datetime import datetime +from urllib.parse import unquote + + +PER_PAGE = 20 +RECENT_SECONDS = 86400 * 7 # "new" = within last 7 days + + +def esc(s): + import html + return html.escape(str(s)) + + +class ForumHandlers: + def __init__(self, fdb, sync, identity, reticulum, site_name="me"): + self.fdb = fdb + self.sync = sync + self.identity = identity + self.reticulum = reticulum + self.site_name = site_name + self._request_local = threading.local() + + def _get_csrf(self): + return getattr(self._request_local, 'csrf_token', '') + + def _csrf_field(self): + token = self._get_csrf() + return f'' + + def _check_csrf(self, body): + token = body.get("_csrf", [""])[0] + expected = self._get_csrf() + if not expected or not token: + return False + return secrets.compare_digest(token, expected) + + def _respond(self, body_html, status=200): + return { + "status": status, + "content_type": "text/html; charset=utf-8", + "body": body_html, + "headers": {}, + } + + def _json(self, data, status=200): + return { + "status": status, + "content_type": "application/json", + "body": json.dumps(data), + "headers": {}, + } + + def _redirect(self, location): + return { + "status": 302, + "content_type": "text/html; charset=utf-8", + "body": "", + "headers": {"Location": location}, + } + + def _error(self, status): + return self._respond(f"

{status}

", status) + + def _paginate(self, query): + try: + p = int(query.get("p", ["1"])[0]) + except (ValueError, IndexError): + p = 1 + return max(1, p) + + def _page_nav(self, page, total, base_url): + if total <= PER_PAGE: + return "" + total_pages = (total + PER_PAGE - 1) // PER_PAGE + sep = "&" if "?" in base_url else "?" + parts = [] + if page > 1: + parts.append(f'« prev') + parts.append(f"page {page} of {total_pages}") + if page < total_pages: + parts.append(f'next »') + return f'

{" | ".join(parts)}

' + + def _now(self): + return datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + + def _time_ago(self, ts): + try: + dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S") + except (ValueError, TypeError): + return ts + delta = datetime.now() - dt + if delta.days > 365: + return f"{delta.days // 365}y ago" + if delta.days > 30: + return f"{delta.days // 30}mo ago" + if delta.days > 0: + return f"{delta.days}d ago" + if delta.seconds >= 3600: + return f"{delta.seconds // 3600}h ago" + if delta.seconds >= 60: + return f"{delta.seconds // 60}m ago" + return "just now" + + def _is_new(self, ts): + try: + dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S") + except (ValueError, TypeError): + return False + return (datetime.now() - dt).total_seconds() < RECENT_SECONDS + + def _blocked_instances(self): + raw = self.fdb.get_setting("blocked_instances", "") + return set(h.strip() for h in raw.split(",") if h.strip()) + + def _muted_threads(self): + raw = self.fdb.get_setting("muted_threads", "") + return set(h.strip() for h in raw.split(",") if h.strip()) + + def _keyword_filters(self): + raw = self.fdb.get_setting("keyword_filters", "") + return [k.strip().lower() for k in raw.split(",") if k.strip()] + + def _passes_filters(self, thread): + blocked = self._blocked_instances() + if thread["author_instance"] in blocked: + return False + keywords = self._keyword_filters() + if keywords: + text = (thread["title"] + " " + thread["body"]).lower() + if any(k in text for k in keywords): + return False + return True + + # --- Routes --- + + def handle_list(self, query): + page = self._paginate(query) + tag = unquote(query.get("tag", [""])[0]).strip() + search = query.get("q", [""])[0].strip() + rows, total = self.fdb.get_threads(page=page, per_page=PER_PAGE, tag=tag, search=search) + muted = self._muted_threads() + new_count = 0 + items = "" + for r in rows: + if r["id"] in muted: + continue + if self._is_new(r["created_at"]): + new_count += 1 + badge = "[share]" if r["url"] else "[request]" + tags_html = "" + if r["tags"]: + tag_links = " ".join( + f'[{esc(t.strip())}]' + for t in r["tags"].split(",") if t.strip() + ) + tags_html = f' {tag_links}' + reply_label = f"{r['reply_count']} replies" if r['reply_count'] else "no replies" + items += ( + f'
  • ' + f'{badge} ' + f'{esc(r["title"])}' + f'{tags_html}' + f'
    ' + f'{esc(r["author_name"] or r["author_instance"][:8])}' + f' · {self._time_ago(r["created_at"])}' + f' · {r["score"]} upvotes' + f' · {reply_label}' + f'
  • ' + ) + if not items: + items = "

    No threads yet.

    " + new_label = f" ({new_count} new)" if new_count else "" + search_form = ( + f'
    ' + f'' + f'
    ' + ) + tag_label = f' — tag: {esc(tag)}' if tag else "" + return self._respond( + f"

    forum{tag_label}

    " + f"

    {search_form}" + f' + new thread' + f' mod' + f"

    " + f"

    {total} threads{new_label}

    " + f"" + f"{self._page_nav(page, total, f'/forum?q={esc(search)}&tag={esc(tag)}' if search or tag else '/forum')}" + ) + + def handle_new_form(self, msg=""): + return self._respond( + f"

    new thread

    " + f'
    ' + f'{self._csrf_field()}' + f'

    ' + f'

    ' + f'

    ' + f'

    ' + f'' + f"
    " + f"

    {msg}

    " + f'back' + ) + + def handle_new_submit(self, body): + title = body.get("title", [""])[0].strip() + url = body.get("url", [""])[0].strip() + body_text = body.get("body", [""])[0].strip() + tags = body.get("tags", [""])[0].strip() + if not title: + return self.handle_new_form("Title is required.") + thread_id = secrets.token_hex(16) + author_instance = self.identity.hash.hex() if self.identity else "local" + author_name = self.site_name + now = self._now() + self.fdb.create_thread(thread_id, title, url, body_text, tags, author_instance, author_name, now) + return self._redirect(f"/forum/t/{thread_id}") + + def handle_thread(self, thread_id, query=None): + thread = self.fdb.get_thread(thread_id) + if not thread: + return self._error(404) + posts = self.fdb.get_posts(thread_id) + muted = self._muted_threads() + is_muted = thread["id"] in muted + + badge = "[share]" if thread["url"] else "[request]" + url_html = "" + if thread["url"]: + url_html = ( + f'

    {esc(thread["url"])}' + f' (+ save to my index)

    ' + ) + tags_html = "" + if thread["tags"]: + tag_links = " ".join( + f'[{esc(t.strip())}]' + for t in thread["tags"].split(",") if t.strip() + ) + tags_html = f'

    {tag_links}

    ' + + body_html = f"

    {esc(thread['body'])}

    " if thread["body"] else "" + + mute_btn = ( + f'
    ' + f'{self._csrf_field()}
    ' + if is_muted else + f'
    ' + f'{self._csrf_field()}
    ' + ) + + posts_html = "" + for p in posts: + save_links = "" + for word in p["body"].split(): + w = word.strip().strip(",.!?;:") + if w.startswith(("http://", "https://")): + save_links += ( + f' + save' + ) + parent_ref = "" + if p["parent_id"]: + parent_ref = f' ↪ reply' + posts_html += ( + f'
    ' + f'{esc(p["author_name"] or p["author_instance"][:8])}' + f' · {self._time_ago(p["created_at"])}{parent_ref}' + f'

    {esc(p["body"])}

    ' + f'{save_links}' + f'
    ' + ) + + reply_form = ( + f'
    ' + f'{self._csrf_field()}' + f'
    ' + f'' + f"
    " + ) + + return self._respond( + f"

    {badge} {esc(thread['title'])}

    " + f'

    ' + f'by {esc(thread["author_name"] or thread["author_instance"][:8])}' + f' · {self._time_ago(thread["created_at"])}' + f' · {thread["score"]} upvotes' + f' · {mute_btn}' + f' ·

    ' + f'{self._csrf_field()}
    ' + f'

    ' + f'{url_html}' + f'{body_html}' + f'{tags_html}' + f"
    " + f"{posts_html}" + f"
    " + f"{reply_form}" + f'back to forum' + ) + + def handle_reply(self, thread_id, body): + body_text = body.get("body", [""])[0].strip() + if not body_text: + return self._redirect(f"/forum/t/{thread_id}") + parent_id = body.get("parent_id", [""])[0].strip() + author_instance = self.identity.hash.hex() if self.identity else "local" + author_name = self.site_name + post_id = secrets.token_hex(16) + now = self._now() + self.fdb.create_post(post_id, thread_id, parent_id, body_text, author_instance, author_name, now) + return self._redirect(f"/forum/t/{thread_id}") + + def handle_upvote(self, thread_id, body): + thread = self.fdb.get_thread(thread_id) + if not thread: + return self._error(404) + instance_hash = self.identity.hash.hex() if self.identity else "local" + self.fdb.toggle_upvote(thread_id, instance_hash) + return self._redirect(f"/forum/t/{thread_id}") + + def handle_mute(self, thread_id): + muted = self._muted_threads() + muted.add(thread_id) + self.fdb.set_setting("muted_threads", ",".join(muted)) + return self._redirect(f"/forum") + + def handle_unmute(self, thread_id): + muted = self._muted_threads() + muted.discard(thread_id) + self.fdb.set_setting("muted_threads", ",".join(muted)) + return self._redirect(f"/forum/t/{thread_id}") + + def handle_moderation(self, msg=""): + blocked = self._blocked_instances() + blocked_items = "" + if blocked: + for h in sorted(blocked): + blocked_items += ( + f'
  • {esc(h[:16])}... ' + f'
    ' + f'{self._csrf_field()}' + f'' + f'
    ' + f'
  • ' + ) + blocked_items = f"" + else: + blocked_items = "

    No instances blocked.

    " + + filters = self._keyword_filters() + filters_str = ", ".join(filters) if filters else "" + + synced = self.fdb.get_synced_instances() + synced_items = "" + for s in synced: + synced_items += ( + f'
  • {esc(s["name"] or s["instance_hash"][:16])}... ' + f'
    ' + f'{self._csrf_field()}' + f'' + f'
    ' + f'
  • ' + ) + synced_items = f"" if synced_items else "

    No instances synced yet.

    " + + return self._respond( + f"

    forum moderation

    " + f"

    {msg}

    " + f"

    blocked instances

    " + f"{blocked_items}" + f'
    ' + f'{self._csrf_field()}' + f' ' + f'' + f"
    " + f"

    keyword filters

    " + f'
    ' + f'{self._csrf_field()}' + f'' + f'' + f"
    " + f"

    synced instances

    " + f"{synced_items}" + f'
    ' + f'{self._csrf_field()}' + f' ' + f' ' + f'' + f"
    " + f'
    ' + f'back to forum' + ) + + def handle_block(self, body): + instance = body.get("instance", [""])[0].strip().replace("<", "").replace(">", "") + if len(instance) != 32: + return self.handle_moderation("Invalid instance hash (must be 32 hex chars).") + blocked = self._blocked_instances() + blocked.add(instance) + self.fdb.set_setting("blocked_instances", ",".join(blocked)) + return self.handle_moderation(f"Blocked {instance[:16]}...") + + def handle_unblock(self, body): + instance = body.get("instance", [""])[0].strip() + blocked = self._blocked_instances() + blocked.discard(instance) + self.fdb.set_setting("blocked_instances", ",".join(blocked)) + return self.handle_moderation(f"Unblocked {instance[:16]}...") + + def handle_filters(self, body): + keywords = body.get("keywords", [""])[0].strip() + self.fdb.set_setting("keyword_filters", keywords) + return self.handle_moderation("Filters saved.") + + def handle_sync_add(self, body): + instance = body.get("instance", [""])[0].strip().replace("<", "").replace(">", "") + name = body.get("name", [""])[0].strip() + if len(instance) != 32: + return self.handle_moderation("Invalid instance hash (must be 32 hex chars).") + self.fdb.upsert_synced_instance(instance, name) + return self.handle_moderation(f"Added {name or instance[:16]}... to sync.") + + def handle_unsync(self, body): + instance = body.get("instance", [""])[0].strip() + self.fdb.remove_synced_instance(instance) + return self.handle_moderation("Removed.") + + # --- Sync endpoint (called over RNS) --- + + def handle_sync_request(self, data): + """Handle incoming sync request from another forum instance.""" + since = data.get("query", {}).get("since", [""])[0] if isinstance(data.get("query"), dict) else "" + incoming_threads = data.get("threads", []) + incoming_posts = data.get("posts", []) + incoming_upvotes = data.get("upvotes", []) + + if incoming_threads: + for t in incoming_threads: + self.fdb.merge_thread(t) + if incoming_posts: + for p in incoming_posts: + self.fdb.merge_post(p) + if incoming_upvotes: + for uv in incoming_upvotes: + self.fdb.merge_upvote(uv["thread_id"], uv["instance_hash"]) + + threads, posts, upvote_threads = [], [], [] + if since: + ts, posts_list, up_list = self.fdb.get_new_content(since) + threads = [dict(r) for r in ts] + posts = [dict(r) for r in posts_list] + upvote_threads = up_list + + return { + "status": 200, + "content_type": "application/json", + "body": json.dumps({ + "threads": threads, + "posts": posts, + "upvote_threads": upvote_threads, + }), + "headers": {}, + } + + def handle_sync_add_instance(self, body): + """Add instance for sync (from moderation page action).""" + return self.handle_sync_add(body) + + # --- Router --- + + def _with_csrf(self, resp, csrf_token): + resp.setdefault("headers", {}) + if resp.get("content_type", "").startswith("text/html"): + resp["headers"]["Set-Cookie"] = ( + f"_csrf={csrf_token}; SameSite=Strict; HttpOnly; Path=/forum" + ) + return resp + + def handle(self, method, path, query, body, cookies=None): + csrf_token = (cookies or {}).get("_csrf", "") + if not csrf_token: + csrf_token = secrets.token_hex(32) + self._request_local.csrf_token = csrf_token + + if not path.startswith("/forum"): + return self._with_csrf(self._error(404), csrf_token) + + sub = path[len("/forum"):] + + if method == "GET": + if sub == "" or sub == "/": + return self._with_csrf(self.handle_list(query), csrf_token) + elif sub == "/new": + return self._with_csrf(self.handle_new_form(), csrf_token) + elif sub == "/moderation": + return self._with_csrf(self.handle_moderation(), csrf_token) + elif sub.startswith("/t/"): + tid = sub[3:] + return self._with_csrf(self.handle_thread(tid, query), csrf_token) + elif method == "POST": + if not self._check_csrf(body): + return self._with_csrf( + self._respond("

    403 Forbidden

    ", status=403), csrf_token + ) + if sub == "/new": + return self._with_csrf(self.handle_new_submit(body), csrf_token) + elif sub.startswith("/t/"): + rest = sub[3:] + if "/reply" in rest: + tid = rest.split("/reply")[0] + return self._with_csrf(self.handle_reply(tid, body), csrf_token) + elif rest.endswith("/upvote"): + tid = rest[:-7] + return self._with_csrf(self.handle_upvote(tid, body), csrf_token) + elif sub.startswith("/mute/"): + return self._with_csrf(self.handle_mute(sub[6:]), csrf_token) + elif sub.startswith("/unmute/"): + return self._with_csrf(self.handle_unmute(sub[8:]), csrf_token) + elif sub == "/block": + return self._with_csrf(self.handle_block(body), csrf_token) + elif sub == "/unblock": + return self._with_csrf(self.handle_unblock(body), csrf_token) + elif sub == "/filters": + return self._with_csrf(self.handle_filters(body), csrf_token) + elif sub == "/sync/add": + return self._with_csrf(self.handle_sync_add(body), csrf_token) + elif sub == "/unsync": + return self._with_csrf(self.handle_unsync(body), csrf_token) + + return self._with_csrf(self._error(404), csrf_token) + + def handle_sync(self, data): + """Entry point for incoming RNS sync requests.""" + return self.handle_sync_request(data) diff --git a/tinyweb_forum/sync.py b/tinyweb_forum/sync.py new file mode 100644 index 0000000..32133b8 --- /dev/null +++ b/tinyweb_forum/sync.py @@ -0,0 +1,146 @@ +import json +import threading +import time +import RNS + +FORUM_APP = "tinyweb-forum" +SYNC_INTERVAL = 300 # 5 minutes +REQUEST_TIMEOUT = 60 + + +class ForumSync: + def __init__(self, fdb, identity, reticulum, handlers_ref): + self.fdb = fdb + self.identity = identity + self.reticulum = reticulum + self.handlers_ref = handlers_ref + self.destination = None + self._running = False + self._thread = None + + def start(self): + self.destination = RNS.Destination( + self.identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + FORUM_APP, + ) + self.destination.register_request_handler( + "/forum", + response_generator=self._rns_handler, + allow=RNS.Destination.ALLOW_ALL, + ) + self.destination.announce() + self._running = True + self._thread = threading.Thread(target=self._sync_loop, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + + def _rns_handler(self, path, data, request_id, link_id, remote_identity, requested_at): + return self.handlers_ref().handle_sync(data) + + def _sync_loop(self): + while self._running: + try: + instances = self.fdb.get_synced_instances() + for inst in instances: + if not self._running: + break + try: + self._sync_with(inst["instance_hash"]) + except Exception as e: + print(f"[forum] sync error with {inst['instance_hash'][:16]}: {e}") + except Exception: + pass + for _ in range(SYNC_INTERVAL): + if not self._running: + return + time.sleep(1) + + def _sync_with(self, instance_hash): + dest_hash = bytes.fromhex(instance_hash) + if not RNS.Transport.has_path(dest_hash): + RNS.Transport.request_path(dest_hash) + elapsed = 0 + while not RNS.Transport.has_path(dest_hash) and elapsed < 15: + time.sleep(0.5) + elapsed += 0.5 + if not RNS.Transport.has_path(dest_hash): + return + + server_identity = RNS.Identity.recall(dest_hash) + if server_identity is None: + return + + destination = RNS.Destination( + server_identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + FORUM_APP, + ) + + link = RNS.Link(destination) + elapsed = 0 + while link.status == RNS.Link.PENDING and elapsed < 15: + time.sleep(0.25) + elapsed += 0.25 + + if link.status != RNS.Link.ACTIVE: + return + + try: + inst = self.fdb.get_synced_instances() + last_sync = "" + for s in inst: + if s["instance_hash"] == instance_hash: + last_sync = s["last_sync"] or "" + break + + since = last_sync.replace(" ", "T") if last_sync else "" + + threads, posts = [], [] + upvotes = [] + if since: + ts, ps, uv = self.fdb.get_new_content(since) + threads = [dict(r) for r in ts] + posts = [dict(r) for r in ps] + upvotes = [{"thread_id": tid, "instance_hash": instance_hash} for tid in uv] + + request_data = { + "query": {"since": [since]} if since else {}, + "threads": threads, + "posts": posts, + "upvotes": upvotes, + } + + receipt = link.request("/forum", data=request_data, timeout=REQUEST_TIMEOUT) + elapsed = 0 + done = (RNS.RequestReceipt.READY, RNS.RequestReceipt.DELIVERED, RNS.RequestReceipt.FAILED) + while receipt.get_status() not in done and elapsed < REQUEST_TIMEOUT: + time.sleep(0.1) + elapsed += 0.1 + + if receipt.get_status() in (RNS.RequestReceipt.READY, RNS.RequestReceipt.DELIVERED): + resp = receipt.get_response() + if isinstance(resp, dict) and resp.get("status") == 200: + try: + data = json.loads(resp["body"]) + except (json.JSONDecodeError, KeyError): + data = {} + for t in data.get("threads", []): + self.fdb.merge_thread(t) + for p in data.get("posts", []): + self.fdb.merge_post(p) + for tid in data.get("upvote_threads", []): + self.fdb.merge_upvote(tid, instance_hash) + now = time.strftime("%Y-%m-%dT%H:%M:%S") + self.fdb.set_last_sync(instance_hash, now) + else: + pass + finally: + link.teardown() + + def handle_sync(self, data): + return self.handlers_ref().handle_sync_request(data)