tinyweb/handlers.py
Derick Phan 2df92752b6
Normalize template line endings before storing to fix navbar disappearing
The previous fix only normalized \r\n for comparison but stored the raw
template with browser line endings. Now all \r\n and \r are converted to
\n before both comparing and storing, preventing the bare skeleton from
ever being saved as a custom template.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 22:29:45 -07:00

1098 lines
40 KiB
Python

import json
import secrets
import threading
from datetime import datetime
from urllib.parse import quote, unquote

from db import get_db, return_db, get_setting, set_setting, get_site_name, index_url, clean_url
from templates import esc, snippet, wrap_page, DEFAULT_TEMPLATE
from rns_client import fetch_remote_sites
# Thread-local holder for the CSRF token of the request being served on this
# thread: dispatch_request() writes it, _get_csrf_token() reads it.
_request_local = threading.local()
def _get_csrf_token():
    """Return the CSRF token stored for the current thread, or "" when unset."""
    try:
        return _request_local.csrf_token
    except AttributeError:
        return ""
def _csrf_field():
    """Return the hidden ``_csrf`` form input carrying the current token.

    The token may come straight from the client's ``_csrf`` cookie, so it is
    passed through ``esc()`` like every other attribute value in this module
    rather than being interpolated raw.
    """
    return f'<input type="hidden" name="_csrf" value="{esc(_get_csrf_token())}">'
def _check_csrf(body):
    """Return True when the submitted ``_csrf`` field matches the request token.

    Args:
        body: Parsed form body mapping field names to lists of values.

    Returns:
        False when either token is missing or they differ, True otherwise.
    """
    values = body.get("_csrf") or [""]
    submitted = values[0]
    expected = _get_csrf_token()
    if not expected or not submitted:
        return False
    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters; comparing encoded bytes turns a malformed token
    # into an ordinary mismatch instead of an unhandled exception.
    return secrets.compare_digest(
        submitted.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
def _sanitize_fts_query(query):
"""Escape user input for safe use in FTS5 MATCH."""
escaped = query.replace('"', '""')
return f'"{escaped}"'
def _get_bookmark_token():
    """Return the stored bookmarklet token, creating and saving one if absent."""
    stored = get_setting("bookmark_token")
    if stored:
        return stored
    fresh = secrets.token_hex(16)
    set_setting("bookmark_token", fresh)
    return fresh
def _respond(body_html, status=200, use_default=False):
    """Wrap *body_html* with ``wrap_page`` and return an HTML response dict.

    ``use_default`` is forwarded to ``wrap_page`` unchanged.
    """
    page = wrap_page(body_html, use_default=use_default)
    response = {"status": status, "content_type": "text/html; charset=utf-8"}
    response["body"] = page
    response["headers"] = {}
    return response
def _redirect(location):
if not location.startswith("/") or location.startswith("//"):
location = "/"
return {
"status": 302,
"content_type": "text/html; charset=utf-8",
"body": "",
"headers": {"Location": location},
}
def _json_response(data, status=200, headers=None):
return {
"status": status,
"content_type": "application/json",
"body": json.dumps(data, indent=2),
"headers": headers or {},
}
def _text_response(text, status=200, headers=None):
return {
"status": status,
"content_type": "text/plain",
"body": text,
"headers": headers or {},
}
def _error(status):
    """Return a minimal HTML page whose heading is the numeric *status*."""
    heading = f"<h1>{status}</h1>"
    return _respond(heading, status)
# Rows per page: used as the SQL LIMIT by the paginated handlers and by
# _page_nav() to work out the page count.
PER_PAGE = 10
def _paginate(query, key="p"):
try:
page = int(query.get(key, ["1"])[0])
except (ValueError, IndexError):
page = 1
return max(1, page)
def _page_nav(page, total, base_url):
    """Render the "prev | page X of Y | next" strip for a paginated listing.

    Returns "" when *total* fits on a single page. The page parameter is
    appended to *base_url* with "?" or, if it already has a query, "&".
    """
    if total <= PER_PAGE:
        return ""
    last_page, remainder = divmod(total, PER_PAGE)
    if remainder:
        last_page += 1
    joiner = "?" if "?" not in base_url else "&"

    def link(target, label):
        return f'<a href="{base_url}{joiner}p={target}">{label}</a>'

    pieces = [f"page {page} of {last_page}"]
    if page > 1:
        pieces.insert(0, link(page - 1, "&laquo; prev"))
    if page < last_page:
        pieces.append(link(page + 1, "next &raquo;"))
    return '<p class="pagination">' + " | ".join(pieces) + "</p>"
# --- Tag helpers ---
def _get_page_tags(page_id, db=None):
close = False
if db is None:
db = get_db()
close = True
rows = db.execute(
"SELECT t.name FROM tags t JOIN page_tags pt ON t.id = pt.tag_id "
"WHERE pt.page_id = ? ORDER BY t.name", (page_id,)
).fetchall()
if close:
return_db(db)
return [r["name"] for r in rows]
def _set_page_tags(page_id, tag_string, db=None):
close = False
if db is None:
db = get_db()
close = True
db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
for name in (t.strip().lower() for t in tag_string.split(",") if t.strip()):
db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,))
tag_id = db.execute("SELECT id FROM tags WHERE name = ?", (name,)).fetchone()["id"]
db.execute("INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)", (page_id, tag_id))
if close:
db.commit()
return_db(db)
# --- Route handlers ---
def _search_local_results(db, q, offset):
    """Search the local FTS index for *q*.

    Returns a ``(html, total, urls)`` tuple: the rendered results (or the
    "no results" notice), the total match count, and the URLs on this page.
    """
    fts_query = _sanitize_fts_query(q)
    try:
        total = db.execute(
            "SELECT count(*) FROM pages_fts WHERE pages_fts MATCH ?",
            (fts_query,),
        ).fetchone()[0]
        rows = db.execute(
            "SELECT p.id, p.url, p.title, p.body, p.note "
            "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
            "WHERE pages_fts MATCH ? ORDER BY rank LIMIT ? OFFSET ?",
            (fts_query, PER_PAGE, offset),
        ).fetchall()
    except Exception:
        # Deliberate best-effort: a failing FTS query is shown as "no results".
        total, rows = 0, []
    if not rows:
        return "<p>No results in your index.</p>", total, set()

    blocks = []
    for row in rows:
        note_html = ""
        if row["note"]:
            note_html = f'<div class="note"><em>{esc(row["note"])}</em></div>'
        tags_html = ""
        tags = _get_page_tags(row["id"], db)
        if tags:
            # Percent-encode the tag for the URL path (the dispatcher unquotes
            # it) so names containing "#", "?" or "/" still resolve.
            tag_links = " ".join(
                f'<a href="/tags/{esc(quote(tag, safe=""))}" class="tag">[{esc(tag)}]</a>'
                for tag in tags
            )
            tags_html = f'<div class="tags">{tag_links}</div>'
        blocks.append(
            f'<div class="result">'
            f'<a href="{esc(row["url"])}">{esc(row["title"])}</a><br>'
            f'<small>{esc(row["url"])}</small><br>'
            f'{esc(snippet(row["body"], q))}'
            f'{note_html}{tags_html}'
            f'</div>'
        )
    return "".join(blocks), total, {row["url"] for row in rows}


def _search_trusted_links(db, q, shown_urls):
    """Render up to 20 outgoing links whose label contains any word of *q*.

    Links whose URL is in *shown_urls* are skipped, as are duplicates.
    """
    words = q.lower().split()
    links = db.execute(
        "SELECT l.url, l.label, p.title AS source_title "
        "FROM links l JOIN pages p ON l.page_id = p.id",
    ).fetchall()
    seen = set(shown_urls)
    matches = []
    for link in links:
        if link["url"] in seen:
            continue
        # A NULL label simply never matches instead of raising.
        label = (link["label"] or "").lower()
        if any(word in label for word in words):
            seen.add(link["url"])
            matches.append(link)
            if len(matches) >= 20:
                break
    if not matches:
        return ""
    items = "".join(
        f'<li><a href="{esc(link["url"])}">{esc(link["label"])}</a> '
        f'<small>— from {esc(link["source_title"])}</small></li>'
        for link in matches
    )
    return (
        f'<details class="trusted">'
        f'<summary>from your trusted sites ({len(matches)})</summary>'
        f'<ul>{items}</ul>'
        f'</details>'
    )


def _search_remote_results(db, q):
    """Render FTS matches from synced subscription pages, grouped by source."""
    try:
        rows = db.execute(
            "SELECT rp.url, rp.title, rp.note, s.name AS source_name "
            "FROM remote_pages_fts rpf "
            "JOIN remote_pages rp ON rpf.rowid = rp.id "
            "JOIN subscriptions s ON rp.subscription_id = s.id "
            "WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
            (_sanitize_fts_query(q),),
        ).fetchall()
    except Exception:
        # Deliberate best-effort: remote results are optional.
        return ""
    by_source = {}
    for row in rows:
        by_source.setdefault(row["source_name"] or "unknown", []).append(row)
    sections = []
    for source, items in by_source.items():
        source_items = ""
        for row in items:
            note_html = f' — <em>{esc(row["note"])}</em>' if row["note"] else ""
            source_items += (
                f'<li><a href="{esc(row["url"])}">{esc(row["title"])}</a>'
                f'{note_html} <small>({esc(row["url"])})</small></li>'
            )
        sections.append(
            f'<details class="remote" open>'
            f'<summary>from {esc(source)} ({len(items)})</summary>'
            f'<ul>{source_items}</ul>'
            f'</details>'
        )
    return "".join(sections)


def handle_search(query):
    """Render the search page and, when a query is given, its results.

    Results come from three places: the local FTS index (paginated), links
    found on indexed pages whose label matches a query word, and pages synced
    from subscriptions.

    Args:
        query: Parsed query parameters; ``q`` is the search text and ``p``
            the page number.

    Returns:
        HTML response dict.
    """
    q = query.get("q", [""])[0].strip()
    page = _paginate(query)
    offset = (page - 1) * PER_PAGE
    result_html = trusted_html = remote_html = nav_html = ""
    db = get_db()
    try:
        count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        if q:
            result_html, total_results, shown_urls = _search_local_results(db, q, offset)
            trusted_html = _search_trusted_links(db, q, shown_urls)
            remote_html = _search_remote_results(db, q)
            # The query must be URL-encoded, not just HTML-escaped: a raw "&"
            # or "#" in it would otherwise truncate q on the next/prev links.
            nav_html = _page_nav(page, total_results, "/?q=" + esc(quote(q, safe="")))
    finally:
        return_db(db)
    return _respond(
        f'<form method="get" action="/">'
        f'<input name="q" value="{esc(q)}" placeholder="search your index" size="40">'
        f' <button type="submit">search</button>'
        f'</form>'
        f'<p class="meta">{count} pages indexed'
        f' · <a href="/add">+ add url</a></p>'
        f'{result_html}'
        f'{nav_html}'
        f'{trusted_html}{remote_html}'
    )
def handle_add_form(msg=""):
    """Render the "add url" form with *msg* shown beneath it.

    *msg* is inserted without escaping, so callers must pass trusted or
    already-escaped HTML.
    """
    fields = (
        '<input name="url" placeholder="https://example.com" size="50"><br><br>'
        '<input name="note" placeholder="why are you saving this? (optional)" size="50"><br><br>'
        '<input name="tags" placeholder="tags (comma-separated, e.g. solarpunk, mesh)" size="50"><br><br>'
    )
    page = "".join([
        "<h1>add url</h1>",
        '<form method="post" action="/add">',
        f"{_csrf_field()}",
        fields,
        '<button type="submit">index</button>',
        "</form>",
        f"<p>{msg}</p>",
        '<a href="/">back</a>',
    ])
    return _respond(page)
def handle_add_submit(body):
    """Index the submitted URL, attach optional tags, and re-render the form.

    The form is re-rendered with either a success link or an error message.
    Only http:// and https:// URLs are accepted.
    """
    def field(name):
        return body.get(name, [""])[0].strip()

    url = clean_url(field("url"))
    note = field("note")
    tags = field("tags")
    if not url:
        return handle_add_form("URL is required.")
    if not url.startswith(("http://", "https://")):
        return handle_add_form("URL must start with http:// or https://")
    try:
        title = index_url(url, note)
        if tags:
            conn = get_db()
            try:
                page = conn.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                if page:
                    _set_page_tags(page["id"], tags, conn)
                    conn.commit()
            finally:
                return_db(conn)
        success = f'Indexed: <a href="{esc(url)}">{esc(title)}</a>'
        return handle_add_form(success)
    except ValueError as exc:
        # ValueError text is shown to the user (escaped); anything else is
        # reported generically.
        return handle_add_form(f"Error: {esc(str(exc))}")
    except Exception:
        return handle_add_form("Error: could not fetch or index that URL.")
def handle_pages(query=None):
    """Render the paginated list of indexed pages, newest first.

    Args:
        query: Parsed query parameters; ``p`` selects the page.

    Returns:
        HTML response dict listing each page with its note, tags and
        edit/remove links.
    """
    page = _paginate(query or {})
    offset = (page - 1) * PER_PAGE
    db = get_db()
    try:
        total = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        rows = db.execute(
            "SELECT id, url, title, note FROM pages ORDER BY id DESC LIMIT ? OFFSET ?",
            (PER_PAGE, offset),
        ).fetchall()
        entries = []
        for row in rows:
            note_html = f' — <em>{esc(row["note"])}</em>' if row["note"] else ""
            tags = _get_page_tags(row["id"], db)
            tags_html = ""
            if tags:
                # Percent-encode the tag for the URL path (the dispatcher
                # unquotes it) so names containing "#", "?" or "/" resolve.
                tag_links = " ".join(
                    f'<a href="/tags/{esc(quote(tag, safe=""))}">[{esc(tag)}]</a>'
                    for tag in tags
                )
                tags_html = f' {tag_links}'
            entries.append(
                f'<li>{esc(row["title"])}{note_html}{tags_html} '
                f'<small>(<a href="{esc(row["url"])}">{esc(row["url"])}</a>)</small> '
                f'<a href="/edit/{row["id"]}">edit</a> '
                f'<a href="/delete/{row["id"]}">remove</a></li>'
            )
    finally:
        return_db(db)
    items = "".join(entries)
    return _respond(
        f"<h1>indexed pages ({total})</h1>"
        f"<ul>{items}</ul>"
        f'{_page_nav(page, total, "/pages")}'
        f'<p><a href="/export">export</a> | <a href="/import">import</a></p>'
        f'<a href="/">back</a>'
    )
def handle_edit_form(page_id, msg=""):
    """Render the note/tags edit form for one page.

    Args:
        page_id: Id of the page to edit.
        msg: HTML shown beneath the form; inserted without escaping.

    Returns:
        HTML response dict, or a 404 page when the id is unknown.
    """
    db = get_db()
    try:
        row = db.execute("SELECT id, url, title, note FROM pages WHERE id = ?", (page_id,)).fetchone()
        if not row:
            return _error(404)
        tags = ", ".join(_get_page_tags(page_id, db))
    finally:
        return_db(db)
    # Other handlers in this module check the note for emptiness before
    # escaping it; treat a missing note as "" here as well instead of passing
    # None to esc().
    note = row["note"] or ""
    return _respond(
        f"<h1>edit page</h1>"
        f"<p><b>{esc(row['title'])}</b><br>"
        f"<small>{esc(row['url'])}</small></p>"
        f'<form method="post" action="/edit/{row["id"]}">'
        f'{_csrf_field()}'
        f'<input name="note" value="{esc(note)}" placeholder="why did you save this?" size="50"><br><br>'
        f'<input name="tags" value="{esc(tags)}" placeholder="tags (comma-separated)" size="50"><br><br>'
        f'<button type="submit">save</button>'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/pages">back</a>'
    )
def handle_edit_submit(page_id, body):
    """Save the note and tags submitted for a page, then redirect to /pages.

    Args:
        page_id: Id of the page being edited.
        body: Parsed form body with ``note`` and ``tags`` fields.

    Returns:
        Redirect response, or a 404 page when the id is unknown.
    """
    note = body.get("note", [""])[0].strip()
    tags = body.get("tags", [""])[0].strip()
    db = get_db()
    try:
        # Without this check a stale or forged id would still write page_tags
        # rows that point at no page; the edit form already answers 404.
        exists = db.execute("SELECT 1 FROM pages WHERE id = ?", (page_id,)).fetchone()
        if not exists:
            return _error(404)
        db.execute("UPDATE pages SET note = ? WHERE id = ?", (note, page_id))
        _set_page_tags(page_id, tags, db)
        db.commit()
    finally:
        return_db(db)
    return _redirect("/pages")
def handle_delete_confirm(page_id):
    """Render the confirmation page shown before a page is deleted.

    Returns a 404 page when *page_id* does not exist.
    """
    conn = get_db()
    try:
        cursor = conn.execute("SELECT id, url, title FROM pages WHERE id = ?", (page_id,))
        page = cursor.fetchone()
    finally:
        return_db(conn)
    if not page:
        return _error(404)
    summary = (
        f"<p>Remove <b>{esc(page['title'])}</b><br>"
        f"<small>{esc(page['url'])}</small></p>"
    )
    confirm_form = (
        f'<form method="post" action="/delete/{page["id"]}">'
        f'{_csrf_field()}'
        '<button type="submit">yes, delete</button>'
        "</form>"
    )
    return _respond(
        "<h1>confirm delete</h1>" + summary + confirm_form + ' <a href="/pages">cancel</a>'
    )
def handle_delete(page_id):
    """Delete a page with its tag links and outgoing links; redirect to /pages."""
    # Dependent rows first, then the page itself, all in one commit.
    statements = (
        "DELETE FROM page_tags WHERE page_id = ?",
        "DELETE FROM links WHERE page_id = ?",
        "DELETE FROM pages WHERE id = ?",
    )
    conn = get_db()
    try:
        for sql in statements:
            conn.execute(sql, (page_id,))
        conn.commit()
    finally:
        return_db(conn)
    return _redirect("/pages")
def handle_bookmark(query):
    """Index a URL on behalf of the bookmarklet and answer in plain text.

    Args:
        query: Parsed query parameters; ``token`` must equal the stored
            bookmark token and ``url`` is the page to index.

    Returns:
        ``text/plain`` response dict with a permissive CORS header: 403 for a
        bad token, otherwise "ok: <title>" or an "error: ..." line.
    """
    cors = {"Access-Control-Allow-Origin": "*"}
    token = query.get("token", [""])[0]
    expected = _get_bookmark_token()
    # compare_digest() raises TypeError for str arguments with non-ASCII
    # characters; comparing encoded bytes makes a malformed token a plain
    # 403 instead of an unhandled exception.
    token_ok = bool(token) and secrets.compare_digest(
        token.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
    if not token_ok:
        return _text_response("error: invalid or missing token", status=403, headers=cors)
    url = clean_url(query.get("url", [""])[0].strip())
    if not url or not url.startswith(("http://", "https://")):
        return _text_response("error: invalid url", headers=cors)
    try:
        title = index_url(url)
        msg = f"ok: {title}"
    except Exception as e:
        # Best-effort endpoint: the failure text is what the bookmarklet shows.
        msg = f"error: {e}"
    return _text_response(msg, headers=cors)
def handle_export():
    """Return every indexed page (url, title, note) as a JSON download."""
    conn = get_db()
    try:
        cursor = conn.execute("SELECT url, title, note FROM pages ORDER BY id")
        rows = cursor.fetchall()
    finally:
        return_db(conn)
    fields = ("url", "title", "note")
    export = [{field: row[field] for field in fields} for row in rows]
    download = {"Content-Disposition": "attachment; filename=tinyweb-export.json"}
    return _json_response(export, headers=download)
def handle_import_form(msg=""):
    """Render the JSON import form with *msg* shown beneath it.

    *msg* is inserted without escaping, so callers must pass trusted HTML.
    """
    textarea = f'<textarea name="data" rows="12" cols="60" placeholder=\'[{{"url": "...", "note": "..."}}]\'></textarea><br><br>'
    page = "".join([
        "<h1>import</h1>",
        "<p>Paste the contents of a tinyweb export file (JSON).</p>",
        '<form method="post" action="/import">',
        f"{_csrf_field()}",
        textarea,
        '<button type="submit">import</button>',
        "</form>",
        f"<p>{msg}</p>",
        '<a href="/pages">back</a>',
    ])
    return _respond(page)
def handle_import_submit(body):
    """Index every entry of a pasted JSON export and report the outcome.

    The payload must be a JSON array of at most 100 objects with a ``url``
    and an optional ``note``. Entries without a URL are skipped silently;
    malformed entries, non-http(s) URLs and indexing failures are counted as
    errors.

    Args:
        body: Parsed form body; ``data`` holds the JSON text.

    Returns:
        The import form re-rendered with a status message.
    """
    raw = body.get("data", [""])[0].strip()
    if not raw:
        return handle_import_form("Paste JSON data.")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return handle_import_form("Invalid JSON.")
    if not isinstance(data, list):
        return handle_import_form("Expected a JSON array.")
    MAX_IMPORT = 100
    if len(data) > MAX_IMPORT:
        return handle_import_form(f"Too many entries. Maximum is {MAX_IMPORT}.")
    imported = 0
    errors = 0
    for entry in data:
        # Array items can be any JSON value; only objects carry url/note.
        if not isinstance(entry, dict):
            errors += 1
            continue
        # handle_export() writes the note column as-is, so "note": null is a
        # legitimate value in an export file and must not crash the import.
        raw_url = entry.get("url") or ""
        raw_note = entry.get("note") or ""
        if not isinstance(raw_url, str) or not isinstance(raw_note, str):
            errors += 1
            continue
        # Same normalisation and scheme check as the /add form.
        url = clean_url(raw_url.strip())
        note = raw_note.strip()
        if not url:
            continue
        if not url.startswith(("http://", "https://")):
            errors += 1
            continue
        try:
            index_url(url, note)
            imported += 1
        except Exception:
            # Best-effort bulk import: one bad page must not abort the rest.
            errors += 1
    return handle_import_form(f"Imported {imported} page(s). {errors} error(s).")
def handle_style_form(msg=""):
    """Render the customisation page.

    The page holds the site name, sharing toggle, template editor,
    bookmarklet link and template reset button. It is always rendered with
    ``use_default=True``. *msg* is inserted without escaping.
    """
    template = get_setting("custom_template") or DEFAULT_TEMPLATE
    name = get_site_name()
    sharing = get_setting("sharing_enabled", "0")
    checked = ""
    if sharing == "1":
        checked = " checked"
    csrf = _csrf_field()
    placeholder = esc("{{content}}")

    settings_form = (
        '<form method="post" action="/style">'
        f'{csrf}'
        f'<input name="site_name" value="{esc(name)}" placeholder="tinyweb" size="30"><br><br>'
        "<h2>sharing</h2>"
        f'<label><input type="checkbox" name="sharing_enabled" value="1"{checked}>'
        " share your site list publicly at /api/sites</label><br><br>"
        "<h2>custom html</h2>"
        f"<p>Edit the full page template. Use <code>{placeholder}</code> "
        "where page content should appear.</p>"
        f'<textarea name="template" rows="20" cols="60">{esc(template)}</textarea><br><br>'
        '<button type="submit">save</button>'
        "</form>"
    )
    bookmarklet = (
        "<h2>bookmarklet</h2>"
        "<p>Drag this link to your bookmarks bar. Click it on any page to index it instantly.</p>"
        f'<p><a href="javascript:void(fetch(\'http://localhost:8080/bookmark?url=\'+encodeURIComponent(location.href)+\'&token={_get_bookmark_token()}\').then(r=>r.text()).then(t=>alert(t)).catch(()=>alert(\'tinyweb not running\')))">+ save to {esc(name)}</a></p>'
    )
    reset_form = (
        "<h2>reset</h2>"
        '<form method="post" action="/style/reset">'
        f'{csrf}'
        '<button type="submit">reset template to default</button>'
        "</form>"
    )
    page = (
        "<h1>customize</h1>"
        "<h2>name your search engine</h2>"
        + settings_form
        + bookmarklet
        + reset_form
        + f"<p>{msg}</p>"
        + '<a href="/">back</a>'
    )
    return _respond(page, use_default=True)
def handle_style_submit(body):
    """Save the site name, sharing flag and custom template from the form.

    Line endings are normalised to "\\n" before comparing and storing. The
    custom template is stored as "" (meaning: use the default) when the
    submission is blank or equals the default template.

    Args:
        body: Parsed form body with ``template``, ``site_name`` and the
            optional ``sharing_enabled`` checkbox.

    Returns:
        The customisation page with a "Saved." message.
    """
    template = body.get("template", [""])[0].replace("\r\n", "\n").replace("\r", "\n")
    name = body.get("site_name", ["tinyweb"])[0].strip()
    sharing = "1" if body.get("sharing_enabled") else "0"
    default = DEFAULT_TEMPLATE.replace("\r\n", "\n").replace("\r", "\n")
    submitted = template.strip()
    # A whitespace-only template is truthy, so storing it would override the
    # default with a page that renders nothing; treat it like "no custom
    # template".
    is_custom = bool(submitted) and submitted != default.strip()
    set_setting("custom_template", template if is_custom else "")
    set_setting("site_name", name or "tinyweb")
    set_setting("sharing_enabled", sharing)
    return handle_style_form("Saved.")
def handle_about():
    """Render the about page.

    Shows the site name, page/tag/subscription counts, the sharing status,
    and the destination hash when one is stored.
    """
    name = get_site_name()
    dest_hash = get_setting("dest_hash")
    sharing = get_setting("sharing_enabled", "0") == "1"
    conn = get_db()
    try:
        def scalar(sql):
            return conn.execute(sql).fetchone()[0]

        page_count = scalar("SELECT count(*) FROM pages")
        tag_count = scalar("SELECT count(DISTINCT tag_id) FROM page_tags")
        sub_count = scalar("SELECT count(*) FROM subscriptions")
    finally:
        return_db(conn)

    if sharing:
        sharing_html = '<p>This instance shares its index publicly. Subscribe to join the network.</p>'
    else:
        sharing_html = '<p>This instance is private.</p>'

    hash_html = ""
    if dest_hash:
        hash_html = "".join([
            '<h2>subscribe</h2>',
            '<p>To subscribe to this instance, add this destination hash in your TinyWeb:</p>',
            f'<pre>{esc(dest_hash)}</pre>',
        ])

    intro = (
        f'<h1>{esc(name)}</h1>'
        '<p>A personal search engine, built for the slow web.</p>'
        '<p>TinyWeb is about taking back the internet. No algorithms, no ads, no tracking. '
        'Just human-curated pages shared freely across a mesh network.</p>'
    )
    stats = (
        '<ul>'
        f'<li><b>{page_count}</b> page(s) indexed</li>'
        f'<li><b>{tag_count}</b> tag(s)</li>'
        f'<li><b>{sub_count}</b> subscription(s)</li>'
        '</ul>'
    )
    slow_web = (
        '<h2>what is the slow web?</h2>'
        '<p>The slow web is a movement for intentionality over speed, '
        'human curation over algorithmic feeds, privacy over surveillance, '
        'and community over corporations. Every page in this index was saved by a person '
        'because they found it valuable — not because an algorithm told them to click.</p>'
    )
    how_it_works = (
        '<h2>how it works</h2>'
        '<ul>'
        '<li>Save pages you find valuable with the bookmarklet or /add</li>'
        '<li>Search your personal index — queries never leave your machine</li>'
        '<li>Subscribe to friends over Reticulum — encrypted, decentralized, works without the internet</li>'
        '<li>Tag and organize your collection into curated lists</li>'
        '</ul>'
    )
    footer = '<p><a href="/">search</a> | <a href="/pages">browse</a> | <a href="/tags">tags</a></p>'
    return _respond(
        intro + stats + sharing_html + hash_html + slow_web + how_it_works + footer
    )
def handle_tags():
    """Render the list of tags in use with the number of pages for each.

    Returns:
        HTML response dict holding the heading, either the tag list or a
        "no tags" notice, and a back link.
    """
    db = get_db()
    try:
        rows = db.execute(
            "SELECT t.name, COUNT(pt.page_id) AS cnt FROM tags t "
            "JOIN page_tags pt ON t.id = pt.tag_id "
            "GROUP BY t.id ORDER BY t.name"
        ).fetchall()
    finally:
        return_db(db)
    # Percent-encode the tag for the URL path (the dispatcher unquotes it) so
    # names containing "#", "?" or "/" still resolve.
    items = "".join(
        f'<li><a href="/tags/{esc(quote(row["name"], safe=""))}">{esc(row["name"])}</a> ({row["cnt"]})</li>'
        for row in rows
    )
    # Choose the middle section first. Writing the conditional expression
    # between adjacent string literals makes it swallow them, which dropped
    # the back link when tags existed and the heading when there were none.
    if items:
        listing = f"<ul>{items}</ul>"
    else:
        listing = "<p>No tags yet. Add tags when saving or editing pages.</p>"
    return _respond(
        f"<h1>tags</h1>"
        f"{listing}"
        f'<a href="/">back</a>'
    )
def handle_tag_browse(tag_name, query=None):
    """Render the paginated list of pages carrying one tag, newest first.

    Args:
        tag_name: Tag to browse, already URL-decoded by the dispatcher.
        query: Parsed query parameters; ``p`` selects the page.

    Returns:
        HTML response dict.
    """
    page = _paginate(query or {})
    offset = (page - 1) * PER_PAGE
    db = get_db()
    try:
        total = db.execute(
            "SELECT count(*) FROM page_tags pt JOIN tags t ON t.id = pt.tag_id WHERE t.name = ?",
            (tag_name,),
        ).fetchone()[0]
        rows = db.execute(
            "SELECT p.id, p.url, p.title, p.note FROM pages p "
            "JOIN page_tags pt ON p.id = pt.page_id "
            "JOIN tags t ON t.id = pt.tag_id "
            "WHERE t.name = ? ORDER BY p.id DESC LIMIT ? OFFSET ?",
            (tag_name, PER_PAGE, offset),
        ).fetchall()
        entries = []
        for row in rows:
            note_html = f' — <em>{esc(row["note"])}</em>' if row["note"] else ""
            tags = _get_page_tags(row["id"], db)
            # Tags are percent-encoded for the URL path; the dispatcher
            # unquotes them again.
            tag_links = " ".join(
                f'<a href="/tags/{esc(quote(tag, safe=""))}">[{esc(tag)}]</a>'
                for tag in tags
            )
            entries.append(
                f'<li>{esc(row["title"])}{note_html} {tag_links} '
                f'<small>(<a href="{esc(row["url"])}">{esc(row["url"])}</a>)</small></li>'
            )
    finally:
        return_db(db)
    items = "".join(entries)
    # HTML-escaping alone is not enough for a path segment: a tag such as
    # "c#" or "a?b" would otherwise break the prev/next links.
    nav_base = "/tags/" + esc(quote(tag_name, safe=""))
    return _respond(
        f'<h1>tag: {esc(tag_name)}</h1>'
        f'<p>{total} page(s)</p>'
        f'<ul>{items}</ul>'
        f'{_page_nav(page, total, nav_base)}'
        f'<a href="/tags">all tags</a> | <a href="/">back</a>'
    )
def handle_api_sites(query=None):
    """Serve this instance's page list as JSON for subscribers.

    Answers 403 when sharing is disabled. With a ``since`` parameter only
    pages whose ``last_modified`` is greater are returned; without it all
    pages are returned together with ``all_urls`` so that a subscriber can
    detect deletions.
    """
    cors = {"Access-Control-Allow-Origin": "*"}
    if get_setting("sharing_enabled", "0") != "1":
        return _json_response({"error": "sharing disabled"}, status=403, headers=cors)

    params = query or {}
    since = params.get("since", [""])[0].strip()
    conn = get_db()
    try:
        if not since:
            rows = conn.execute(
                "SELECT id, url, title, note, last_modified FROM pages ORDER BY id DESC"
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT id, url, title, note, last_modified FROM pages "
                "WHERE last_modified > ? ORDER BY id DESC",
                (since,),
            ).fetchall()
        sites = [
            {
                "url": row["url"],
                "title": row["title"],
                "note": row["note"],
                "tags": _get_page_tags(row["id"], conn),
                "last_modified": row["last_modified"] or "",
            }
            for row in rows
        ]
        # Only a full listing carries every current URL (for deletion detection).
        all_urls = None
        if not since:
            all_urls = [row["url"] for row in conn.execute("SELECT url FROM pages").fetchall()]
    finally:
        return_db(conn)

    payload = {"name": get_site_name(), "sites": sites}
    if all_urls is not None:
        payload["all_urls"] = all_urls
    return _json_response(payload, headers=cors)
def handle_subscriptions(msg=""):
    """Render the subscriptions page.

    The page holds the subscribe form, one card per subscription with
    browse/sync/auto-sync/remove controls, and a "sync all" button when at
    least one subscription exists. *msg* is inserted without escaping.
    """
    conn = get_db()
    try:
        subs = conn.execute("SELECT * FROM subscriptions ORDER BY id DESC").fetchall()
    finally:
        return_db(conn)

    csrf = _csrf_field()

    def render_card(sub):
        auto_label = "off"
        if sub["auto_sync"]:
            auto_label = "on"
        last = sub["last_sync"] or "never"
        return (
            '<div style="border:1px solid #ddd;border-radius:4px;padding:0.9rem 1rem;margin-bottom:0.75rem">'
            f'<div style="margin-bottom:0.4rem"><b>{esc(sub["name"] or "unknown")}</b></div>'
            f'<div><small>{esc(sub["dest_hash"])}</small></div>'
            f'<div style="margin-top:0.4rem;font-size:0.85rem;color:#606060">last sync: {esc(last)}</div>'
            '<div style="display:flex;gap:0.5rem;align-items:center;flex-wrap:wrap;margin-top:0.7rem">'
            f'<a href="/subscriptions/browse/{sub["id"]}">browse</a>'
            f'<form method="post" action="/subscriptions/sync/{sub["id"]}" style="display:inline">'
            f'{csrf}<button>sync now</button></form>'
            f'<form method="post" action="/subscriptions/autosync/{sub["id"]}" style="display:inline">'
            f'{csrf}<button>auto-sync: {auto_label}</button></form>'
            f'<form method="post" action="/subscriptions/delete/{sub["id"]}" style="display:inline">'
            f'{csrf}<button>remove</button></form>'
            '</div>'
            '</div>'
        )

    listing = ""
    if subs:
        cards = "".join(render_card(sub) for sub in subs)
        listing = (
            cards
            + '<form method="post" action="/subscriptions/syncall">'
            + f'{csrf}<button>sync all</button></form>'
        )

    subscribe_form = (
        '<form method="post" action="/subscriptions/add">'
        f'{csrf}'
        '<input name="dest_hash" placeholder="destination hash" size="40"> '
        '<button>subscribe</button>'
        '</form>'
    )
    return _respond(
        "<h1>subscriptions</h1>"
        + subscribe_form
        + f'<p>{msg}</p>'
        + f'<hr>{listing}'
        + '<br><a href="/">back</a>'
    )
def handle_subscription_add(body):
    """Subscribe to a remote instance identified by its destination hash.

    The hash must be exactly 32 hexadecimal characters. The remote instance
    is contacted once to learn its name before the subscription is stored;
    re-subscribing to a known hash only updates the stored name.

    Args:
        body: Parsed form body; ``dest_hash`` holds the hash.

    Returns:
        The subscriptions page with a status message.
    """
    dest_hash = body.get("dest_hash", [""])[0].strip().replace("<", "").replace(">", "")
    if not dest_hash or len(dest_hash) != 32:
        return handle_subscriptions("Enter a valid 32-character destination hash.")
    # int(x, 16) also accepts a "0x" prefix, "_" separators and a sign, so
    # check the characters themselves.
    if not set(dest_hash) <= set("0123456789abcdefABCDEF"):
        return handle_subscriptions("Invalid destination hash (must be hex).")
    try:
        data = fetch_remote_sites(dest_hash)
        name = data.get("name", "")
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception:
        return handle_subscriptions("Could not reach that instance.")
    db = get_db()
    try:
        db.execute(
            "INSERT INTO subscriptions (dest_hash, name) VALUES (?, ?) "
            "ON CONFLICT(dest_hash) DO UPDATE SET name=excluded.name",
            (dest_hash, name),
        )
        db.commit()
    finally:
        return_db(db)
    return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.")
def handle_subscription_browse(sub_id):
    """List a subscription's sites, split into new and already-indexed.

    Locally synced ``remote_pages`` rows are used when present; otherwise the
    remote instance is queried live. New sites are shown with checkboxes in a
    form that posts to /subscriptions/pick.

    Args:
        sub_id: Id of the subscription to browse.

    Returns:
        HTML response dict, a 404 page for an unknown id, or the
        subscriptions page with an error message when the live fetch fails.
    """
    db = get_db()
    try:
        sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
        if not sub:
            return _error(404)
        local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
        # Use locally synced data if available, otherwise fetch live
        remote_rows = db.execute(
            "SELECT url, title, note, tags FROM remote_pages WHERE subscription_id = ?",
            (sub_id,),
        ).fetchall()
    finally:
        return_db(db)
    if remote_rows:
        candidates = []
        for r in remote_rows:
            tags = [t for t in r["tags"].split(",") if t] if r["tags"] else []
            candidates.append({"url": r["url"], "title": r["title"], "note": r["note"], "tags": tags})
    else:
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            candidates = data.get("sites") or []
        except PermissionError:
            return handle_subscriptions("That instance has sharing disabled.")
        except Exception:
            return handle_subscriptions("Could not fetch sites from that instance.")
    # The list may come straight from another instance: drop entries that are
    # not objects with a usable URL instead of failing on the first bad one.
    sites = [
        entry for entry in candidates
        if isinstance(entry, dict) and isinstance(entry.get("url"), str) and entry["url"]
    ]
    new_items = ""
    existing_items = ""
    new_count = 0
    for s in sites:
        # Fall back to the URL when the remote supplied no title.
        title = s.get("title") or s["url"]
        if s["url"] in local_urls:
            existing_items += (
                f'<li style="opacity:0.5">{esc(title)} '
                f'<small>({esc(s["url"])})</small> — already indexed</li>'
            )
        else:
            new_count += 1
            note_html = f' — <em>{esc(s["note"])}</em>' if s.get("note") else ""
            tags_html = ""
            if s.get("tags"):
                tags_html = " " + " ".join(f'[{esc(t)}]' for t in s["tags"])
            new_items += (
                f'<li><label><input type="checkbox" name="urls" value="{esc(s["url"])}">'
                f' {esc(title)}{note_html}{tags_html}'
                f' <small>({esc(s["url"])})</small></label></li>'
            )
    buttons = ""
    if new_count:
        buttons = '<button>import selected</button> <button name="import_all" value="1">import all new</button>'
    return _respond(
        f'<h1>browsing: {esc(sub["name"] or sub["dest_hash"])}</h1>'
        f'<p>{len(sites)} site(s) available, {new_count} new</p>'
        f'<form method="post" action="/subscriptions/pick">'
        f'{_csrf_field()}'
        f'<input type="hidden" name="sub_id" value="{sub_id}">'
        f'<ul>{new_items}</ul>'
        f'{buttons}'
        f'</form>'
        f'<h3>already indexed</h3><ul>{existing_items}</ul>'
        f'<a href="/subscriptions">back</a>'
    )
def handle_subscription_pick(body):
    """Index the sites chosen on a subscription's browse page.

    With ``import_all`` set, every synced remote page not yet indexed locally
    is imported; otherwise only the submitted ``urls``. Tags recorded for the
    remote page are copied onto the newly indexed page.

    Args:
        body: Parsed form body with ``sub_id``, optional ``import_all`` and
            the selected ``urls``.

    Returns:
        The subscriptions page with an import summary.
    """
    sub_id = body.get("sub_id", [""])[0]
    import_all = body.get("import_all", [""])[0]
    # Build a url->tags map from remote_pages for this subscription
    db = get_db()
    try:
        remote_rows = db.execute(
            "SELECT url, tags FROM remote_pages WHERE subscription_id = ?", (sub_id,)
        ).fetchall()
        remote_tags = {r["url"]: r["tags"] for r in remote_rows}
        if import_all:
            local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
            urls = [r["url"] for r in remote_rows if r["url"] not in local_urls]
        else:
            urls = body.get("urls", [])
    finally:
        return_db(db)
    if not urls:
        return handle_subscriptions("No sites selected.")
    imported = 0
    errors = 0
    for url in urls:
        # These URLs come from a remote instance or from the form; apply the
        # same http(s)-only rule the /add and /bookmark handlers enforce
        # before handing them to index_url().
        if not isinstance(url, str) or not url.startswith(("http://", "https://")):
            errors += 1
            continue
        try:
            index_url(url)
            # Import tags from the remote page
            tags_str = remote_tags.get(url, "")
            if tags_str:
                db = get_db()
                try:
                    row = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                    if row:
                        _set_page_tags(row["id"], tags_str, db)
                        db.commit()
                finally:
                    return_db(db)
            imported += 1
        except Exception:
            # Best-effort bulk import: one bad page must not abort the rest.
            errors += 1
    return handle_subscriptions(f"Imported {imported} page(s). {errors} error(s).")
def handle_subscription_sync(sub_id):
    """Sync one subscription's remote page list into ``remote_pages``.

    A delta sync is requested when the subscription has a ``last_sync``
    value. When the remote answers with ``all_urls``, local rows for URLs no
    longer listed are removed. Returned sites are upserted and ``last_sync``
    and the stored name are updated.

    Args:
        sub_id: Id of the subscription to sync.

    Returns:
        The subscriptions page with a status or error message.
    """
    # Look the subscription up and release the connection straight away: the
    # fetch below is a network call, and handle_subscriptions() acquires its
    # own connection, so neither should run while one is held here.
    db = get_db()
    try:
        sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
    finally:
        return_db(db)
    if not sub:
        return handle_subscriptions("Subscription not found.")

    # Use last_sync for delta sync if available
    since = sub["last_sync"].replace(" ", "T") if sub["last_sync"] else ""
    # Stamp the sync before fetching, so pages changed on the remote while
    # this sync is running are picked up again next time rather than skipped.
    sync_started = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    try:
        data = fetch_remote_sites(sub["dest_hash"], since=since)
        sites = data.get("sites") or []
        all_urls = data.get("all_urls")
        # Keep the stored name when the remote sends none.
        remote_name = data.get("name") or sub["name"]
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception:
        return handle_subscriptions("Could not sync with that instance.")

    synced = 0
    db = get_db()
    try:
        # If full sync (all_urls provided), remove pages no longer on remote
        if all_urls is not None:
            existing = db.execute(
                "SELECT id, url FROM remote_pages WHERE subscription_id = ?", (sub_id,)
            ).fetchall()
            remote_url_set = set(all_urls)
            for row in existing:
                if row["url"] not in remote_url_set:
                    db.execute("DELETE FROM remote_pages WHERE id = ?", (row["id"],))
        # Upsert changed/new pages
        for site in sites:
            try:
                tags_str = ",".join(site.get("tags", []))
                db.execute(
                    "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?) "
                    "ON CONFLICT(subscription_id, url) DO UPDATE SET title=excluded.title, note=excluded.note, tags=excluded.tags",
                    (sub_id, site["url"], site["title"], site.get("note", ""), tags_str),
                )
            except Exception:
                # Best-effort: skip a malformed remote entry, keep the rest.
                continue
            synced += 1
        db.execute(
            "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
            (sync_started, remote_name, sub_id),
        )
        db.commit()
    finally:
        return_db(db)
    return handle_subscriptions(
        f"Synced {synced} site(s) from {esc(remote_name or sub['dest_hash'])}."
    )
def handle_subscription_autosync(sub_id):
    """Flip a subscription's ``auto_sync`` flag and redirect to the list."""
    toggle_sql = "UPDATE subscriptions SET auto_sync = 1 - auto_sync WHERE id = ?"
    conn = get_db()
    try:
        conn.execute(toggle_sql, (sub_id,))
        conn.commit()
    finally:
        return_db(conn)
    return _redirect("/subscriptions")
def handle_subscription_delete(sub_id):
    """Remove a subscription and its synced pages; redirect to the list."""
    # Synced pages first, then the subscription row, in one commit.
    statements = (
        "DELETE FROM remote_pages WHERE subscription_id = ?",
        "DELETE FROM subscriptions WHERE id = ?",
    )
    conn = get_db()
    try:
        for sql in statements:
            conn.execute(sql, (sub_id,))
        conn.commit()
    finally:
        return_db(conn)
    return _redirect("/subscriptions")
def handle_subscription_syncall():
    """Sync every subscription that has auto-sync enabled.

    Each subscription is synced independently. A failure (unreachable peer,
    sharing disabled, database error) skips that subscription and is not
    reported individually.

    Returns:
        The subscriptions page with the number of subscriptions synced.
    """
    db = get_db()
    try:
        subs = db.execute("SELECT * FROM subscriptions WHERE auto_sync = 1").fetchall()
    finally:
        return_db(db)
    if not subs:
        return handle_subscriptions("No subscriptions have auto-sync enabled.")
    total = 0
    for sub in subs:
        try:
            since = sub["last_sync"].replace(" ", "T") if sub["last_sync"] else ""
            # Stamp the sync before fetching, so pages changed on the remote
            # while this sync is running are picked up again next time.
            sync_started = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
            data = fetch_remote_sites(sub["dest_hash"], since=since)
            sites = data.get("sites") or []
            all_urls = data.get("all_urls")
            # Keep the stored name when the remote sends none.
            remote_name = data.get("name") or sub["name"]
            db = get_db()
            try:
                # A full listing (all_urls present) lets us drop pages the
                # remote no longer has.
                if all_urls is not None:
                    existing = db.execute(
                        "SELECT id, url FROM remote_pages WHERE subscription_id = ?", (sub["id"],)
                    ).fetchall()
                    remote_url_set = set(all_urls)
                    for row in existing:
                        if row["url"] not in remote_url_set:
                            db.execute("DELETE FROM remote_pages WHERE id = ?", (row["id"],))
                for site in sites:
                    try:
                        tags_str = ",".join(site.get("tags", []))
                        db.execute(
                            "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?) "
                            "ON CONFLICT(subscription_id, url) DO UPDATE SET title=excluded.title, note=excluded.note, tags=excluded.tags",
                            (sub["id"], site["url"], site["title"], site.get("note", ""), tags_str),
                        )
                    except Exception:
                        # Best-effort: skip a malformed remote entry.
                        continue
                db.execute(
                    "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
                    (sync_started, remote_name, sub["id"]),
                )
                db.commit()
            finally:
                return_db(db)
            total += 1
        except Exception:
            # Best-effort batch: one failing subscription must not stop the
            # others; it is simply left out of the count.
            continue
    return handle_subscriptions(f"Synced {total} subscription(s).")
# --- Dispatcher ---
def _dispatch_inner(data):
    """Route one parsed request to its handler and return the response dict.

    *data* supplies ``method``, ``path``, ``query`` and ``body``. POST
    requests must pass the CSRF check before any handler runs. Unknown
    methods or paths give a 404; a non-integer id in an id route (or an empty
    tag name) gives a 400.
    """
    method = data.get("method", "GET")
    path = data.get("path", "/")
    query = data.get("query", {})
    body = data.get("body", {})
    gateway_host = data.get("gateway_host", "")  # read here, not used by any route

    def extract_id(prefix):
        """Return the integer following *prefix* in the path, or None."""
        try:
            return int(path[len(prefix):])
        except (ValueError, IndexError):
            return None

    def with_id(prefix, handler, *extra):
        """Call *handler* with the id parsed from the path, or answer 400."""
        ident = extract_id(prefix)
        if ident is None:
            return _error(400)
        return handler(ident, *extra)

    def reset_template():
        set_setting("custom_template", "")
        return handle_style_form("Template reset to default.")

    if method == "GET":
        exact_routes = {
            "/": lambda: handle_search(query),
            "/add": lambda: handle_add_form(),
            "/pages": lambda: handle_pages(query),
            "/bookmark": lambda: handle_bookmark(query),
            "/style": lambda: handle_style_form(),
            "/about": lambda: handle_about(),
            "/export": lambda: handle_export(),
            "/import": lambda: handle_import_form(),
            "/tags": lambda: handle_tags(),
            "/api/sites": lambda: handle_api_sites(query),
            "/subscriptions": lambda: handle_subscriptions(),
        }
        route = exact_routes.get(path)
        if route is not None:
            return route()
        if path.startswith("/tags/"):
            tag_name = unquote(path[len("/tags/"):])
            if not tag_name:
                return _error(400)
            return handle_tag_browse(tag_name, query)
        id_routes = (
            ("/edit/", handle_edit_form),
            ("/delete/", handle_delete_confirm),
            ("/subscriptions/browse/", handle_subscription_browse),
        )
        for prefix, handler in id_routes:
            if path.startswith(prefix):
                return with_id(prefix, handler)
    elif method == "POST":
        if not _check_csrf(body):
            return _respond("<h1>403 Forbidden</h1><p>Invalid or missing CSRF token.</p>", status=403)
        exact_routes = {
            "/add": lambda: handle_add_submit(body),
            "/style": lambda: handle_style_submit(body),
            "/style/reset": reset_template,
            "/import": lambda: handle_import_submit(body),
            "/subscriptions/add": lambda: handle_subscription_add(body),
            "/subscriptions/pick": lambda: handle_subscription_pick(body),
            "/subscriptions/syncall": lambda: handle_subscription_syncall(),
        }
        route = exact_routes.get(path)
        if route is not None:
            return route()
        if path.startswith("/edit/"):
            return with_id("/edit/", handle_edit_submit, body)
        id_routes = (
            ("/delete/", handle_delete),
            ("/subscriptions/sync/", handle_subscription_sync),
            ("/subscriptions/autosync/", handle_subscription_autosync),
            ("/subscriptions/delete/", handle_subscription_delete),
        )
        for prefix, handler in id_routes:
            if path.startswith(prefix):
                return with_id(prefix, handler)
    return _error(404)
def _is_wellformed_csrf_token(value):
    """Return True if *value* has the shape produced by secrets.token_hex(32)."""
    return (
        isinstance(value, str)
        and len(value) == 64
        and all(ch in "0123456789abcdef" for ch in value)
    )


def dispatch_request(data):
    """Handle one request: bind the CSRF token, dispatch, add security headers.

    The token is taken from the ``_csrf`` cookie when it has the expected
    shape; otherwise a fresh one is minted. It is stored for the current
    thread, echoed back in ``Set-Cookie``, and compared with the form field
    on POST requests.

    Args:
        data: Parsed request dict (``cookies``, ``method``, ``path``,
            ``query``, ``body``).

    Returns:
        Response dict with the cookie and security headers set; HTML
        responses also get a Content-Security-Policy.
    """
    cookies = data.get("cookies", {})
    csrf_token = cookies.get("_csrf", "")
    # The cookie is client-controlled and ends up in a response header and in
    # form markup, so only a value shaped like one of our own tokens (64
    # lowercase hex characters) is reused; anything else is replaced.
    if not _is_wellformed_csrf_token(csrf_token):
        csrf_token = secrets.token_hex(32)
    _request_local.csrf_token = csrf_token
    resp = _dispatch_inner(data)
    resp.setdefault("headers", {})
    resp["headers"]["Set-Cookie"] = f"_csrf={csrf_token}; SameSite=Strict; HttpOnly; Path=/"
    resp["headers"]["X-Frame-Options"] = "DENY"
    resp["headers"]["X-Content-Type-Options"] = "nosniff"
    if resp.get("content_type", "").startswith("text/html"):
        resp["headers"]["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
            "font-src 'self' https://fonts.gstatic.com; "
            "img-src * data:; "
            "frame-ancestors 'none'; "
            "form-action 'self'; "
            "base-uri 'self'"
        )
    return resp