tinyweb/handlers.py
Derick Phan e0a12272ed
Fix about page showing stale tag count after tag removal
Count tags from page_tags instead of the tags table, which retains
orphaned rows when tags are removed from pages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 07:44:26 -07:00

874 lines
31 KiB
Python

import json
from datetime import datetime
from db import get_db, get_setting, set_setting, get_site_name, index_url, clean_url
from templates import esc, snippet, wrap_page
from rns_client import fetch_remote_sites
def _respond(body_html, status=200):
    """Build an HTML response dict for an HTML fragment.

    The fragment is passed through ``wrap_page`` to form the response body.
    *status* is the HTTP status code to report (200 by default).
    """
    response = dict(
        status=status,
        content_type="text/html; charset=utf-8",
        body=wrap_page(body_html),
        headers={},
    )
    return response
def _redirect(location):
return {
"status": 302,
"content_type": "text/html; charset=utf-8",
"body": "",
"headers": {"Location": location},
}
def _json_response(data, status=200, headers=None):
return {
"status": status,
"content_type": "application/json",
"body": json.dumps(data, indent=2),
"headers": headers or {},
}
def _text_response(text, status=200, headers=None):
return {
"status": status,
"content_type": "text/plain",
"body": text,
"headers": headers or {},
}
def _error(status):
    """Return a minimal HTML page whose heading is the status code itself."""
    heading = f"<h1>{status}</h1>"
    return _respond(heading, status)
# --- Tag helpers ---
def _get_page_tags(page_id, db=None):
close = False
if db is None:
db = get_db()
close = True
rows = db.execute(
"SELECT t.name FROM tags t JOIN page_tags pt ON t.id = pt.tag_id "
"WHERE pt.page_id = ? ORDER BY t.name", (page_id,)
).fetchall()
if close:
db.close()
return [r["name"] for r in rows]
def _set_page_tags(page_id, tag_string, db=None):
close = False
if db is None:
db = get_db()
close = True
db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
for name in (t.strip().lower() for t in tag_string.split(",") if t.strip()):
db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,))
tag_id = db.execute("SELECT id FROM tags WHERE name = ?", (name,)).fetchone()["id"]
db.execute("INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)", (page_id, tag_id))
if close:
db.commit()
db.close()
# --- Route handlers ---
def _search_local_results_html(rows, q, db):
    """Render one result block per full-text hit from the local index."""
    blocks = []
    for r in rows:
        note_html = ""
        if r["note"]:
            note_html = f'<div class="note"><em>{esc(r["note"])}</em></div>'
        tags = _get_page_tags(r["id"], db)
        tags_html = ""
        if tags:
            tag_links = " ".join(
                f'<a href="/tags/{esc(t)}" class="tag">[{esc(t)}]</a>' for t in tags
            )
            tags_html = f'<div class="tags">{tag_links}</div>'
        blocks.append(
            f'<div class="result">'
            f'<a href="{esc(r["url"])}">{esc(r["title"])}</a><br>'
            f'<small>{esc(r["url"])}</small><br>'
            f'{esc(snippet(r["body"], q))}'
            f'{note_html}{tags_html}'
            f'</div>'
        )
    return "".join(blocks)


def _search_trusted_links_html(db, q, indexed_urls):
    """Render the collapsible list of stored links whose label matches *q*.

    A link matches when any whitespace-separated word of the lower-cased
    query occurs in its lower-cased label. URLs in *indexed_urls* and
    duplicate URLs are skipped, and at most 20 links are listed. Returns
    an empty string when nothing matches.
    """
    words = q.lower().split()
    all_links = db.execute(
        "SELECT l.url, l.label, p.title AS source_title "
        "FROM links l JOIN pages p ON l.page_id = p.id",
    ).fetchall()
    seen = set()
    trusted = []
    for link in all_links:
        url = link["url"]
        if url in indexed_urls or url in seen:
            continue
        # A link stored without a label cannot match; treat NULL as "".
        label = (link["label"] or "").lower()
        if any(w in label for w in words):
            seen.add(url)
            trusted.append(link)
            if len(trusted) >= 20:
                break
    if not trusted:
        return ""
    items = "".join(
        f'<li><a href="{esc(link["url"])}">{esc(link["label"])}</a> '
        f'<small>— from {esc(link["source_title"])}</small></li>'
        for link in trusted
    )
    return (
        f'<details class="trusted">'
        f'<summary>from your trusted sites ({len(trusted)})</summary>'
        f'<ul>{items}</ul>'
        f'</details>'
    )


def _search_remote_results_html(remote_rows):
    """Render synced subscription hits, one open section per source name."""
    by_source = {}
    for r in remote_rows:
        by_source.setdefault(r["source_name"] or "unknown", []).append(r)
    sections = []
    for source, items in by_source.items():
        source_items = ""
        for r in items:
            note_html = f' — <em>{esc(r["note"])}</em>' if r["note"] else ""
            source_items += (
                f'<li><a href="{esc(r["url"])}">{esc(r["title"])}</a>'
                f'{note_html} <small>({esc(r["url"])})</small></li>'
            )
        sections.append(
            f'<details class="remote" open>'
            f'<summary>from {esc(source)} ({len(items)})</summary>'
            f'<ul>{source_items}</ul>'
            f'</details>'
        )
    return "".join(sections)


def handle_search(query):
    """Render the home page: search form, page count and any results.

    Args:
        query: parsed query-string mapping; the search text is the first
            value of ``q`` (missing or blank means "no search").

    When a search is given, three result groups are rendered in order:
    full-text hits from ``pages_fts``, label matches among stored links,
    and full-text hits from ``remote_pages_fts`` grouped by subscription.

    Returns:
        An HTML response dict from ``_respond``.
    """
    # Local import: the file's import block does not bring in sqlite3.
    # NOTE(review): assumes get_db() returns a stdlib sqlite3 connection,
    # as the SQL dialect used throughout this module suggests - confirm.
    import sqlite3

    q = query.get("q", [""])[0].strip()
    result_html = ""
    trusted_html = ""
    remote_html = ""
    db = get_db()
    try:
        count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        name = get_site_name()
        if q:
            try:
                rows = db.execute(
                    "SELECT p.id, p.url, p.title, p.body, p.note "
                    "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
                    "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
                    (q,),
                ).fetchall()
                remote_rows = db.execute(
                    "SELECT rp.url, rp.title, rp.note, s.name AS source_name "
                    "FROM remote_pages_fts rpf "
                    "JOIN remote_pages rp ON rpf.rowid = rp.id "
                    "JOIN subscriptions s ON rp.subscription_id = s.id "
                    "WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
                    (q,),
                ).fetchall()
            except sqlite3.OperationalError:
                # The text is handed to MATCH verbatim, so input that is not
                # a valid full-text query (e.g. an unbalanced quote) makes
                # the statement fail. Tell the user instead of crashing.
                rows = []
                remote_rows = []
                result_html = "<p>That search could not be parsed. Try plain words.</p>"
            else:
                if rows:
                    result_html = _search_local_results_html(rows, q, db)
                else:
                    result_html = "<p>No results in your index.</p>"
            # Label matching is plain substring search, so it still runs
            # when the full-text query was rejected.
            trusted_html = _search_trusted_links_html(
                db, q, {r["url"] for r in rows}
            )
            if remote_rows:
                remote_html = _search_remote_results_html(remote_rows)
    finally:
        # Release the connection on every path, including failures.
        db.close()
    return _respond(
        f'<h1><a href="/">{esc(name)}</a></h1>'
        f'<form method="get" action="/">'
        f'<input name="q" value="{esc(q)}" placeholder="search your index" size="40">'
        f' <button type="submit">search</button>'
        f'</form>'
        f'<p>{count} page(s) indexed.'
        f' <a href="/add">+ add url</a>'
        f' | <a href="/pages">browse</a>'
        f' | <a href="/tags">tags</a>'
        f' | <a href="/subscriptions">subscriptions</a>'
        f' | <a href="/style">customize</a>'
        f' | <a href="/about">about</a></p>'
        f'<hr>{result_html}{trusted_html}{remote_html}'
    )
def handle_add_form(msg=""):
    """Render the "add url" form.

    *msg* is placed verbatim (unescaped) in a paragraph below the form,
    so it may contain HTML.
    """
    form_html = "".join(
        [
            '<form method="post" action="/add">',
            '<input name="url" placeholder="https://example.com" size="50"><br><br>',
            '<input name="note" placeholder="why are you saving this? (optional)" size="50"><br><br>',
            '<input name="tags" placeholder="tags (comma-separated, e.g. solarpunk, mesh)" size="50"><br><br>',
            '<button type="submit">index</button>',
            "</form>",
        ]
    )
    page = "<h1>add url</h1>" + form_html + f"<p>{msg}</p>" + '<a href="/">back</a>'
    return _respond(page)
def handle_add_submit(body):
    """Index a submitted URL, optionally tagging it, and re-render the form.

    Args:
        body: parsed form mapping; reads the first value of ``url``,
            ``note`` and ``tags``.

    Returns:
        The add form with a status message: a validation hint, a link to
        the indexed page, or the escaped text of any error raised while
        indexing or tagging.
    """
    url = clean_url(body.get("url", [""])[0].strip())
    note = body.get("note", [""])[0].strip()
    tags = body.get("tags", [""])[0].strip()
    if not url:
        return handle_add_form("URL is required.")
    if not url.startswith(("http://", "https://")):
        return handle_add_form("URL must start with http:// or https://")
    try:
        title = index_url(url, note)
        if tags:
            db = get_db()
            try:
                row = db.execute(
                    "SELECT id FROM pages WHERE url = ?", (url,)
                ).fetchone()
                if row:
                    _set_page_tags(row["id"], tags, db)
                    db.commit()
            finally:
                # The except below swallows failures, so without this the
                # connection would be left open whenever tagging raised.
                db.close()
    except Exception as e:
        return handle_add_form(f"Error: {esc(str(e))}")
    return handle_add_form(f'Indexed: <a href="{esc(url)}">{esc(title)}</a>')
def handle_pages():
    """Render the list of all indexed pages, newest id first.

    Each entry shows the title, optional note, tag links, the URL and
    edit/remove links; export/import links follow the list.
    """
    conn = get_db()
    pages = conn.execute(
        "SELECT id, url, title, note FROM pages ORDER BY id DESC"
    ).fetchall()
    entries = []
    for page in pages:
        note_part = ""
        if page["note"]:
            note_part = f' — <em>{esc(page["note"])}</em>'
        tag_names = _get_page_tags(page["id"], conn)
        tags_part = ""
        if tag_names:
            tags_part = " " + " ".join(
                f'<a href="/tags/{esc(tag)}">[{esc(tag)}]</a>' for tag in tag_names
            )
        entries.append(
            f'<li>{esc(page["title"])}{note_part}{tags_part} '
            f'<small>(<a href="{esc(page["url"])}">{esc(page["url"])}</a>)</small> '
            f'<a href="/edit/{page["id"]}">edit</a> '
            f'<a href="/delete/{page["id"]}">remove</a></li>'
        )
    conn.close()
    listing = "".join(entries)
    return _respond(
        f"<h1>indexed pages ({len(pages)})</h1>"
        f"<ul>{listing}</ul>"
        '<p><a href="/export">export</a> | <a href="/import">import</a></p>'
        '<a href="/">back</a>'
    )
def handle_edit_form(page_id, msg=""):
    """Render the note/tags edit form for one page.

    Args:
        page_id: id of the page to edit.
        msg: HTML placed verbatim in a paragraph below the form.

    Returns:
        The form as an HTML response, or a 404 response when no page has
        that id.
    """
    db = get_db()
    try:
        row = db.execute(
            "SELECT id, url, title, note FROM pages WHERE id = ?", (page_id,)
        ).fetchone()
        if not row:
            return _error(404)
        tags = ", ".join(_get_page_tags(page_id, db))
    finally:
        db.close()
    # Other handlers treat the note as possibly empty/NULL; prefill the
    # input with "" in that case rather than passing None to esc().
    note = row["note"] or ""
    return _respond(
        f"<h1>edit page</h1>"
        f"<p><b>{esc(row['title'])}</b><br>"
        f"<small>{esc(row['url'])}</small></p>"
        f'<form method="post" action="/edit/{row["id"]}">'
        f'<input name="note" value="{esc(note)}" placeholder="why did you save this?" size="50"><br><br>'
        f'<input name="tags" value="{esc(tags)}" placeholder="tags (comma-separated)" size="50"><br><br>'
        f'<button type="submit">save</button>'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/pages">back</a>'
    )
def handle_edit_submit(page_id, body):
    """Save the submitted note and tags for a page, then go back to /pages."""

    def first_value(field):
        # Form values arrive as lists; use the first one, stripped.
        return body.get(field, [""])[0].strip()

    new_note = first_value("note")
    new_tags = first_value("tags")
    conn = get_db()
    conn.execute("UPDATE pages SET note = ? WHERE id = ?", (new_note, page_id))
    _set_page_tags(page_id, new_tags, conn)
    conn.commit()
    conn.close()
    return _redirect("/pages")
def handle_delete(page_id):
    """Delete a page together with its links and tag associations.

    Returns a redirect to /pages.
    """
    db = get_db()
    try:
        db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
        # Tag counts elsewhere in this module are computed from page_tags,
        # so rows left behind for a deleted page would keep inflating them.
        # NOTE(review): the schema is not visible here; if it already
        # cascades on page deletion this statement is a harmless no-op.
        db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
        db.execute("DELETE FROM pages WHERE id = ?", (page_id,))
        db.commit()
    finally:
        db.close()
    return _redirect("/pages")
def handle_bookmark(query):
    """Index the URL given in the ``url`` query parameter.

    Always answers with a plain-text response carrying a wildcard
    ``Access-Control-Allow-Origin`` header: ``ok: <title>`` on success or
    ``error: ...`` otherwise.
    """
    cors_headers = {"Access-Control-Allow-Origin": "*"}
    target = clean_url(query.get("url", [""])[0].strip())
    is_http = bool(target) and target.startswith(("http://", "https://"))
    if not is_http:
        return _text_response("error: invalid url", headers=cors_headers)
    try:
        page_title = index_url(target)
        outcome = f"ok: {page_title}"
    except Exception as exc:
        outcome = f"error: {exc}"
    return _text_response(outcome, headers=cors_headers)
def handle_export():
    """Return every page's url, title and note as a JSON download."""
    conn = get_db()
    pages = conn.execute("SELECT url, title, note FROM pages ORDER BY id").fetchall()
    conn.close()
    exported_fields = ("url", "title", "note")
    payload = [{field: page[field] for field in exported_fields} for page in pages]
    download_headers = {
        "Content-Disposition": "attachment; filename=tinyweb-export.json"
    }
    return _json_response(payload, headers=download_headers)
def handle_import_form(msg=""):
    """Render the JSON import form.

    *msg* is placed verbatim (unescaped) in a paragraph below the form.
    """
    pieces = [
        "<h1>import</h1>",
        "<p>Paste the contents of a tinyweb export file (JSON).</p>",
        '<form method="post" action="/import">',
        '<textarea name="data" rows="12" cols="60" placeholder=\'[{"url": "...", "note": "..."}]\'></textarea><br><br>',
        '<button type="submit">import</button>',
        "</form>",
        f"<p>{msg}</p>",
        '<a href="/pages">back</a>',
    ]
    return _respond("".join(pieces))
def handle_import_submit(body):
    """Index every entry of a pasted JSON export and report the outcome.

    Args:
        body: parsed form mapping; the JSON text is the first value of
            ``data``.

    The payload must be a JSON array. Each element is expected to be an
    object with a ``url`` and an optional ``note``. Elements that are not
    objects count as errors; objects without a usable ``url`` are skipped
    silently, as before.

    Returns:
        The import form with a message stating how many pages were
        imported and how many errors occurred, or a validation hint.
    """
    raw = body.get("data", [""])[0].strip()
    if not raw:
        return handle_import_form("Paste JSON data.")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return handle_import_form("Invalid JSON.")
    if not isinstance(data, list):
        return handle_import_form("Expected a JSON array.")
    imported = 0
    errors = 0
    for entry in data:
        if not isinstance(entry, dict):
            # e.g. a bare string or number in the array
            errors += 1
            continue
        url = entry.get("url")
        note = entry.get("note")
        # The export writes a null note for pages saved without one, so
        # null (or any other non-string) must not be .strip()-ed.
        url = url.strip() if isinstance(url, str) else ""
        note = note.strip() if isinstance(note, str) else ""
        if not url:
            continue
        try:
            index_url(url, note)
            imported += 1
        except Exception:
            errors += 1
    return handle_import_form(f"Imported {imported} page(s). {errors} error(s).")
def handle_style_form(msg=""):
    """Render the customize page: site name, sharing, custom CSS, bookmarklet.

    Current values are read from the ``custom_css`` and ``sharing_enabled``
    settings and from ``get_site_name()``. *msg* is placed verbatim in a
    paragraph near the bottom of the page.
    """
    css = get_setting("custom_css")
    name = get_site_name()
    sharing = get_setting("sharing_enabled", "0")
    checked = "" if sharing != "1" else " checked"

    # One line per selector the user can target in custom CSS.
    css_help = "\n".join(
        (
            "body - page background, font",
            "h1 - page titles",
            "input, button - search bar",
            "a - links",
            ".result - each search result",
            ".note - your notes on results",
            ".trusted - trusted sites dropdown",
            "small - url text",
            "ul, li - browse page list",
        )
    )
    bookmarklet_js = (
        "javascript:void(fetch('http://localhost:8080/bookmark?url='"
        "+encodeURIComponent(location.href))"
        ".then(r=>r.text()).then(t=>alert(t))"
        ".catch(()=>alert('tinyweb not running')))"
    )

    form_html = (
        '<form method="post" action="/style">'
        f'<input name="site_name" value="{esc(name)}" placeholder="tinyweb" size="30"><br><br>'
        "<h2>sharing</h2>"
        f'<label><input type="checkbox" name="sharing_enabled" value="1"{checked}>'
        " share your site list publicly at /api/sites</label><br><br>"
        "<h2>custom css</h2>"
        "<p>Some classes you can target:</p>"
        f"<pre>{css_help}</pre>"
        f'<textarea name="css" rows="16" cols="60">{esc(css)}</textarea><br><br>'
        '<button type="submit">save</button>'
        "</form>"
    )
    bookmarklet_html = (
        "<h2>bookmarklet</h2>"
        "<p>Drag this link to your bookmarks bar. Click it on any page to index it instantly.</p>"
        f'<p><a href="{bookmarklet_js}">+ save to {esc(name)}</a></p>'
    )
    return _respond(
        "<h1>customize</h1>"
        "<h2>name your search engine</h2>"
        + form_html
        + bookmarklet_html
        + f"<p>{msg}</p>"
        + '<a href="/">back</a>'
    )
def handle_style_submit(body):
    """Persist the customize form and re-render it with a "Saved." message.

    A blank site name falls back to "tinyweb"; sharing is stored as "1"
    when the checkbox field is present and truthy, otherwise "0".
    """
    submitted_css = body.get("css", [""])[0]
    submitted_name = body.get("site_name", ["tinyweb"])[0].strip()
    sharing_flag = "1" if body.get("sharing_enabled") else "0"
    updates = (
        ("custom_css", submitted_css),
        ("site_name", submitted_name or "tinyweb"),
        ("sharing_enabled", sharing_flag),
    )
    for key, value in updates:
        set_setting(key, value)
    return handle_style_form("Saved.")
def handle_about():
    """Render the about page with index statistics and sharing details.

    The tag figure counts distinct ``tag_id`` values in ``page_tags``, i.e.
    tags currently attached to at least one row there.
    """
    name = get_site_name()
    dest_hash = get_setting("dest_hash")
    sharing = get_setting("sharing_enabled", "0") == "1"

    conn = get_db()

    def scalar(sql):
        # Run a single-value query and return that value.
        return conn.execute(sql).fetchone()[0]

    page_count = scalar("SELECT count(*) FROM pages")
    tag_count = scalar("SELECT count(DISTINCT tag_id) FROM page_tags")
    sub_count = scalar("SELECT count(*) FROM subscriptions")
    conn.close()

    if sharing:
        sharing_html = '<p>This instance shares its index publicly. Subscribe to join the network.</p>'
    else:
        sharing_html = '<p>This instance is private.</p>'

    hash_html = ""
    if dest_hash:
        hash_html = (
            '<h2>subscribe</h2>'
            '<p>To subscribe to this instance, add this destination hash in your TinyWeb:</p>'
            f'<pre>{esc(dest_hash)}</pre>'
        )

    intro_html = (
        f'<h1>{esc(name)}</h1>'
        '<p>A personal search engine, built for the slow web.</p>'
        '<p>TinyWeb is about taking back the internet. No algorithms, no ads, no tracking. '
        'Just human-curated pages shared freely across a mesh network.</p>'
    )
    stats_html = (
        '<ul>'
        f'<li><b>{page_count}</b> page(s) indexed</li>'
        f'<li><b>{tag_count}</b> tag(s)</li>'
        f'<li><b>{sub_count}</b> subscription(s)</li>'
        '</ul>'
    )
    manifesto_html = (
        '<h2>what is the slow web?</h2>'
        '<p>The slow web is a movement for intentionality over speed, '
        'human curation over algorithmic feeds, privacy over surveillance, '
        'and community over corporations. Every page in this index was saved by a person '
        'because they found it valuable — not because an algorithm told them to click.</p>'
        '<h2>how it works</h2>'
        '<ul>'
        '<li>Save pages you find valuable with the bookmarklet or /add</li>'
        '<li>Search your personal index — queries never leave your machine</li>'
        '<li>Subscribe to friends over Reticulum — encrypted, decentralized, works without the internet</li>'
        '<li>Tag and organize your collection into curated lists</li>'
        '</ul>'
        '<p><a href="/">search</a> | <a href="/pages">browse</a> | <a href="/tags">tags</a></p>'
    )
    return _respond(
        intro_html + stats_html + sharing_html + hash_html + manifesto_html
    )
def handle_tags():
    """Render the list of tags in use, each with its page count.

    Only tags that have at least one ``page_tags`` row are listed (inner
    join). When there are none, a hint is shown in place of the list.
    """
    db = get_db()
    try:
        rows = db.execute(
            "SELECT t.name, COUNT(pt.page_id) AS cnt FROM tags t "
            "JOIN page_tags pt ON t.id = pt.tag_id "
            "GROUP BY t.id ORDER BY t.name"
        ).fetchall()
    finally:
        db.close()
    items = "".join(
        f'<li><a href="/tags/{esc(r["name"])}">{esc(r["name"])}</a> ({r["cnt"]})</li>'
        for r in rows
    )
    # Choose the middle section on its own. Writing the conditional inline
    # between adjacent string literals makes it swallow its neighbours, so
    # the heading or the back link silently disappears.
    if items:
        listing = f"<ul>{items}</ul>"
    else:
        listing = "<p>No tags yet. Add tags when saving or editing pages.</p>"
    return _respond(
        f"<h1>tags</h1>"
        f"{listing}"
        f'<a href="/">back</a>'
    )
def handle_tag_browse(tag_name):
    """Render the pages carrying *tag_name*, newest id first."""
    conn = get_db()
    pages = conn.execute(
        "SELECT p.id, p.url, p.title, p.note FROM pages p "
        "JOIN page_tags pt ON p.id = pt.page_id "
        "JOIN tags t ON t.id = pt.tag_id "
        "WHERE t.name = ? ORDER BY p.id DESC",
        (tag_name,),
    ).fetchall()
    entries = []
    for page in pages:
        note_part = ""
        if page["note"]:
            note_part = f' — <em>{esc(page["note"])}</em>'
        tag_links = " ".join(
            f'<a href="/tags/{esc(tag)}">[{esc(tag)}]</a>'
            for tag in _get_page_tags(page["id"], conn)
        )
        entries.append(
            f'<li>{esc(page["title"])}{note_part} {tag_links} '
            f'<small>(<a href="{esc(page["url"])}">{esc(page["url"])}</a>)</small></li>'
        )
    conn.close()
    listing = "".join(entries)
    return _respond(
        f'<h1>tag: {esc(tag_name)}</h1>'
        f'<p>{len(pages)} page(s)</p>'
        f'<ul>{listing}</ul>'
        '<a href="/tags">all tags</a> | <a href="/">back</a>'
    )
def handle_api_sites():
    """Serve the site list as JSON when sharing is enabled.

    Responds 403 with an error object when the ``sharing_enabled`` setting
    is not "1". Both responses carry a wildcard CORS header.
    """
    sharing_on = get_setting("sharing_enabled", "0") == "1"
    if not sharing_on:
        return _json_response(
            {"error": "sharing disabled"},
            status=403,
            headers={"Access-Control-Allow-Origin": "*"},
        )
    conn = get_db()
    pages = conn.execute(
        "SELECT id, url, title, note FROM pages ORDER BY id DESC"
    ).fetchall()
    sites = [
        {
            "url": page["url"],
            "title": page["title"],
            "note": page["note"],
            "tags": _get_page_tags(page["id"], conn),
        }
        for page in pages
    ]
    conn.close()
    payload = {"name": get_site_name(), "sites": sites}
    return _json_response(payload, headers={"Access-Control-Allow-Origin": "*"})
def handle_subscriptions(msg=""):
    """Render the subscriptions page: add form, message and table.

    *msg* is placed verbatim (unescaped) in a paragraph under the add
    form. The table and the "sync all" button appear only when at least
    one subscription exists.
    """
    conn = get_db()
    subs = conn.execute("SELECT * FROM subscriptions ORDER BY id DESC").fetchall()
    conn.close()

    def render_row(sub):
        # One table row with the per-subscription action forms.
        auto_label = "on" if sub["auto_sync"] else "off"
        last = sub["last_sync"] or "never"
        return (
            '<tr>'
            f'<td><b>{esc(sub["name"] or "unknown")}</b><br><small>{esc(sub["dest_hash"])}</small></td>'
            f'<td>{esc(last)}</td>'
            '<td>'
            f'<form method="post" action="/subscriptions/autosync/{sub["id"]}" style="display:inline">'
            f'<button>auto-sync: {auto_label}</button></form>'
            '</td>'
            '<td>'
            f'<a href="/subscriptions/browse/{sub["id"]}">browse</a> '
            f'<form method="post" action="/subscriptions/sync/{sub["id"]}" style="display:inline">'
            '<button>sync now</button></form> '
            f'<form method="post" action="/subscriptions/delete/{sub["id"]}" style="display:inline">'
            '<button>remove</button></form>'
            '</td>'
            '</tr>'
        )

    table = ""
    if subs:
        rows_html = "".join(render_row(sub) for sub in subs)
        table = (
            '<table><tr><th>instance</th><th>last sync</th><th>auto-sync</th><th>actions</th></tr>'
            f'{rows_html}</table>'
            '<form method="post" action="/subscriptions/syncall">'
            '<button>sync all</button></form>'
        )
    return _respond(
        "<h1>subscriptions</h1>"
        '<form method="post" action="/subscriptions/add">'
        '<input name="dest_hash" placeholder="destination hash" size="40"> '
        '<button>subscribe</button>'
        '</form>'
        f'<p>{msg}</p>'
        f'<hr>{table}'
        '<br><a href="/">back</a>'
    )
def handle_subscription_add(body):
    """Subscribe to the instance identified by a submitted destination hash.

    Args:
        body: parsed form mapping; the hash is the first value of
            ``dest_hash``. Surrounding whitespace and any ``<``/``>``
            characters are removed before validation.

    The hash must be exactly 32 hexadecimal digits. The remote instance is
    contacted via ``fetch_remote_sites`` before the subscription is stored;
    an existing row with the same hash only has its name updated.

    Returns:
        The subscriptions page with a status message.
    """
    # Local import: the file's import block does not bring in ``string``.
    import string

    dest_hash = body.get("dest_hash", [""])[0].strip().replace("<", "").replace(">", "")
    if len(dest_hash) != 32:
        return handle_subscriptions("Enter a valid 32-character destination hash.")
    # int(value, 16) would also accept a "0x" prefix, a sign or underscores,
    # so check the characters themselves.
    if not set(dest_hash) <= set(string.hexdigits):
        return handle_subscriptions("Invalid destination hash (must be hex).")
    try:
        data = fetch_remote_sites(dest_hash)
        name = data.get("name", "")
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception as e:
        return handle_subscriptions(f"Could not reach that instance: {esc(str(e))}")
    db = get_db()
    try:
        db.execute(
            "INSERT INTO subscriptions (dest_hash, name) VALUES (?, ?) "
            "ON CONFLICT(dest_hash) DO UPDATE SET name=excluded.name",
            (dest_hash, name),
        )
        db.commit()
    finally:
        db.close()
    return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.")
def handle_subscription_browse(sub_id):
    """List a subscription's sites, split into new and already indexed.

    Sites come from the locally stored ``remote_pages`` rows when there
    are any; otherwise they are fetched live via ``fetch_remote_sites``.
    New sites are rendered as checkboxes in a form posting to
    /subscriptions/pick.

    Returns:
        The browse page, a 404 response for an unknown *sub_id*, or the
        subscriptions page with an error message when the live fetch fails.
    """
    db = get_db()
    try:
        sub = db.execute(
            "SELECT * FROM subscriptions WHERE id = ?", (sub_id,)
        ).fetchone()
        if not sub:
            return _error(404)
        local_urls = {r["url"] for r in db.execute("SELECT url FROM pages").fetchall()}
        # Use locally synced data if available, otherwise fetch live
        remote_rows = db.execute(
            "SELECT url, title, note, tags FROM remote_pages WHERE subscription_id = ?",
            (sub_id,),
        ).fetchall()
    finally:
        db.close()

    if remote_rows:
        sites = [
            {
                "url": r["url"],
                "title": r["title"],
                "note": r["note"],
                "tags": [t for t in r["tags"].split(",") if t] if r["tags"] else [],
            }
            for r in remote_rows
        ]
    else:
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            sites = data.get("sites", [])
        except PermissionError:
            return handle_subscriptions("That instance has sharing disabled.")
        except Exception as e:
            return handle_subscriptions(f"Could not fetch sites: {esc(str(e))}")

    # Live data is whatever the remote instance sent. Keep only entries that
    # are objects with a URL so one malformed item cannot break the page.
    sites = [s for s in (sites or []) if isinstance(s, dict) and s.get("url")]

    new_parts = []
    existing_parts = []
    for s in sites:
        url = s["url"]
        # Fall back to the URL when the entry carries no title.
        title = s.get("title") or url
        if url in local_urls:
            existing_parts.append(
                f'<li style="opacity:0.5">{esc(title)} '
                f'<small>({esc(url)})</small> — already indexed</li>'
            )
            continue
        note_html = f' — <em>{esc(s["note"])}</em>' if s.get("note") else ""
        tags_html = ""
        if s.get("tags"):
            tags_html = " " + " ".join(f'[{esc(t)}]' for t in s["tags"])
        new_parts.append(
            f'<li><label><input type="checkbox" name="urls" value="{esc(url)}">'
            f' {esc(title)}{note_html}{tags_html}'
            f' <small>({esc(url)})</small></label></li>'
        )
    new_count = len(new_parts)
    new_items = "".join(new_parts)
    existing_items = "".join(existing_parts)

    buttons = ""
    if new_count:
        buttons = '<button>import selected</button> <button name="import_all" value="1">import all new</button>'
    return _respond(
        f'<h1>browsing: {esc(sub["name"] or sub["dest_hash"])}</h1>'
        f'<p>{len(sites)} site(s) available, {new_count} new</p>'
        f'<form method="post" action="/subscriptions/pick">'
        f'<input type="hidden" name="sub_id" value="{sub_id}">'
        f'<ul>{new_items}</ul>'
        f'{buttons}'
        f'</form>'
        f'<h3>already indexed</h3><ul>{existing_items}</ul>'
        f'<a href="/subscriptions">back</a>'
    )
def handle_subscription_pick(body):
    """Import selected (or all new) sites from a subscription's synced list.

    Args:
        body: parsed form mapping. Reads the first value of ``sub_id`` and
            ``import_all``, and the list under ``urls``.

    With ``import_all`` set, every synced URL of the subscription that is
    not already in ``pages`` is imported; otherwise the submitted ``urls``
    are used. Tags stored with the synced page are applied after indexing.

    Returns:
        The subscriptions page with a count of imported pages and errors.
    """
    sub_id = body.get("sub_id", [""])[0]
    import_all = body.get("import_all", [""])[0]
    db = get_db()
    try:
        # Build a url->tags map from remote_pages for this subscription
        remote_rows = db.execute(
            "SELECT url, tags FROM remote_pages WHERE subscription_id = ?", (sub_id,)
        ).fetchall()
        remote_tags = {r["url"]: r["tags"] for r in remote_rows}
        if import_all:
            local_urls = {r["url"] for r in db.execute("SELECT url FROM pages").fetchall()}
            urls = [r["url"] for r in remote_rows if r["url"] not in local_urls]
        else:
            urls = body.get("urls", [])
    finally:
        db.close()
    if not urls:
        return handle_subscriptions("No sites selected.")
    imported = 0
    errors = 0
    for url in urls:
        try:
            index_url(url)
            # Import tags from the remote page
            tags_str = remote_tags.get(url, "")
            if tags_str:
                db = get_db()
                try:
                    row = db.execute(
                        "SELECT id FROM pages WHERE url = ?", (url,)
                    ).fetchone()
                    if row:
                        _set_page_tags(row["id"], tags_str, db)
                        db.commit()
                finally:
                    # Failures are swallowed below; without this each one
                    # would leave a connection open.
                    db.close()
            imported += 1
        except Exception:
            errors += 1
    return handle_subscriptions(f"Imported {imported} page(s). {errors} error(s).")
def handle_subscription_sync(sub_id):
    """Refresh the locally stored site list of one subscription.

    The remote list is fetched first. Then the subscription's existing
    ``remote_pages`` rows are replaced, and its ``last_sync`` timestamp
    and name are updated. Entries that fail to insert are skipped.

    Returns:
        The subscriptions page with a status or error message.
    """
    conn = get_db()
    sub = conn.execute(
        "SELECT * FROM subscriptions WHERE id = ?", (sub_id,)
    ).fetchone()
    if not sub:
        conn.close()
        return handle_subscriptions("Subscription not found.")
    try:
        data = fetch_remote_sites(sub["dest_hash"])
        sites = data.get("sites", [])
        remote_name = data.get("name", sub["name"])
    except PermissionError:
        conn.close()
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception as exc:
        conn.close()
        return handle_subscriptions(f"Could not sync: {esc(str(exc))}")

    # Clear old remote pages for this subscription and re-insert
    conn.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub_id,))
    insert_sql = (
        "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?)"
    )
    synced = 0
    for site in sites:
        try:
            joined_tags = ",".join(site.get("tags", []))
            conn.execute(
                insert_sql,
                (sub_id, site["url"], site["title"], site.get("note", ""), joined_tags),
            )
        except Exception:
            # Best effort: skip entries that cannot be stored.
            continue
        synced += 1
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    conn.execute(
        "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
        (stamp, remote_name, sub_id),
    )
    conn.commit()
    conn.close()
    return handle_subscriptions(f"Synced {synced} site(s) from {esc(remote_name)}.")
def handle_subscription_autosync(sub_id):
    """Flip a subscription's ``auto_sync`` flag and return to the list."""
    toggle_sql = "UPDATE subscriptions SET auto_sync = 1 - auto_sync WHERE id = ?"
    conn = get_db()
    conn.execute(toggle_sql, (sub_id,))
    conn.commit()
    conn.close()
    return _redirect("/subscriptions")
def handle_subscription_delete(sub_id):
    """Remove a subscription and its locally stored remote pages.

    Returns a redirect to /subscriptions.
    """
    db = get_db()
    try:
        # Drop the synced pages as well; otherwise they stay behind and
        # could be picked up by a later subscription that gets the same id.
        # NOTE(review): the schema is not visible here; if it already
        # cascades on subscription deletion this is a harmless no-op.
        db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub_id,))
        db.execute("DELETE FROM subscriptions WHERE id = ?", (sub_id,))
        db.commit()
    finally:
        db.close()
    return _redirect("/subscriptions")
def handle_subscription_syncall():
    """Sync every subscription that has auto-sync enabled.

    For each such subscription the remote list is fetched, its
    ``remote_pages`` rows are replaced, and ``last_sync``/``name`` are
    updated. A subscription whose sync raises is counted as failed and the
    loop continues with the next one.

    Returns:
        The subscriptions page with a summary message. The number of
        failed subscriptions is appended when there were any.
    """
    db = get_db()
    try:
        subs = db.execute("SELECT * FROM subscriptions WHERE auto_sync = 1").fetchall()
    finally:
        db.close()
    if not subs:
        return handle_subscriptions("No subscriptions have auto-sync enabled.")
    total = 0
    failed = 0
    for sub in subs:
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            sites = data.get("sites", [])
            remote_name = data.get("name", sub["name"])
            db = get_db()
            try:
                db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub["id"],))
                for s in sites:
                    try:
                        tags_str = ",".join(s.get("tags", []))
                        db.execute(
                            "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?)",
                            (sub["id"], s["url"], s["title"], s.get("note", ""), tags_str),
                        )
                    except Exception:
                        # Best effort: skip entries that cannot be stored.
                        continue
                now = datetime.now().strftime("%Y-%m-%d %H:%M")
                db.execute(
                    "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
                    (now, remote_name, sub["id"]),
                )
                db.commit()
            finally:
                # Close even when a statement failed, so a broken sync does
                # not leave the connection open.
                db.close()
            total += 1
        except Exception:
            failed += 1
    message = f"Synced {total} subscription(s)."
    if failed:
        message += f" {failed} failed."
    return handle_subscriptions(message)
# --- Dispatcher ---
def dispatch_request(data):
    """Route a request description to its handler and return the response.

    Args:
        data: mapping with optional keys ``method`` (default "GET"),
            ``path`` (default "/"), ``query``, ``body`` and
            ``gateway_host``.

    Returns:
        The handler's response dict; a 400 response when a route needs a
        numeric id (or a tag name) that is missing or malformed; a 404
        response when nothing matches.
    """
    method = data.get("method", "GET")
    path = data.get("path", "/")
    query = data.get("query", {})
    body = data.get("body", {})
    gateway_host = data.get("gateway_host", "")  # read but not used by any route

    def trailing_id(prefix):
        # Parse whatever follows *prefix* in the path as an integer id.
        try:
            return int(path[len(prefix):])
        except (ValueError, IndexError):
            return None

    def call_with_id(prefix, handler, *extra):
        ident = trailing_id(prefix)
        if ident is None:
            return _error(400)
        return handler(ident, *extra)

    if method == "GET":
        exact_routes = {
            "/": lambda: handle_search(query),
            "/add": handle_add_form,
            "/pages": handle_pages,
            "/bookmark": lambda: handle_bookmark(query),
            "/style": handle_style_form,
            "/about": handle_about,
            "/export": handle_export,
            "/import": handle_import_form,
            "/tags": handle_tags,
            "/api/sites": handle_api_sites,
            "/subscriptions": handle_subscriptions,
        }
        if path in exact_routes:
            return exact_routes[path]()
        if path.startswith("/tags/"):
            tag_name = path[len("/tags/"):]
            if not tag_name:
                return _error(400)
            return handle_tag_browse(tag_name)
        id_routes = (
            ("/edit/", handle_edit_form),
            ("/delete/", handle_delete),
            ("/subscriptions/browse/", handle_subscription_browse),
        )
        for prefix, handler in id_routes:
            if path.startswith(prefix):
                return call_with_id(prefix, handler)
    elif method == "POST":
        exact_routes = {
            "/add": lambda: handle_add_submit(body),
            "/style": lambda: handle_style_submit(body),
            "/import": lambda: handle_import_submit(body),
            "/subscriptions/add": lambda: handle_subscription_add(body),
            "/subscriptions/pick": lambda: handle_subscription_pick(body),
            "/subscriptions/syncall": handle_subscription_syncall,
        }
        if path in exact_routes:
            return exact_routes[path]()
        if path.startswith("/edit/"):
            return call_with_id("/edit/", handle_edit_submit, body)
        id_routes = (
            ("/subscriptions/sync/", handle_subscription_sync),
            ("/subscriptions/autosync/", handle_subscription_autosync),
            ("/subscriptions/delete/", handle_subscription_delete),
        )
        for prefix, handler in id_routes:
            if path.startswith(prefix):
                return call_with_id(prefix, handler)
    return _error(404)