Add Reticulum-native subscriptions and sync-based distributed search

- Subscriptions now use Reticulum destination hashes instead of HTTP URLs
- All subscription syncing happens over encrypted RNS links (rns_client.py)
- Add remote_pages table for synced content from subscriptions
- Search results now include pages from synced subscriptions, grouped by source
- Remove HTTP dependency from subscription handlers

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Derick Phan 2026-03-25 22:51:22 -07:00
parent f609f867ef
commit 9a9b5e0617
No known key found for this signature in database
3 changed files with 201 additions and 56 deletions

37
db.py
View file

@ -47,12 +47,27 @@ def init_db():
db.execute(
"CREATE TABLE IF NOT EXISTS subscriptions ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" url TEXT UNIQUE NOT NULL,"
" dest_hash TEXT UNIQUE NOT NULL,"
" name TEXT DEFAULT '',"
" auto_sync INTEGER DEFAULT 0,"
" last_sync TEXT DEFAULT ''"
")"
)
db.execute(
"CREATE TABLE IF NOT EXISTS remote_pages ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" subscription_id INTEGER NOT NULL,"
" url TEXT NOT NULL,"
" title TEXT,"
" note TEXT DEFAULT '',"
" FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE CASCADE,"
" UNIQUE(subscription_id, url)"
")"
)
db.execute(
"CREATE VIRTUAL TABLE IF NOT EXISTS remote_pages_fts "
"USING fts5(title, url, note, content=remote_pages, content_rowid=id)"
)
db.executescript("""
CREATE TRIGGER IF NOT EXISTS pages_ai AFTER INSERT ON pages BEGIN
INSERT INTO pages_fts(rowid, title, body, url, note)
@ -68,7 +83,27 @@ def init_db():
INSERT INTO pages_fts(rowid, title, body, url, note)
VALUES (new.id, new.title, new.body, new.url, new.note);
END;
CREATE TRIGGER IF NOT EXISTS remote_pages_ai AFTER INSERT ON remote_pages BEGIN
INSERT INTO remote_pages_fts(rowid, title, url, note)
VALUES (new.id, new.title, new.url, new.note);
END;
CREATE TRIGGER IF NOT EXISTS remote_pages_ad AFTER DELETE ON remote_pages BEGIN
INSERT INTO remote_pages_fts(remote_pages_fts, rowid, title, url, note)
VALUES ('delete', old.id, old.title, old.url, old.note);
END;
CREATE TRIGGER IF NOT EXISTS remote_pages_au AFTER UPDATE ON remote_pages BEGIN
INSERT INTO remote_pages_fts(remote_pages_fts, rowid, title, url, note)
VALUES ('delete', old.id, old.title, old.url, old.note);
INSERT INTO remote_pages_fts(rowid, title, url, note)
VALUES (new.id, new.title, new.url, new.note);
END;
""")
# Migrate old subscriptions table if needed
cols = [row[1] for row in db.execute("PRAGMA table_info(subscriptions)").fetchall()]
if "url" in cols and "dest_hash" not in cols:
db.execute("ALTER TABLE subscriptions RENAME COLUMN url TO dest_hash")
db.commit()
db.commit()
db.close()

View file

@ -1,9 +1,9 @@
import json
from datetime import datetime
import requests
from db import get_db, get_setting, set_setting, get_site_name, index_url
from templates import esc, snippet, wrap_page
from rns_client import fetch_remote_sites
def _respond(body_html, status=200):
@ -112,7 +112,42 @@ def handle_search(query):
f'</details>'
)
# search synced pages from subscriptions
remote_rows = db.execute(
"SELECT rp.url, rp.title, rp.note, s.name AS source_name "
"FROM remote_pages_fts rpf "
"JOIN remote_pages rp ON rpf.rowid = rp.id "
"JOIN subscriptions s ON rp.subscription_id = s.id "
"WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
(q,),
).fetchall()
remote_html = ""
if q and remote_rows:
# group by source
by_source = {}
for r in remote_rows:
source = r["source_name"] or "unknown"
by_source.setdefault(source, []).append(r)
for source, items in by_source.items():
source_items = ""
for r in items:
note_html = f' — <em>{esc(r["note"])}</em>' if r["note"] else ""
source_items += (
f'<li><a href="{esc(r["url"])}">{esc(r["title"])}</a>'
f'{note_html} <small>({esc(r["url"])})</small></li>'
)
remote_html += (
f'<details class="remote" open>'
f'<summary>from {esc(source)} ({len(items)})</summary>'
f'<ul>{source_items}</ul>'
f'</details>'
)
db.close()
sub_count = ""
if q and remote_rows:
sub_count = f" + {len(remote_rows)} from subscriptions"
return _respond(
f'<h1><a href="/">{esc(name)}</a></h1>'
f'<form method="get" action="/">'
@ -124,7 +159,7 @@ def handle_search(query):
f' | <a href="/pages">browse</a>'
f' | <a href="/subscriptions">subscriptions</a>'
f' | <a href="/style">customize</a></p>'
f'<hr>{result_html}{trusted_html}'
f'<hr>{result_html}{trusted_html}{remote_html}'
)
@ -348,7 +383,7 @@ def handle_subscriptions(msg=""):
last = s["last_sync"] or "never"
items += (
f'<tr>'
f'<td><b>{esc(s["name"] or "unknown")}</b><br><small>{esc(s["url"])}</small></td>'
f'<td><b>{esc(s["name"] or "unknown")}</b><br><small>{esc(s["dest_hash"])}</small></td>'
f'<td>{esc(last)}</td>'
f'<td>'
f'<form method="post" action="/subscriptions/autosync/{s["id"]}" style="display:inline">'
@ -374,7 +409,7 @@ def handle_subscriptions(msg=""):
return _respond(
f"<h1>subscriptions</h1>"
f'<form method="post" action="/subscriptions/add">'
f'<input name="url" placeholder="http://friend:5001" size="40"> '
f'<input name="dest_hash" placeholder="destination hash" size="40"> '
f'<button>subscribe</button>'
f'</form>'
f'<p>{msg}</p>'
@ -384,29 +419,31 @@ def handle_subscriptions(msg=""):
def handle_subscription_add(body):
url = body.get("url", [""])[0].strip().rstrip("/")
if not url or not url.startswith(("http://", "https://")):
return handle_subscriptions("URL must start with http:// or https://")
dest_hash = body.get("dest_hash", [""])[0].strip().replace("<", "").replace(">", "")
if not dest_hash or len(dest_hash) != 32:
return handle_subscriptions("Enter a valid 32-character destination hash.")
try:
resp = requests.get(f"{url}/api/sites", timeout=5)
if resp.status_code == 403:
return handle_subscriptions("That instance has sharing disabled.")
resp.raise_for_status()
data = resp.json()
int(dest_hash, 16)
except ValueError:
return handle_subscriptions("Invalid destination hash (must be hex).")
try:
data = fetch_remote_sites(dest_hash)
name = data.get("name", "")
except PermissionError:
return handle_subscriptions("That instance has sharing disabled.")
except Exception as e:
return handle_subscriptions(f"Could not reach that instance: {esc(str(e))}")
db = get_db()
try:
db.execute(
"INSERT INTO subscriptions (url, name) VALUES (?, ?) "
"ON CONFLICT(url) DO UPDATE SET name=excluded.name",
(url, name),
"INSERT INTO subscriptions (dest_hash, name) VALUES (?, ?) "
"ON CONFLICT(dest_hash) DO UPDATE SET name=excluded.name",
(dest_hash, name),
)
db.commit()
finally:
db.close()
return handle_subscriptions(f"Subscribed to {esc(name or url)}.")
return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.")
def handle_subscription_browse(sub_id):
@ -416,13 +453,22 @@ def handle_subscription_browse(sub_id):
db.close()
return _error(404)
local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
# Use locally synced data if available, otherwise fetch live
remote_rows = db.execute(
"SELECT url, title, note FROM remote_pages WHERE subscription_id = ?",
(sub_id,),
).fetchall()
db.close()
if remote_rows:
sites = [{"url": r["url"], "title": r["title"], "note": r["note"]} for r in remote_rows]
else:
try:
resp = requests.get(f"{sub['url']}/api/sites", timeout=5)
if resp.status_code == 403:
data = fetch_remote_sites(sub["dest_hash"])
sites = data.get("sites", [])
except PermissionError:
return handle_subscriptions("That instance has sharing disabled.")
resp.raise_for_status()
sites = resp.json().get("sites", [])
except Exception as e:
return handle_subscriptions(f"Could not fetch sites: {esc(str(e))}")
@ -448,7 +494,7 @@ def handle_subscription_browse(sub_id):
if new_count:
buttons = '<button>import selected</button> <button name="import_all" value="1">import all new</button>'
return _respond(
f'<h1>browsing: {esc(sub["name"] or sub["url"])}</h1>'
f'<h1>browsing: {esc(sub["name"] or sub["dest_hash"])}</h1>'
f'<p>{len(sites)} site(s) available, {new_count} new</p>'
f'<form method="post" action="/subscriptions/pick">'
f'<input type="hidden" name="sub_id" value="{sub_id}">'
@ -466,18 +512,12 @@ def handle_subscription_pick(body):
if import_all:
db = get_db()
sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
remote = db.execute(
"SELECT url FROM remote_pages WHERE subscription_id = ?", (sub_id,)
).fetchall()
db.close()
if not sub:
return handle_subscriptions("Subscription not found.")
try:
resp = requests.get(f"{sub['url']}/api/sites", timeout=5)
resp.raise_for_status()
sites = resp.json().get("sites", [])
except Exception as e:
return handle_subscriptions(f"Error: {esc(str(e))}")
urls = [s["url"] for s in sites if s["url"] not in local_urls]
urls = [r["url"] for r in remote if r["url"] not in local_urls]
else:
urls = body.get("urls", [])
@ -502,27 +542,24 @@ def handle_subscription_sync(sub_id):
db.close()
return handle_subscriptions("Subscription not found.")
try:
resp = requests.get(f"{sub['url']}/api/sites", timeout=5)
if resp.status_code == 403:
db.close()
return handle_subscriptions("That instance has sharing disabled.")
resp.raise_for_status()
data = resp.json()
data = fetch_remote_sites(sub["dest_hash"])
sites = data.get("sites", [])
remote_name = data.get("name", sub["name"])
except PermissionError:
db.close()
return handle_subscriptions("That instance has sharing disabled.")
except Exception as e:
db.close()
return handle_subscriptions(f"Could not sync: {esc(str(e))}")
local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
# Clear old remote pages for this subscription and re-insert
db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub_id,))
synced = 0
for s in sites:
if s["url"] in local_urls:
continue
try:
db.execute(
"INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?)",
(s["url"], s["title"], f"[synced from {remote_name}]", s.get("note", "")),
"INSERT INTO remote_pages (subscription_id, url, title, note) VALUES (?, ?, ?, ?)",
(sub_id, s["url"], s["title"], s.get("note", "")),
)
synced += 1
except Exception:
@ -531,7 +568,7 @@ def handle_subscription_sync(sub_id):
db.execute("UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?", (now, remote_name, sub_id))
db.commit()
db.close()
return handle_subscriptions(f"Synced {synced} new site(s) from {esc(remote_name)}.")
return handle_subscriptions(f"Synced {synced} site(s) from {esc(remote_name)}.")
def handle_subscription_autosync(sub_id):
@ -559,21 +596,16 @@ def handle_subscription_syncall():
total = 0
for sub in subs:
try:
resp = requests.get(f"{sub['url']}/api/sites", timeout=5)
if resp.status_code != 200:
continue
data = resp.json()
data = fetch_remote_sites(sub["dest_hash"])
sites = data.get("sites", [])
remote_name = data.get("name", sub["name"])
db = get_db()
local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub["id"],))
for s in sites:
if s["url"] in local_urls:
continue
try:
db.execute(
"INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?)",
(s["url"], s["title"], f"[synced from {remote_name}]", s.get("note", "")),
"INSERT INTO remote_pages (subscription_id, url, title, note) VALUES (?, ?, ?, ?)",
(sub["id"], s["url"], s["title"], s.get("note", "")),
)
except Exception:
pass

78
rns_client.py Normal file
View file

@ -0,0 +1,78 @@
import time
import RNS
APP_NAME = "tinyweb"
ASPECTS = ["server"]
REQUEST_TIMEOUT = 30
def fetch_remote_sites(dest_hash_hex):
"""
Connect to a remote TinyWeb instance over Reticulum and fetch its
shared sites. Returns the response dict from /api/sites, or raises
an exception on failure.
"""
dest_hash = bytes.fromhex(dest_hash_hex)
# Resolve path if needed
if not RNS.Transport.has_path(dest_hash):
RNS.Transport.request_path(dest_hash)
elapsed = 0
while not RNS.Transport.has_path(dest_hash) and elapsed < 15:
time.sleep(0.5)
elapsed += 0.5
if not RNS.Transport.has_path(dest_hash):
raise ConnectionError(f"Could not find path to {dest_hash_hex}")
server_identity = RNS.Identity.recall(dest_hash)
if server_identity is None:
raise ConnectionError(f"Could not recall identity for {dest_hash_hex}")
destination = RNS.Destination(
server_identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
APP_NAME,
*ASPECTS,
)
# Establish link
link = RNS.Link(destination)
elapsed = 0
while link.status == RNS.Link.PENDING and elapsed < 15:
time.sleep(0.25)
elapsed += 0.25
if link.status != RNS.Link.ACTIVE:
raise ConnectionError(f"Could not establish link to {dest_hash_hex}")
try:
# Request /api/sites
request_data = {
"method": "GET",
"path": "/api/sites",
"query": {},
"body": {},
"gateway_host": "",
}
receipt = link.request("/tinyweb", data=request_data, timeout=REQUEST_TIMEOUT)
elapsed = 0
done = (RNS.RequestReceipt.READY, RNS.RequestReceipt.DELIVERED, RNS.RequestReceipt.FAILED)
while receipt.get_status() not in done and elapsed < REQUEST_TIMEOUT:
time.sleep(0.5)
elapsed += 0.5
if receipt.get_status() in (RNS.RequestReceipt.READY, RNS.RequestReceipt.DELIVERED):
resp = receipt.get_response()
if resp["status"] == 403:
raise PermissionError("That instance has sharing disabled.")
if resp["status"] != 200:
raise ConnectionError(f"Remote returned status {resp['status']}")
import json
return json.loads(resp["body"])
else:
raise ConnectionError(f"Request failed or timed out")
finally:
link.teardown()