tinyweb/handlers.py
Derick Phan d39f9a7813
Some checks are pending
/ build (push) Waiting to run
/ release (push) Blocked by required conditions
Add bulk operations, select all, and orphaned tag cleanup
- Bulk delete and retag from browse page with checkboxes
- Select all / deselect all toggle
- Delete confirmation shows count of selected pages
- Auto-cleanup orphaned tags on delete, edit, and bulk actions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 10:33:57 -07:00

1474 lines
57 KiB
Python

import json
import re
import secrets
import threading
from datetime import datetime
from urllib.parse import quote, quote_plus, unquote

from db import get_db, return_db, get_setting, set_setting, get_site_name, index_url, clean_url
from templates import esc, wrap_page, DEFAULT_TEMPLATE
from rns_client import fetch_remote_sites
_request_local = threading.local()
def _get_csrf_token():
    """Return the CSRF token stored on the thread-local request state, or '' if unset."""
    try:
        return _request_local.csrf_token
    except AttributeError:
        return ''
def _csrf_field():
    """Build the hidden ``_csrf`` form input carrying the current request's token."""
    token = _get_csrf_token()
    return f'<input type="hidden" name="_csrf" value="{token}">'
def _check_csrf(body):
    """Check the ``_csrf`` value of a parsed form body against the current token.

    Returns False when either the submitted or the expected token is empty;
    otherwise the two are compared in constant time.
    """
    submitted = body.get("_csrf", [""])[0]
    expected = _get_csrf_token()
    if expected and submitted:
        return secrets.compare_digest(submitted, expected)
    return False
_STOPWORDS = frozenset({
    "a", "an", "the", "and", "or", "but", "is", "are", "was", "were",
    "in", "on", "at", "to", "for", "of", "with", "by", "from", "as",
    "into", "about", "how", "what", "which", "who", "where", "when",
    "do", "does", "did", "be", "been", "being", "have", "has", "had",
    "it", "its", "this", "that", "not", "no", "so", "if", "can", "will",
    "my", "your", "i", "me", "we", "you", "he", "she", "they",
})


def _sanitize_fts_query(query):
    """Escape user input for safe use in FTS5 MATCH.

    Splits the input on whitespace into individually quoted tokens joined by
    implicit AND, so all words must appear but in any order. Stopwords are
    dropped to avoid overly strict matching. When the last word of the input
    survives cleaning it gets a trailing ``*`` for prefix matching.

    Every token is quoted, including the prefix one (``"tok"*``): an unquoted
    token containing characters such as ``.`` or ``:`` is not a valid FTS5
    bareword and would make the whole MATCH expression a syntax error.

    Returns:
        The MATCH expression, or ``'""'`` when no usable token remains.
    """
    words = query.split()
    last_index = len(words) - 1
    tokens = []
    for i, w in enumerate(words):
        # Strip FTS5 special characters (including quotes) to prevent injection
        cleaned = re.sub(r'["\'\(\)\*\+\-\^~]', '', w).strip()
        if not cleaned or cleaned.lower() in _STOPWORDS:
            continue
        quoted = f'"{cleaned}"'
        # Prefix match only on the last typed word, for partial word matching
        tokens.append(f"{quoted}*" if i == last_index else quoted)
    return " ".join(tokens) if tokens else '""'
def _get_bookmark_token():
    """Return the stored ``bookmark_token`` setting, generating and saving one if missing."""
    existing = get_setting("bookmark_token")
    if existing:
        return existing
    fresh = secrets.token_hex(16)
    set_setting("bookmark_token", fresh)
    return fresh
def _respond(body_html, status=200, use_default=False):
    """Wrap an HTML fragment with ``wrap_page`` and return it as an HTML response dict."""
    page = wrap_page(body_html, use_default=use_default)
    return dict(
        status=status,
        content_type="text/html; charset=utf-8",
        body=page,
        headers={},
    )
def _redirect(location):
    """Return a 302 response to a same-site path.

    Only plain absolute paths are accepted. Anything else is replaced with
    "/": values not starting with "/", protocol-relative "//host" targets,
    "/\\host" (browsers treat the backslash as a slash), and values containing
    CR or LF (which could inject extra response headers).
    """
    is_local_path = (
        location.startswith("/")
        and not location.startswith(("//", "/\\"))
        and "\r" not in location
        and "\n" not in location
    )
    if not is_local_path:
        location = "/"
    return {
        "status": 302,
        "content_type": "text/html; charset=utf-8",
        "body": "",
        "headers": {"Location": location},
    }
def _json_response(data, status=200, headers=None):
    """Serialize *data* as 2-space-indented JSON and return it as a response dict."""
    payload = json.dumps(data, indent=2)
    extra_headers = headers if headers else {}
    return dict(
        status=status,
        content_type="application/json",
        body=payload,
        headers=extra_headers,
    )
def _text_response(text, status=200, headers=None):
    """Return *text* unchanged as a ``text/plain`` response dict."""
    extra_headers = headers if headers else {}
    return dict(
        status=status,
        content_type="text/plain",
        body=text,
        headers=extra_headers,
    )
def _error(status):
    """Return a minimal HTML page whose only content is the status code as a heading."""
    heading = f"<h1>{status}</h1>"
    return _respond(heading, status)
PER_PAGE = 10
BROWSE_PER_PAGE = 50
def _paginate(query, key="p"):
    """Read a 1-based page number from parsed query params.

    Falls back to 1 when the parameter is missing, empty, or not an integer,
    and never returns less than 1.
    """
    page = 1
    try:
        page = int(query.get(key, ["1"])[0])
    except (ValueError, IndexError):
        pass
    return page if page > 1 else 1
def _page_nav(page, total, base_url, per_page=None):
    """Build the prev/next pagination paragraph, or '' when everything fits on one page.

    ``base_url`` is inserted into the links as-is; the ``p`` parameter is
    appended with "&" if the URL already has a query string, else with "?".
    """
    per_page = per_page or PER_PAGE
    if total <= per_page:
        return ""
    total_pages = (total + per_page - 1) // per_page
    joiner = "?" if "?" not in base_url else "&"
    segments = [f"page {page} of {total_pages}"]
    if page > 1:
        segments.insert(0, f'<a href="{base_url}{joiner}p={page - 1}">&laquo; prev</a>')
    if page < total_pages:
        segments.append(f'<a href="{base_url}{joiner}p={page + 1}">next &raquo;</a>')
    return '<p class="pagination">' + " | ".join(segments) + "</p>"
# --- Tag helpers ---
def _get_page_tags(page_id, db=None):
    """Return the names of the tags attached to a page, sorted by name.

    Args:
        page_id: Id of the page whose tags are wanted.
        db: Connection to use. When omitted, one is obtained with ``get_db``
            and handed back with ``return_db`` even if the query raises.

    Returns:
        A list of tag name strings (empty when the page has no tags).
    """
    owns_connection = db is None
    if owns_connection:
        db = get_db()
    try:
        rows = db.execute(
            "SELECT t.name FROM tags t JOIN page_tags pt ON t.id = pt.tag_id "
            "WHERE pt.page_id = ? ORDER BY t.name", (page_id,)
        ).fetchall()
    finally:
        # Only release connections acquired here; a caller-supplied one stays open.
        if owns_connection:
            return_db(db)
    return [r["name"] for r in rows]
def _set_page_tags(page_id, tag_string, db=None):
    """Replace a page's tags with those named in a comma-separated string.

    Names are stripped and lower-cased; empty entries are ignored. Missing
    tags are created. Tags that end up unused are NOT removed here.

    Args:
        page_id: Id of the page to retag.
        tag_string: Comma-separated tag names.
        db: Connection to use. When supplied, the caller is responsible for
            committing. When omitted, a connection is obtained with
            ``get_db``, committed on success, and always handed back with
            ``return_db``.
    """
    owns_connection = db is None
    if owns_connection:
        db = get_db()
    try:
        db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
        names = (t.strip().lower() for t in tag_string.split(",") if t.strip())
        for name in names:
            db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,))
            tag_id = db.execute("SELECT id FROM tags WHERE name = ?", (name,)).fetchone()["id"]
            db.execute(
                "INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)",
                (page_id, tag_id),
            )
        if owns_connection:
            db.commit()
    finally:
        # Release a self-acquired connection even when a statement failed.
        if owns_connection:
            return_db(db)
def _cleanup_orphaned_tags(db):
    """Remove every row of ``tags`` whose id is not referenced by ``page_tags``.

    The caller owns the connection and is responsible for committing.
    """
    orphan_sql = "DELETE FROM tags WHERE id NOT IN (SELECT DISTINCT tag_id FROM page_tags)"
    db.execute(orphan_sql)
# --- Route handlers ---
def handle_search(query):
    """Render the search page for the ``q`` query parameter.

    With a non-empty query three result groups are gathered:

    * pages from the local index (FTS5/BM25, optionally fused with semantic
      search when the ``semantic_search`` setting is "1"), paginated by
      ``PER_PAGE``;
    * up to 20 links found on indexed pages whose label contains any query
      word and whose URL is not among the results shown on this page;
    * up to 50 pages synced from subscriptions, grouped by source name.

    With an empty query only the search form and the page count are shown.

    Args:
        query: Parsed query-string mapping of name -> list of values.

    Returns:
        The response dict produced by ``_respond``.
    """
    q = query.get("q", [""])[0].strip()
    page = _paginate(query)
    offset = (page - 1) * PER_PAGE
    total_results = 0
    result_html = ""
    trusted_html = ""
    remote_html = ""

    db = get_db()
    try:
        count = db.execute("SELECT count(*) FROM pages").fetchone()[0]

        if q:
            fts_q = _sanitize_fts_query(q)

            # BM25 keyword search with column weights: title=10, body=1, url=5, note=3
            try:
                bm25_rows = db.execute(
                    "SELECT p.id, p.url, p.title, p.body, p.note "
                    "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
                    "WHERE pages_fts MATCH ? "
                    "ORDER BY bm25(pages_fts, 10.0, 1.0, 5.0, 3.0) LIMIT 100",
                    (fts_q,),
                ).fetchall()
            except Exception:
                # Best effort: any failure of the keyword query means "no keyword hits".
                bm25_rows = []

            # Hybrid search: merge BM25 + semantic via RRF
            bm25_ids = [r["id"] for r in bm25_rows]
            fused_ids = bm25_ids
            if get_setting("semantic_search", "1") == "1":
                try:
                    from embeddings import hybrid_search
                    use_reranker = get_setting("use_reranker", "1") == "1"
                    fused = hybrid_search(q, bm25_ids, limit=100, db=db, use_reranker=use_reranker)
                    fused_ids = [pid for pid, _ in fused]
                except Exception:
                    # Semantic search is optional; fall back to keyword order.
                    fused_ids = bm25_ids

            total_results = len(fused_ids)
            page_ids = fused_ids[offset:offset + PER_PAGE]

            rows = []
            if page_ids:
                # Fetch rows, then restore the fused ranking order
                placeholders = ",".join("?" * len(page_ids))
                fetched = db.execute(
                    f"SELECT id, url, title, body, note, summary FROM pages WHERE id IN ({placeholders})",
                    page_ids,
                ).fetchall()
                row_map = {r["id"]: r for r in fetched}
                rows = [row_map[pid] for pid in page_ids if pid in row_map]

            if rows:
                result_parts = []
                for r in rows:
                    note_html = ""
                    if r["note"]:
                        note_html = f'<div class="note"><em>{esc(r["note"])}</em></div>'
                    tags = _get_page_tags(r["id"], db)
                    tags_html = ""
                    if tags:
                        # Percent-encode the tag so names containing "#", "?", "/" etc.
                        # still produce a working /tags/<name> link.
                        tag_links = " ".join(
                            f'<a href="/tags/{quote(t, safe="")}" class="tag">[{esc(t)}]</a>'
                            for t in tags
                        )
                        tags_html = f'<div class="tags">{tag_links}</div>'
                    snip_html = f'<br>{esc(r["summary"])}' if r["summary"] else ""
                    result_parts.append(
                        f'<div class="result">'
                        f'<a href="{esc(r["url"])}" rel="noreferrer noopener">{esc(r["title"])}</a><br>'
                        f'<small>{esc(r["url"])}</small>'
                        f'{snip_html}'
                        f'{note_html}{tags_html}'
                        f'</div>'
                    )
                result_html = "".join(result_parts)
            else:
                result_html = "<p>No results in your index.</p>"

            # search all linked pages from trusted sites
            words = q.lower().split()
            all_links = db.execute(
                "SELECT l.url, l.label, p.title AS source_title "
                "FROM links l JOIN pages p ON l.page_id = p.id",
            ).fetchall()
            indexed_urls = {r["url"] for r in rows}
            seen = set()
            trusted = []
            for link in all_links:
                if link["url"] in indexed_urls or link["url"] in seen:
                    continue
                # A link stored without a label can never match; treat it as empty.
                label = (link["label"] or "").lower()
                if any(w in label for w in words):
                    seen.add(link["url"])
                    trusted.append(link)
                    if len(trusted) >= 20:
                        break
            if trusted:
                items = "".join(
                    f'<li><a href="{esc(clean_url(link["url"]))}" rel="noreferrer noopener">{esc(link["label"])}</a> '
                    f'<small>— from {esc(link["source_title"])}</small></li>'
                    for link in trusted
                )
                trusted_html = (
                    f'<details class="trusted">'
                    f'<summary>from your trusted sites ({len(trusted)})</summary>'
                    f'<ul>{items}</ul>'
                    f'</details>'
                )

            # search synced pages from subscriptions
            try:
                remote_rows = db.execute(
                    "SELECT rp.url, rp.title, rp.note, s.name AS source_name "
                    "FROM remote_pages_fts rpf "
                    "JOIN remote_pages rp ON rpf.rowid = rp.id "
                    "JOIN subscriptions s ON rp.subscription_id = s.id "
                    "WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
                    (fts_q,),
                ).fetchall()
            except Exception:
                # Best effort: a failing remote query just yields no remote hits.
                remote_rows = []

            # group by source
            by_source = {}
            for r in remote_rows:
                by_source.setdefault(r["source_name"] or "unknown", []).append(r)
            for source, source_rows in by_source.items():
                source_items = ""
                for r in source_rows:
                    note_html = f' — <em>{esc(r["note"])}</em>' if r["note"] else ""
                    source_items += (
                        f'<li><a href="{esc(clean_url(r["url"]))}" rel="noreferrer noopener">{esc(r["title"])}</a>'
                        f'{note_html} <small>({esc(clean_url(r["url"]))})</small></li>'
                    )
                remote_html += (
                    f'<details class="remote" open>'
                    f'<summary>from {esc(source)} ({len(source_rows)})</summary>'
                    f'<ul>{source_items}</ul>'
                    f'</details>'
                )
    finally:
        return_db(db)

    # The query goes into a URL, so it must be percent-encoded (not HTML-escaped);
    # quote_plus output contains no characters that need HTML escaping.
    nav_html = _page_nav(page, total_results, "/?q=" + quote_plus(q)) if q else ""

    return _respond(
        f'<form method="get" action="/">'
        f'<input name="q" value="{esc(q)}" placeholder="search your index" size="40">'
        f' <button type="submit">search</button>'
        f'</form>'
        f'<p class="meta">{count} pages indexed'
        f' · <a href="/add">+ add url</a></p>'
        f'{result_html}'
        f'{nav_html}'
        f'{trusted_html}{remote_html}'
    )
def handle_add_form(msg="", action_type="index"):
    """Render the "add url" form, or the "subscribe" form when ``action_type == "subscribe"``.

    ``msg`` is inserted into the page as raw HTML, so callers must escape any
    untrusted text before passing it in.
    """
    csrf = _csrf_field()
    if action_type == "subscribe":
        pieces = [
            "<h1>subscribe</h1>",
            "<p>Subscribe to a friend's TinyWeb instance to sync their index</p>",
            '<form method="post" action="/subscriptions/add">',
            csrf,
            '<input name="dest_hash" placeholder="destination hash (32 hex chars)" size="50"><br><br>',
            '<button type="submit">subscribe</button>',
            "</form>",
            '<p><small>or <a href="/add">add a single site</a></small></p>',
        ]
    else:
        pieces = [
            "<h1>add url</h1>",
            "<p>Add a site to your index</p>",
            '<form method="post" action="/add">',
            csrf,
            '<input name="url" placeholder="https://example.com" size="50"><br><br>',
            '<input name="note" placeholder="why are you saving this? (optional)" size="50"><br><br>',
            '<input name="tags" placeholder="tags (comma-separated, e.g. solarpunk, mesh)" size="50"><br><br>',
            '<button type="submit">index</button>',
            "</form>",
        ]
    pieces.append(f"<p>{msg}</p>")
    pieces.append('<a href="/">back</a>')
    return _respond("".join(pieces))
def handle_add_submit(body):
    """Handle the "add url" form: validate the target, index it, and apply tags.

    For ``input_type == "url"`` the URL must be http(s). Otherwise a
    32-hex-character Reticulum destination hash is required and the page is
    stored under ``reticulum:<hash>``. When indexing fails with an error
    that looks like the site blocking automated access, a manual-entry form
    is shown instead of an error message.

    Args:
        body: Parsed form body mapping of name -> list of values.

    Returns:
        A response dict: the add form with a status message, or the manual
        entry form.
    """
    input_type = body.get("input_type", ["url"])[0]
    url = body.get("url", [""])[0].strip()
    reticulum_dest = body.get("reticulum_dest", [""])[0].strip().replace("<", "").replace(">", "")
    note = body.get("note", [""])[0].strip()
    tags = body.get("tags", [""])[0].strip()

    if input_type == "url":
        if not url:
            return handle_add_form("URL is required.")
        url = clean_url(url)
        if not url.startswith(("http://", "https://")):
            return handle_add_form("URL must start with http:// or https://")
    else:
        if not reticulum_dest:
            return handle_add_form("Reticulum destination hash is required.")
        if len(reticulum_dest) != 32 or not all(c in "0123456789abcdefABCDEF" for c in reticulum_dest):
            return handle_add_form("Invalid reticulum destination hash. Must be 32 hex characters.")
        url = f"reticulum:{reticulum_dest}"

    try:
        index_url(url, note, reticulum_dest)
        if tags:
            db = get_db()
            try:
                row = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                if row:
                    _set_page_tags(row["id"], tags, db)
                    # Re-adding an existing URL replaces its tags; drop any left unused,
                    # as the edit, delete and bulk handlers do.
                    _cleanup_orphaned_tags(db)
                    db.commit()
            finally:
                return_db(db)
        return handle_add_form(f'Indexed: {esc(url)}')
    except ValueError as e:
        return handle_add_form(f"Error: {esc(str(e))}")
    except Exception as e:
        error_msg = str(e).lower()
        # Check if it's a block response
        if "block" in error_msg or "cloudflare" in error_msg or "403" in error_msg:
            # Show manual entry form for blocked sites
            return _respond(
                f"<h1>add url (manual entry)</h1>"
                f"<p><strong>{esc(url)}</strong> blocks automated access. "
                f"You can still save it manually:</p>"
                f'<form method="post" action="/add/manual">'
                f'{_csrf_field()}'
                f'<input type="hidden" name="url" value="{esc(url)}">'
                f'<input type="hidden" name="note" value="{esc(note)}">'
                f'<input type="hidden" name="tags" value="{esc(tags)}">'
                f'<label>Title:</label><br>'
                f'<input name="manual_title" size="50" placeholder="page title" required><br><br>'
                f'<label>Description:</label><br>'
                f'<textarea name="manual_description" rows="4" cols="50" placeholder="what is this site about?" required></textarea><br><br>'
                f'<button type="submit">save manually</button>'
                f"</form>"
                f'<a href="/">back</a>'
            )
        return handle_add_form(f"Error: could not fetch or index that URL. {esc(str(e)[:100])}")
def handle_add_manual_submit(body):
    """Save a page from manually entered title and description (no fetching).

    The page is inserted, or updated if the URL already exists, with the
    description as body and its first 200 characters as summary. Tags are
    applied when given, and embeddings are generated on a best-effort basis
    when the ``semantic_search`` setting is "1".

    Args:
        body: Parsed form body mapping of name -> list of values.

    Returns:
        The add form carrying a success or validation message.
    """
    url = clean_url(body.get("url", [""])[0].strip())
    note = body.get("note", [""])[0].strip()
    tags = body.get("tags", [""])[0].strip()
    manual_title = body.get("manual_title", [""])[0].strip()
    manual_desc = body.get("manual_description", [""])[0].strip()

    if not url:
        return handle_add_form("URL is required.")
    # The URL arrives in a hidden form field and is later rendered as a link
    # target, so only accept the schemes the regular add form can produce.
    if not url.startswith(("http://", "https://", "reticulum:")):
        return handle_add_form("URL must start with http:// or https://")
    if not manual_title or not manual_desc:
        return handle_add_form("Title and description are required for manual entry.")

    db = get_db()
    try:
        now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        db.execute(
            "INSERT INTO pages (url, title, body, note, last_modified, summary) VALUES (?, ?, ?, ?, ?, ?) "
            "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, "
            "note=excluded.note, last_modified=excluded.last_modified, summary=excluded.summary",
            (url, manual_title, manual_desc, note, now, manual_desc[:200]),
        )
        # Get the page ID
        page_id = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()[0]
        # Add tags if provided
        if tags:
            _set_page_tags(page_id, tags, db)
            # An existing page may have had other tags; drop any left unused.
            _cleanup_orphaned_tags(db)
        db.commit()

        # Generate embeddings for this page (if semantic search is enabled)
        if get_setting("semantic_search", "1") == "1":
            try:
                from embeddings import store_embeddings
                # Pass the page_id, title, description, and db connection
                store_embeddings(page_id, manual_title, manual_desc, db)
                db.commit()
            except Exception as e:
                # Log error but don't fail the whole operation
                print(f"Error generating embeddings: {e}")

        return handle_add_form(f'Added manually: <a href="{esc(url)}" rel="noreferrer noopener">{esc(manual_title)}</a>')
    finally:
        return_db(db)
def handle_pages(query=None):
    """Render the browse page: a paginated list of indexed pages with bulk actions.

    Each entry has a checkbox (submitted as ``ids`` to ``/pages/bulk``), its
    note and tags, and edit/remove links. An optional ``msg`` query parameter
    is shown, escaped, above the list.

    Args:
        query: Parsed query-string mapping of name -> list of values, or None.

    Returns:
        The response dict produced by ``_respond``.
    """
    msg = query.get("msg", [""])[0] if query else ""
    msg_html = f'<p class="success">{esc(msg)}</p>' if msg else ""
    page = _paginate(query or {})
    offset = (page - 1) * BROWSE_PER_PAGE

    db = get_db()
    try:
        total = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        rows = db.execute(
            "SELECT id, url, title, note FROM pages ORDER BY id DESC LIMIT ? OFFSET ?",
            (BROWSE_PER_PAGE, offset),
        ).fetchall()
        items = ""
        for r in rows:
            note_html = f' — <em>{esc(r["note"])}</em>' if r["note"] else ""
            tags = _get_page_tags(r["id"], db)
            tags_html = ""
            if tags:
                # Percent-encode the tag so names containing "#", "?", "/" etc.
                # still produce a working /tags/<name> link.
                tag_links = " ".join(
                    f'<a href="/tags/{quote(t, safe="")}">[{esc(t)}]</a>' for t in tags
                )
                tags_html = f' {tag_links}'
            items += (
                f'<li><label><input type="checkbox" name="ids" value="{r["id"]}"> '
                f'{esc(r["title"])}</label>{note_html}{tags_html} '
                f'<small>(<a href="{esc(r["url"])}" rel="noreferrer noopener">{esc(r["url"])}</a>)</small> '
                f'<a href="/edit/{r["id"]}">edit</a> '
                f'<a href="/delete/{r["id"]}">remove</a></li>'
            )
    finally:
        return_db(db)

    return _respond(
        f"<h1>indexed pages ({total})</h1>"
        f"{msg_html}"
        f'<form method="post" action="/pages/bulk">'
        f'{_csrf_field()}'
        f'<p><label><input type="checkbox" id="select-all"> select all</label></p>'
        f"<ul>{items}</ul>"
        f'{_page_nav(page, total, "/pages", BROWSE_PER_PAGE)}'
        f'<details><summary>bulk actions</summary>'
        f'<p><button type="submit" name="action" value="delete" id="bulk-delete">delete selected</button></p>'
        f'<p><input name="bulk_tags" placeholder="tags (comma-separated)" size="40"> '
        f'<select name="tag_mode"><option value="add">add tags</option><option value="replace">replace tags</option></select> '
        f'<button type="submit" name="action" value="retag">retag selected</button></p>'
        f'</details>'
        f'</form>'
        f'<script>'
        f'document.getElementById("select-all").addEventListener("change",function(){{'
        f'document.querySelectorAll("input[name=ids]").forEach(function(c){{c.checked=this.checked}}.bind(this))'
        f'}});'
        f'document.getElementById("bulk-delete").addEventListener("click",function(e){{'
        f'var n=document.querySelectorAll("input[name=ids]:checked").length;'
        f'if(!n){{e.preventDefault();return}}'
        f'if(!confirm("Delete "+n+" selected page"+(n===1?"":"s")+"?"))e.preventDefault()'
        f'}});'
        f'</script>'
        f'<p><a href="/export">export</a> | <a href="/import">import</a></p>'
        f'<a href="/">back</a>'
    )
def handle_bulk_action(body):
    """Apply a bulk ``delete`` or ``retag`` to the pages selected on the browse page.

    Redirects to ``/pages`` when nothing is selected or once the action is
    done; answers 400 when any submitted id is not an integer. Any other
    action value is ignored.
    """
    ids = body.get("ids", [])
    action = body.get("action", [""])[0]
    if not ids:
        return _redirect("/pages")

    # Validate all ids are integers
    try:
        page_ids = [int(raw) for raw in ids]
    except ValueError:
        return _error(400)

    db = get_db()
    try:
        if action == "delete":
            delete_statements = (
                "DELETE FROM page_tags WHERE page_id = ?",
                "DELETE FROM links WHERE page_id = ?",
                "DELETE FROM pages WHERE id = ?",
            )
            for pid in page_ids:
                for sql in delete_statements:
                    db.execute(sql, (pid,))
            _cleanup_orphaned_tags(db)
            db.commit()
        elif action == "retag":
            bulk_tags = body.get("bulk_tags", [""])[0].strip()
            tag_mode = body.get("tag_mode", ["add"])[0]
            if bulk_tags:
                # The requested tags are the same for every page; parse them once.
                additions = [t.strip().lower() for t in bulk_tags.split(",") if t.strip()]
                for pid in page_ids:
                    if tag_mode == "add":
                        current = _get_page_tags(pid, db)
                        merged = ", ".join(sorted(set(current + additions)))
                        _set_page_tags(pid, merged, db)
                    else:
                        _set_page_tags(pid, bulk_tags, db)
                _cleanup_orphaned_tags(db)
                db.commit()
    finally:
        return_db(db)
    return _redirect("/pages")
def handle_edit_form(page_id, msg=""):
    """Render the edit form for one page, or a 404 page when the id is unknown.

    ``msg`` is inserted into the page as raw HTML, so callers must escape any
    untrusted text before passing it in.
    """
    db = get_db()
    try:
        row = db.execute("SELECT id, url, title, body, note, summary FROM pages WHERE id = ?", (page_id,)).fetchone()
        if not row:
            return _error(404)
        tags = ", ".join(_get_page_tags(page_id, db))
    finally:
        return_db(db)

    # Any of these text columns may be NULL; show an empty field rather than
    # handing None to esc().
    title = row["title"] or ""
    summary = row["summary"] or ""
    note = row["note"] or ""

    return _respond(
        f"<h1>edit page</h1>"
        f"<p><b>{esc(title)}</b><br>"
        f"<small>{esc(row['url'])}</small></p>"
        f'<form method="post" action="/edit/{row["id"]}">'
        f'{_csrf_field()}'
        f'<label>Title:</label><br>'
        f'<input name="title" value="{esc(title)}" size="60"><br><br>'
        f'<label>Summary (shown in search results):</label><br>'
        f'<textarea name="summary" rows="3" cols="60">{esc(summary)}</textarea><br><br>'
        f'<label>Note (why you saved this):</label><br>'
        f'<input name="note" value="{esc(note)}" size="50"><br><br>'
        f'<label>Tags (comma-separated):</label><br>'
        f'<input name="tags" value="{esc(tags)}" size="50"><br><br>'
        f'<button type="submit">save</button>'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/pages">back</a>'
    )
def handle_edit_submit(page_id, body):
    """Save the title, summary, note and tags submitted from the edit form.

    Args:
        page_id: Id of the page being edited.
        body: Parsed form body mapping of name -> list of values.

    Returns:
        A redirect to ``/pages`` on success, or a 404 page when no page has
        that id.
    """
    title = body.get("title", [""])[0].strip()
    summary = body.get("summary", [""])[0].strip()
    note = body.get("note", [""])[0].strip()
    tags = body.get("tags", [""])[0].strip()

    db = get_db()
    try:
        # Without this check a stale or forged id would leave tag rows that
        # point at a page that does not exist.
        if db.execute("SELECT 1 FROM pages WHERE id = ?", (page_id,)).fetchone() is None:
            return _error(404)
        db.execute(
            "UPDATE pages SET title = ?, summary = ?, note = ? WHERE id = ?",
            (title, summary, note, page_id)
        )
        _set_page_tags(page_id, tags, db)
        _cleanup_orphaned_tags(db)
        db.commit()
    finally:
        return_db(db)
    return _redirect("/pages")
def handle_delete_confirm(page_id):
    """Render the delete confirmation page for one page, or 404 when the id is unknown."""
    db = get_db()
    try:
        row = db.execute("SELECT id, url, title FROM pages WHERE id = ?", (page_id,)).fetchone()
    finally:
        return_db(db)
    if not row:
        return _error(404)

    summary_html = (
        f"<p>Remove <b>{esc(row['title'])}</b><br>"
        f"<small>{esc(row['url'])}</small></p>"
    )
    form_html = (
        f'<form method="post" action="/delete/{row["id"]}">'
        f'{_csrf_field()}'
        '<button type="submit">yes, delete</button>'
        "</form>"
    )
    return _respond(
        "<h1>confirm delete</h1>" + summary_html + form_html + ' <a href="/pages">cancel</a>'
    )
def handle_delete(page_id):
    """Delete a page with its tag associations and links, then redirect to ``/pages``.

    Tags left without any page are removed in the same transaction.
    """
    statements = (
        "DELETE FROM page_tags WHERE page_id = ?",
        "DELETE FROM links WHERE page_id = ?",
        "DELETE FROM pages WHERE id = ?",
    )
    db = get_db()
    try:
        for sql in statements:
            db.execute(sql, (page_id,))
        _cleanup_orphaned_tags(db)
        db.commit()
    finally:
        return_db(db)
    return _redirect("/pages")
def handle_bookmark(query):
    """Index the URL sent by the bookmarklet after verifying its token.

    Always answers in plain text with a permissive CORS header: 403 for a
    missing or wrong token, otherwise status 200 with an ``ok:`` or
    ``error:`` message.
    """
    cors = {"Access-Control-Allow-Origin": "*"}

    supplied = query.get("token", [""])[0]
    expected = _get_bookmark_token()
    if not (supplied and secrets.compare_digest(supplied, expected)):
        return _text_response("error: invalid or missing token", status=403, headers=cors)

    url = clean_url(query.get("url", [""])[0].strip())
    if not (url and url.startswith(("http://", "https://"))):
        return _text_response("error: invalid url", headers=cors)

    try:
        title = index_url(url)
        outcome = f"ok: {title}"
    except Exception as e:
        outcome = f"error: {e}"
    return _text_response(outcome, headers=cors)
def handle_export():
    """Return every page's url, title and note as a downloadable JSON attachment."""
    db = get_db()
    try:
        rows = db.execute("SELECT url, title, note FROM pages ORDER BY id").fetchall()
    finally:
        return_db(db)

    fields = ("url", "title", "note")
    data = [{field: r[field] for field in fields} for r in rows]
    download_headers = {"Content-Disposition": "attachment; filename=tinyweb-export.json"}
    return _json_response(data, headers=download_headers)
def handle_import_form(msg=""):
    """Render the JSON import form.

    ``msg`` is inserted into the page as raw HTML, so callers must escape any
    untrusted text before passing it in.
    """
    pieces = [
        "<h1>import</h1>",
        "<p>Paste the contents of a tinyweb export file (JSON).</p>",
        '<form method="post" action="/import">',
        _csrf_field(),
        '<textarea name="data" rows="12" cols="60" placeholder=\'[{"url": "...", "note": "..."}]\'></textarea><br><br>',
        '<button type="submit">import</button>',
        "</form>",
        f"<p>{msg}</p>",
        '<a href="/pages">back</a>',
    ]
    return _respond("".join(pieces))
def handle_import_submit(body):
    """Index every entry of a pasted JSON export (at most 100 entries).

    Each entry should be an object with a ``url`` and an optional ``note``.
    Entries without a URL are skipped silently. Entries that are not
    objects, that carry non-string values, or that fail to index are counted
    as errors. A JSON ``null`` url or note is treated as empty, because the
    export writes ``null`` for pages saved without a note.

    Args:
        body: Parsed form body mapping of name -> list of values.

    Returns:
        The import form carrying a summary or validation message.
    """
    raw = body.get("data", [""])[0].strip()
    if not raw:
        return handle_import_form("Paste JSON data.")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return handle_import_form("Invalid JSON.")
    if not isinstance(data, list):
        return handle_import_form("Expected a JSON array.")
    MAX_IMPORT = 100
    if len(data) > MAX_IMPORT:
        return handle_import_form(f"Too many entries. Maximum is {MAX_IMPORT}.")

    imported = 0
    errors = 0
    for entry in data:
        if not isinstance(entry, dict):
            errors += 1
            continue
        url = entry.get("url") or ""
        note = entry.get("note") or ""
        if not isinstance(url, str) or not isinstance(note, str):
            errors += 1
            continue
        url = url.strip()
        note = note.strip()
        if not url:
            continue
        try:
            index_url(url, note)
            imported += 1
        except Exception:
            # One bad page must not abort the rest of the import.
            errors += 1
    return handle_import_form(f"Imported {imported} page(s). {errors} error(s).")
def handle_style_form(msg=""):
    """Render the customize page: site name, sharing, mesh transport, search options,
    the page template editor, the bookmarklet link and the template reset form.

    The page is rendered with the default template (``use_default=True``).
    ``msg`` is inserted as raw HTML, so callers must escape any untrusted
    text before passing it in.
    """
    template = get_setting("custom_template") or DEFAULT_TEMPLATE
    name = get_site_name()
    sharing = get_setting("sharing_enabled", "0")
    checked = " checked" if sharing == "1" else ""
    # Defaults must match the ones used where these settings are consumed
    # (search and manual add treat both as "1" when unset); otherwise the
    # form shows them off while they are on, and saving the form for any
    # other reason silently disables them.
    semantic = get_setting("semantic_search", "1")
    semantic_checked = " checked" if semantic == "1" else ""
    reranker = get_setting("use_reranker", "1")
    reranker_checked = " checked" if reranker == "1" else ""
    disabled = "" if semantic == "1" else " disabled"
    dimmed = ' style="opacity:0.4"' if semantic != "1" else ""
    transport_host = get_setting("transport_host", "reticulum.derickphan.com")
    transport_port = get_setting("transport_port", "4242")
    # Literal "{{content}}" marker, escaped for display in the help text.
    content_marker = esc("{{content}}")

    return _respond(
        f"<h1>customize</h1>"
        f"<h2>name your search engine</h2>"
        f'<form method="post" action="/style">'
        f'{_csrf_field()}'
        f'<input name="site_name" value="{esc(name)}" placeholder="tinyweb" size="30"><br><br>'
        f"<h2>sharing</h2>"
        f'<label><input type="checkbox" name="sharing_enabled" value="1"{checked}>'
        f" share your site list publicly at /api/sites</label><br><br>"
        f"<h2>mesh network</h2>"
        f"<p>Connect to a Reticulum transport node to reach other peers.</p>"
        f"<small>Default: reticulum.derickphan.com:4242</small><br>"
        f'<input name="transport_host" value="{esc(transport_host)}" placeholder="hostname" size="30">'
        f' <input name="transport_port" value="{esc(transport_port)}" placeholder="port" size="6"><br>'
        f'<p><a href="https://rmap.world/" target="_blank" rel="noreferrer noopener">discover more nodes</a></p><br>'
        f"<h2>search</h2>"
        f"<h3>ai</h3>"
        f'<label><input type="checkbox" name="semantic_search" value="1"{semantic_checked} '
        f'onchange="var d=!this.checked;document.getElementById(\'reranker\').disabled=d;'
        f'document.getElementById(\'ai-extras\').style.opacity=d?\'0.4\':\'1\'">'
        f" semantic search (similarity matching)</label><br>"
        f"<small>Requires onnxruntime, tokenizers, hnswlib. Downloads ~30MB of models on first use.</small><br><br>"
        f'<div id="ai-extras"{dimmed}>'
        f'<label><input type="checkbox" id="reranker" name="use_reranker" value="1"{reranker_checked}{disabled}>'
        f" cross-encoder reranking (more accurate)</label><br>"
        f"<small>Uses a 22MB model. Adds ~50ms per search. Disable for faster results.</small><br><br>"
        f'<a href="/reindex">manage semantic index</a><br><br>'
        f"</div>"
        f"<h2>custom html</h2>"
        f"<p>Edit the full page template. Use <code>{content_marker}</code> "
        f"where page content should appear.</p>"
        f'<textarea name="template" rows="20" cols="60">{esc(template)}</textarea><br><br>'
        f'<button type="submit">save</button>'
        f"</form>"
        f"<h2>bookmarklet</h2>"
        f"<p>Drag this link to your bookmarks bar. Click it on any page to index it instantly.</p>"
        f'<p><a href="javascript:void(fetch(\'http://localhost:8080/bookmark?url=\'+encodeURIComponent(location.href)+\'&token={_get_bookmark_token()}\').then(r=>r.text()).then(t=>alert(t)).catch(()=>alert(\'tinyweb not running\')))">+ save to {esc(name)}</a></p>'
        f"<h2>reset</h2>"
        f'<form method="post" action="/style/reset">'
        f'{_csrf_field()}'
        f'<button type="submit">reset template to default</button>'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/">back</a>',
        use_default=True,
    )
def handle_style_submit(body):
    """Persist the settings submitted from the customize page and re-render it.

    A template equal to the default (ignoring surrounding whitespace) is
    stored as an empty string. Transport host and port are only overwritten
    when a non-empty value was submitted.
    """
    def first(key, default=""):
        # First submitted value for a field, or the default when absent.
        return body.get(key, [default])[0]

    def flag(key):
        # Checkboxes are only present in the body when ticked.
        return "1" if body.get(key) else "0"

    template = first("template").replace("\r\n", "\n").replace("\r", "\n")
    name = first("site_name", "tinyweb").strip()
    sharing = flag("sharing_enabled")
    semantic = flag("semantic_search")
    reranker = flag("use_reranker")
    transport_host = first("transport_host").strip()
    transport_port = first("transport_port").strip()

    is_default_template = template.strip() == DEFAULT_TEMPLATE.strip()
    set_setting("custom_template", "" if is_default_template else template)
    set_setting("site_name", name or "tinyweb")
    set_setting("sharing_enabled", sharing)
    set_setting("semantic_search", semantic)
    set_setting("use_reranker", reranker)
    if transport_host:
        set_setting("transport_host", transport_host)
    if transport_port:
        set_setting("transport_port", transport_port)
    return handle_style_form("Saved. Restart TinyWeb for mesh network changes to take effect.")
def handle_about():
    """Render the about page: project blurb, index statistics, sharing status and,
    when a ``dest_hash`` setting exists, the hash others need in order to subscribe.
    """
    name = get_site_name()
    dest_hash = get_setting("dest_hash")
    sharing = get_setting("sharing_enabled", "0") == "1"

    db = get_db()
    try:
        page_count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        tag_count = db.execute("SELECT count(DISTINCT tag_id) FROM page_tags").fetchone()[0]
        sub_count = db.execute("SELECT count(*) FROM subscriptions").fetchone()[0]
    finally:
        return_db(db)

    if sharing:
        sharing_html = '<p>This instance shares its index publicly. Subscribe to join the network.</p>'
    else:
        sharing_html = '<p>This instance is private.</p>'

    hash_html = ""
    if dest_hash:
        hash_html = (
            '<h2>subscribe</h2>'
            '<p>To subscribe to this instance, add this destination hash in your TinyWeb:</p>'
            f'<pre>{esc(dest_hash)}</pre>'
        )

    intro_html = (
        f'<h1>{esc(name)}</h1>'
        '<p>A personal search engine, built for the slow web.</p>'
        '<p>TinyWeb is about taking back the internet. No algorithms, no ads, no tracking. '
        'Just human-curated pages shared freely across a mesh network.</p>'
    )
    stats_html = (
        '<ul>'
        f'<li><b>{page_count}</b> page(s) indexed</li>'
        f'<li><b>{tag_count}</b> tag(s)</li>'
        f'<li><b>{sub_count}</b> subscription(s)</li>'
        '</ul>'
    )
    manifesto_html = (
        '<h2>what is the slow web?</h2>'
        '<p>The slow web is a movement for intentionality over speed, '
        'human curation over algorithmic feeds, privacy over surveillance, '
        'and community over corporations. Every page in this index was saved by a person '
        'because they found it valuable — not because an algorithm told them to click.</p>'
        '<h2>how it works</h2>'
        '<ul>'
        '<li>Save pages you find valuable with the bookmarklet or /add</li>'
        '<li>Search your personal index — queries never leave your machine</li>'
        '<li>Subscribe to friends over Reticulum — encrypted, decentralized, works without the internet</li>'
        '<li>Tag and organize your collection into curated lists</li>'
        '</ul>'
        '<p><a href="/">search</a> | <a href="/pages">browse</a> | <a href="/tags">tags</a></p>'
    )
    return _respond(intro_html + stats_html + sharing_html + hash_html + manifesto_html)
def handle_tags():
    """Render the list of all tags in use, each with its page count.

    Shows a hint instead of the list when no tag is attached to any page.
    The heading and the back link are always present.
    """
    db = get_db()
    try:
        rows = db.execute(
            "SELECT t.name, COUNT(pt.page_id) AS cnt FROM tags t "
            "JOIN page_tags pt ON t.id = pt.tag_id "
            "GROUP BY t.id ORDER BY t.name"
        ).fetchall()
    finally:
        return_db(db)

    # Percent-encode the tag in the link so names containing "#", "?", "/"
    # etc. still lead to the right /tags/<name> page.
    items = "".join(
        f'<li><a href="/tags/{quote(r["name"], safe="")}">{esc(r["name"])}</a> ({r["cnt"]})</li>'
        for r in rows
    )
    # Pick the list-or-hint part on its own: inside one run of adjacent string
    # literals a conditional expression would swallow the heading and the
    # back link into its branches.
    if items:
        listing = f"<ul>{items}</ul>"
    else:
        listing = "<p>No tags yet. Add tags when saving or editing pages.</p>"
    return _respond(
        f"<h1>tags</h1>"
        f"{listing}"
        f'<a href="/">back</a>'
    )
def handle_tag_browse(tag_name, query=None):
    """Render the paginated list of pages carrying ``tag_name``.

    Args:
        tag_name: Tag to browse.
        query: Parsed query-string mapping of name -> list of values, or None.

    Returns:
        The response dict produced by ``_respond``.
    """
    page = _paginate(query or {})
    offset = (page - 1) * BROWSE_PER_PAGE

    db = get_db()
    try:
        total = db.execute(
            "SELECT count(*) FROM page_tags pt JOIN tags t ON t.id = pt.tag_id WHERE t.name = ?",
            (tag_name,),
        ).fetchone()[0]
        rows = db.execute(
            "SELECT p.id, p.url, p.title, p.note FROM pages p "
            "JOIN page_tags pt ON p.id = pt.page_id "
            "JOIN tags t ON t.id = pt.tag_id "
            "WHERE t.name = ? ORDER BY p.id DESC LIMIT ? OFFSET ?",
            (tag_name, BROWSE_PER_PAGE, offset),
        ).fetchall()
        items = ""
        for r in rows:
            note_html = f' — <em>{esc(r["note"])}</em>' if r["note"] else ""
            tags = _get_page_tags(r["id"], db)
            # Percent-encode tags used in URLs so names containing "#", "?",
            # "/" etc. still produce working links.
            tag_links = " ".join(
                f'<a href="/tags/{quote(t, safe="")}">[{esc(t)}]</a>' for t in tags
            )
            items += (
                f'<li>{esc(r["title"])}{note_html} {tag_links} '
                f'<small>(<a href="{esc(r["url"])}" rel="noreferrer noopener">{esc(r["url"])}</a>)</small></li>'
            )
    finally:
        return_db(db)

    nav_html = _page_nav(page, total, "/tags/" + quote(tag_name, safe=""), BROWSE_PER_PAGE)
    return _respond(
        f'<h1>tag: {esc(tag_name)}</h1>'
        f'<p>{total} page(s)</p>'
        f'<ul>{items}</ul>'
        f'{nav_html}'
        f'<a href="/tags">all tags</a> | <a href="/">back</a>'
    )
def handle_api_sites(query=None):
    """Serve the shared site list as JSON, or a 403 JSON error when sharing is disabled.

    With a ``since`` query parameter only pages whose ``last_modified`` is
    greater than it are returned. Without it, all pages are returned along
    with ``all_urls``, the full URL list.
    """
    cors = {"Access-Control-Allow-Origin": "*"}
    if get_setting("sharing_enabled", "0") != "1":
        return _json_response({"error": "sharing disabled"}, status=403, headers=cors)

    since = (query or {}).get("since", [""])[0].strip()

    db = get_db()
    try:
        if since:
            cursor = db.execute(
                "SELECT id, url, title, note, last_modified FROM pages "
                "WHERE last_modified > ? ORDER BY id DESC",
                (since,),
            )
        else:
            cursor = db.execute("SELECT id, url, title, note, last_modified FROM pages ORDER BY id DESC")
        rows = cursor.fetchall()

        sites = [
            {
                "url": r["url"],
                "title": r["title"],
                "note": r["note"],
                "tags": _get_page_tags(r["id"], db),
                "last_modified": r["last_modified"] or "",
            }
            for r in rows
        ]

        # Include list of all current URLs so subscriber can detect deletions
        all_urls = None
        if not since:
            all_urls = [r["url"] for r in db.execute("SELECT url FROM pages").fetchall()]
    finally:
        return_db(db)

    data = {"name": get_site_name(), "sites": sites}
    if all_urls is not None:
        data["all_urls"] = all_urls
    return _json_response(data, headers=cors)
def handle_subscriptions(msg=""):
    """Render the subscriptions page: add form, status message, one card per subscription.

    Args:
        msg: HTML fragment shown in a ``<p>`` under the add form.  It is
            inserted verbatim (not escaped), so callers must pass markup that
            is already safe.

    Returns:
        The response produced by ``_respond``.
    """
    conn = get_db()
    try:
        subs = conn.execute("SELECT * FROM subscriptions ORDER BY id DESC").fetchall()
    finally:
        return_db(conn)

    csrf = _csrf_field()

    def render_card(sub):
        """Build the HTML card (details + action forms) for one subscription row."""
        sub_id = sub["id"]
        auto_label = "on" if sub["auto_sync"] else "off"
        last = sub["last_sync"] or "never"
        return "".join((
            '<div style="border:1px solid #ddd;border-radius:4px;padding:0.9rem 1rem;margin-bottom:0.75rem">',
            f'<div style="margin-bottom:0.4rem"><b>{esc(sub["name"] or "unknown")}</b></div>',
            f'<div><small>{esc(sub["dest_hash"])}</small></div>',
            f'<div style="margin-top:0.4rem;font-size:0.85rem;color:#606060">last sync: {esc(last)}</div>',
            '<div style="display:flex;gap:0.5rem;align-items:center;flex-wrap:wrap;margin-top:0.7rem">',
            f'<a href="/subscriptions/browse/{sub_id}">browse</a>',
            f'<form method="post" action="/subscriptions/sync/{sub_id}" style="display:inline">',
            f'{csrf}<button>sync now</button></form>',
            f'<form method="post" action="/subscriptions/autosync/{sub_id}" style="display:inline">',
            f'{csrf}<button>auto-sync: {auto_label}</button></form>',
            f'<form method="post" action="/subscriptions/delete/{sub_id}" style="display:inline">',
            f'{csrf}<button>remove</button></form>',
            '</div>',
            '</div>',
        ))

    cards = [render_card(sub) for sub in subs]

    # The "sync all" form is only offered when there is at least one subscription.
    listing = ""
    if cards:
        listing = "".join(cards) + (
            '<form method="post" action="/subscriptions/syncall">'
            f'{csrf}<button>sync all</button></form>'
        )

    page = "".join((
        "<h1>subscriptions</h1>",
        '<form method="post" action="/subscriptions/add">',
        csrf,
        '<input name="dest_hash" placeholder="destination hash" size="40"> ',
        '<button>subscribe</button>',
        '</form>',
        '<p><small>or <a href="/add?type=subscribe">subscribe to an instance</a></small></p>',
        f'<p>{msg}</p>',
        f'<hr>{listing}',
        '<br><a href="/">back</a>',
    ))
    return _respond(page)
def handle_subscription_add(body):
    """Subscribe to a remote instance identified by a 32-hex-digit destination hash.

    The hash is validated, the remote is contacted once via
    ``fetch_remote_sites`` to confirm it is reachable and to learn its name,
    and the subscription is then inserted (or its name refreshed when the
    hash is already subscribed).

    Args:
        body: Parsed form body (name -> list of values); reads ``dest_hash``.

    Returns:
        The subscriptions page with a status or error message.
    """
    dest_hash = body.get("dest_hash", [""])[0].strip().replace("<", "").replace(">", "")
    if len(dest_hash) != 32:
        return handle_subscriptions("Enter a valid 32-character destination hash.")
    # int(value, 16) is too permissive for validation: it also accepts a
    # leading sign, a "0x" prefix and "_" digit separators, so strings such as
    # "0x" + 30 hex digits would pass.  Require exactly 32 hex digits instead.
    if not re.fullmatch(r"[0-9a-fA-F]{32}", dest_hash):
        return handle_subscriptions("Invalid destination hash (must be hex).")

    try:
        data = fetch_remote_sites(dest_hash)
        name = data.get("name", "")
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception:
        # Any other failure (network, malformed reply) is reported the same way.
        return handle_subscriptions("Could not reach that instance.")

    db = get_db()
    try:
        db.execute(
            "INSERT INTO subscriptions (dest_hash, name) VALUES (?, ?) "
            "ON CONFLICT(dest_hash) DO UPDATE SET name=excluded.name",
            (dest_hash, name),
        )
        db.commit()
    finally:
        return_db(db)
    return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.")
def handle_subscription_browse(sub_id):
    """Show a subscription's pages, split into importable and already-indexed.

    Pages previously synced into ``remote_pages`` are used when present;
    otherwise the remote instance is queried live.  Pages whose URL is not in
    the local ``pages`` table are listed with checkboxes inside a form that
    posts to ``/subscriptions/pick``.

    Args:
        sub_id: Primary key of the subscription to browse.

    Returns:
        The browse page, a 404 error when the subscription does not exist, or
        the subscriptions page with an error message when a live fetch fails.
    """
    db = get_db()
    try:
        sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
        if not sub:
            return _error(404)
        local_urls = {r["url"] for r in db.execute("SELECT url FROM pages").fetchall()}
        # Use locally synced data if available, otherwise fetch live
        remote_rows = db.execute(
            "SELECT url, title, note, tags FROM remote_pages WHERE subscription_id = ?",
            (sub_id,),
        ).fetchall()
    finally:
        return_db(db)

    if remote_rows:
        sites = [
            {
                "url": r["url"],
                "title": r["title"],
                "note": r["note"],
                "tags": [t for t in r["tags"].split(",") if t] if r["tags"] else [],
            }
            for r in remote_rows
        ]
    else:
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            fetched = data.get("sites", [])
        except PermissionError:
            return handle_subscriptions("That instance has sharing disabled.")
        except Exception:
            return handle_subscriptions("Could not fetch sites from that instance.")
        # The live payload comes from another instance and is untrusted:
        # normalise it to the same shape as the synced rows and drop entries
        # without a usable URL, instead of failing on a missing key.
        sites = []
        for entry in fetched if isinstance(fetched, list) else []:
            if not isinstance(entry, dict):
                continue
            url = entry.get("url")
            if not isinstance(url, str) or not url:
                continue
            title = entry.get("title")
            note = entry.get("note")
            tags = entry.get("tags")
            sites.append({
                "url": url,
                "title": title if isinstance(title, str) else "",
                "note": note if isinstance(note, str) else "",
                "tags": [t for t in tags if isinstance(t, str) and t] if isinstance(tags, list) else [],
            })

    new_parts = []
    existing_parts = []
    for s in sites:
        if s["url"] in local_urls:
            existing_parts.append(
                f'<li style="opacity:0.5">{esc(s["title"])} '
                f'<small>({esc(s["url"])})</small> — already indexed</li>'
            )
            continue
        note_html = f' — <em>{esc(s["note"])}</em>' if s.get("note") else ""
        tags_html = ""
        if s.get("tags"):
            tags_html = " " + " ".join(f'[{esc(t)}]' for t in s["tags"])
        new_parts.append(
            f'<li><label><input type="checkbox" name="urls" value="{esc(s["url"])}">'
            f' {esc(s["title"])}{note_html}{tags_html}'
            f' <small>({esc(s["url"])})</small></label></li>'
        )

    new_count = len(new_parts)
    new_items = "".join(new_parts)
    existing_items = "".join(existing_parts)

    # Import buttons are pointless when there is nothing new to import.
    buttons = ""
    if new_count:
        buttons = '<button>import selected</button> <button name="import_all" value="1">import all new</button>'

    return _respond(
        f'<h1>browsing: {esc(sub["name"] or sub["dest_hash"])}</h1>'
        f'<p>{len(sites)} site(s) available, {new_count} new</p>'
        f'<form method="post" action="/subscriptions/pick">'
        f'{_csrf_field()}'
        f'<input type="hidden" name="sub_id" value="{sub_id}">'
        f'<ul>{new_items}</ul>'
        f'{buttons}'
        f'</form>'
        f'<h3>already indexed</h3><ul>{existing_items}</ul>'
        f'<a href="/subscriptions">back</a>'
    )
def handle_subscription_pick(body):
    """Import pages chosen on the subscription browse page into the local index.

    With ``import_all`` set, every remote page of the subscription that is
    not yet in the local ``pages`` table is imported; otherwise only the
    submitted ``urls`` are.  Each URL is indexed with ``index_url`` and, when
    tags are known for it, those tags are applied to the new page.

    Args:
        body: Parsed form body (name -> list of values); reads ``sub_id``,
            ``import_all`` and ``urls``.

    Returns:
        The subscriptions page with an import summary or an error message.
    """
    sub_id = body.get("sub_id", [""])[0]
    import_all = body.get("import_all", [""])[0]

    # Build a url->tags map from remote_pages for this subscription
    db = get_db()
    try:
        remote_rows = db.execute(
            "SELECT url, tags FROM remote_pages WHERE subscription_id = ?", (sub_id,)
        ).fetchall()
        remote_tags = {r["url"]: r["tags"] for r in remote_rows}
        local_urls = set()
        sub = None
        if import_all:
            local_urls = {r["url"] for r in db.execute("SELECT url FROM pages").fetchall()}
            if not remote_rows:
                sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
    finally:
        return_db(db)

    if not import_all:
        urls = body.get("urls", [])
    elif remote_rows:
        urls = [r["url"] for r in remote_rows if r["url"] not in local_urls]
    elif sub:
        # Nothing has been synced yet, so the browse page listed a live fetch.
        # Do the same here; otherwise "import all new" would find no pages.
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            fetched = data.get("sites", [])
        except PermissionError:
            return handle_subscriptions("That instance has sharing disabled.")
        except Exception:
            return handle_subscriptions("Could not fetch sites from that instance.")
        urls = []
        for entry in fetched if isinstance(fetched, list) else []:
            if not isinstance(entry, dict):
                continue
            url = entry.get("url")
            # remote_tags doubles as the "already queued" set in this branch.
            if not isinstance(url, str) or not url or url in local_urls or url in remote_tags:
                continue
            tags = entry.get("tags")
            remote_tags[url] = (
                ",".join(t for t in tags if isinstance(t, str) and t) if isinstance(tags, list) else ""
            )
            urls.append(url)
    else:
        urls = []

    if not urls:
        return handle_subscriptions("No sites selected.")

    imported = 0
    errors = 0
    for url in urls:
        try:
            index_url(url)
            # Import tags from the remote page
            tags_str = remote_tags.get(url, "")
            if tags_str:
                db = get_db()
                try:
                    row = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                    if row:
                        _set_page_tags(row["id"], tags_str, db)
                        db.commit()
                finally:
                    return_db(db)
            imported += 1
        except Exception:
            # Best effort: one failing URL must not abort the rest of the batch.
            errors += 1
    return handle_subscriptions(f"Imported {imported} page(s). {errors} error(s).")
def handle_subscription_sync(sub_id):
    """Pull new and changed pages from one subscription into ``remote_pages``.

    When the subscription has a ``last_sync`` value it is sent as ``since``
    so the remote can answer with a delta.  When the reply carries
    ``all_urls`` (a full listing), locally stored remote pages that are no
    longer in it are deleted.  Each returned site is upserted and, if the
    ``semantic_search`` setting is ``"1"``, embedded on a best-effort basis.

    Args:
        sub_id: Primary key of the subscription to sync.

    Returns:
        The subscriptions page with a summary or an error message.
    """
    db = get_db()
    try:
        sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
        if not sub:
            return handle_subscriptions("Subscription not found.")
        # Use last_sync for delta sync if available
        since = sub["last_sync"].replace(" ", "T") if sub["last_sync"] else ""
        # Stamp the sync BEFORE contacting the remote.  Stamping afterwards
        # would make the next delta skip anything the remote changed between
        # producing its reply and the moment we recorded last_sync.
        sync_started = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        try:
            data = fetch_remote_sites(sub["dest_hash"], since=since)
            sites = data.get("sites", [])
            all_urls = data.get("all_urls")
            remote_name = data.get("name", sub["name"])
        except PermissionError:
            return handle_subscriptions("That instance has sharing disabled.")
        except Exception:
            return handle_subscriptions("Could not sync with that instance.")

        # If full sync (all_urls provided), remove pages no longer on remote
        if all_urls is not None:
            existing = db.execute(
                "SELECT id, url FROM remote_pages WHERE subscription_id = ?", (sub_id,)
            ).fetchall()
            remote_url_set = set(all_urls)
            for row in existing:
                if row["url"] not in remote_url_set:
                    db.execute("DELETE FROM remote_pages WHERE id = ?", (row["id"],))

        # The setting cannot change meaningfully mid-sync; read it once
        # instead of once per site.
        embed_enabled = get_setting("semantic_search", "1") == "1"

        # Upsert changed/new pages
        synced = 0
        for s in sites:
            try:
                tags_str = ",".join(s.get("tags", []))
                db.execute(
                    "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?) "
                    "ON CONFLICT(subscription_id, url) DO UPDATE SET title=excluded.title, note=excluded.note, tags=excluded.tags",
                    (sub_id, s["url"], s["title"], s.get("note", ""), tags_str),
                )
                # Embed remote page for semantic search
                if embed_enabled:
                    try:
                        from embeddings import store_remote_embeddings
                        rp_id = db.execute(
                            "SELECT id FROM remote_pages WHERE subscription_id = ? AND url = ?",
                            (sub_id, s["url"]),
                        ).fetchone()["id"]
                        store_remote_embeddings(rp_id, s["title"], s.get("note", ""), db)
                    except Exception:
                        # Embedding is optional; the page row is still kept.
                        pass
                synced += 1
            except Exception:
                # Skip malformed entries from the remote rather than failing the sync.
                pass

        db.execute(
            "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
            (sync_started, remote_name, sub_id),
        )
        db.commit()
    finally:
        return_db(db)
    # Fall back to the hash when the remote supplied no usable name.
    return handle_subscriptions(f"Synced {synced} site(s) from {esc(remote_name or sub['dest_hash'])}.")
def handle_subscription_autosync(sub_id):
    """Toggle a subscription's ``auto_sync`` flag and return to the list.

    Args:
        sub_id: Primary key of the subscription to toggle.

    Returns:
        A redirect to ``/subscriptions``.
    """
    # "1 - auto_sync" flips the stored value between 0 and 1 in one statement.
    toggle_sql = "UPDATE subscriptions SET auto_sync = 1 - auto_sync WHERE id = ?"
    conn = get_db()
    try:
        conn.execute(toggle_sql, (sub_id,))
        conn.commit()
    finally:
        return_db(conn)
    return _redirect("/subscriptions")
def handle_subscription_delete(sub_id):
    """Remove a subscription together with its synced remote pages.

    Args:
        sub_id: Primary key of the subscription to remove.

    Returns:
        A redirect to ``/subscriptions``.
    """
    # Child rows first, then the subscription itself; both in one commit.
    statements = (
        "DELETE FROM remote_pages WHERE subscription_id = ?",
        "DELETE FROM subscriptions WHERE id = ?",
    )
    conn = get_db()
    try:
        for sql in statements:
            conn.execute(sql, (sub_id,))
        conn.commit()
    finally:
        return_db(conn)
    return _redirect("/subscriptions")
def handle_subscription_syncall():
    """Sync every subscription that has ``auto_sync`` enabled.

    For each such subscription the remote is fetched (as a delta when a
    ``last_sync`` value exists), stale remote pages are pruned when the reply
    carries ``all_urls``, returned sites are upserted into ``remote_pages``
    and optionally embedded.  A subscription that fails is skipped so the
    others still sync.

    Returns:
        The subscriptions page with the number of subscriptions synced, or a
        notice when none has auto-sync enabled.
    """
    db = get_db()
    try:
        subs = db.execute("SELECT * FROM subscriptions WHERE auto_sync = 1").fetchall()
    finally:
        return_db(db)
    if not subs:
        return handle_subscriptions("No subscriptions have auto-sync enabled.")

    # Read the setting once for the whole run rather than once per site.
    embed_enabled = get_setting("semantic_search", "1") == "1"

    total = 0
    for sub in subs:
        try:
            since = sub["last_sync"].replace(" ", "T") if sub["last_sync"] else ""
            # Stamp the sync BEFORE contacting the remote.  Stamping afterwards
            # would make the next delta skip anything the remote changed
            # between producing its reply and the moment last_sync was recorded.
            sync_started = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
            data = fetch_remote_sites(sub["dest_hash"], since=since)
            sites = data.get("sites", [])
            all_urls = data.get("all_urls")
            remote_name = data.get("name", sub["name"])

            db = get_db()
            try:
                # A full listing lets us drop pages the remote no longer has.
                if all_urls is not None:
                    existing = db.execute(
                        "SELECT id, url FROM remote_pages WHERE subscription_id = ?", (sub["id"],)
                    ).fetchall()
                    remote_url_set = set(all_urls)
                    for row in existing:
                        if row["url"] not in remote_url_set:
                            db.execute("DELETE FROM remote_pages WHERE id = ?", (row["id"],))

                for s in sites:
                    try:
                        tags_str = ",".join(s.get("tags", []))
                        db.execute(
                            "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?) "
                            "ON CONFLICT(subscription_id, url) DO UPDATE SET title=excluded.title, note=excluded.note, tags=excluded.tags",
                            (sub["id"], s["url"], s["title"], s.get("note", ""), tags_str),
                        )
                        if embed_enabled:
                            try:
                                from embeddings import store_remote_embeddings
                                rp_id = db.execute(
                                    "SELECT id FROM remote_pages WHERE subscription_id = ? AND url = ?",
                                    (sub["id"], s["url"]),
                                ).fetchone()["id"]
                                store_remote_embeddings(rp_id, s["title"], s.get("note", ""), db)
                            except Exception:
                                # Embedding is optional; the page row is still kept.
                                pass
                    except Exception:
                        # Skip malformed entries from the remote.
                        pass

                db.execute(
                    "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
                    (sync_started, remote_name, sub["id"]),
                )
                db.commit()
            finally:
                return_db(db)
            total += 1
        except Exception:
            # Best effort: an unreachable or misbehaving remote must not stop
            # the remaining subscriptions from syncing.
            pass
    return handle_subscriptions(f"Synced {total} subscription(s).")
# --- Reindex (semantic search) ---
# Background reindex worker: None until handle_reindex_submit starts one, then
# the most recently started threading.Thread.  Both reindex handlers call
# is_alive() on it to tell whether a run is still in progress.
_reindex_thread = None
def handle_reindex_form():
    """Render the semantic-search index page.

    When the ``semantic_search`` setting is not ``"1"`` a notice pointing to
    the settings page is shown.  Otherwise the page reports how many pages
    have at least one chunk row, shows reindex status (the stored
    ``reindex_progress`` value, or a generic notice while the worker thread
    is alive) and offers a form that posts to ``/reindex``.

    Returns:
        The response produced by ``_respond``.
    """
    heading = "<h2>semantic search index</h2>"
    back_link = '<p><a href="/">back to search</a></p>'

    if get_setting("semantic_search", "1") != "1":
        notice = (
            '<p>Semantic search is disabled. Enable it in '
            '<a href="/style">settings</a> to use embeddings.</p>'
        )
        return _respond(heading + notice + back_link)

    conn = get_db()
    try:
        total_pages = conn.execute("SELECT count(*) FROM pages").fetchone()[0]
        pages_with_chunks = conn.execute(
            "SELECT count(DISTINCT page_id) FROM chunks WHERE page_id IS NOT NULL"
        ).fetchone()[0]
    finally:
        return_db(conn)

    # A stored progress value takes precedence over the bare thread check.
    progress = get_setting("reindex_progress", "")
    if progress:
        status_html = f'<p class="meta">Reindex in progress: {esc(progress)}</p>'
    elif _reindex_thread and _reindex_thread.is_alive():
        status_html = '<p class="meta">Reindex running...</p>'
    else:
        status_html = ""

    parts = [
        heading,
        f"<p>{pages_with_chunks} of {total_pages} pages have embeddings.</p>",
        status_html,
        '<form method="post" action="/reindex">',
        _csrf_field(),
        '<button type="submit">reindex all pages</button>',
        '</form>',
        back_link,
    ]
    return _respond("".join(parts))
def handle_reindex_submit(body):
    """Start a background reindex of all pages unless one is already running.

    The worker imports ``reindex_all`` lazily, publishes ``current/total``
    through the ``reindex_progress`` setting while it runs and clears that
    setting when it finishes, whether or not it succeeded.

    Args:
        body: Parsed form body; not used.

    Returns:
        The reindex page when a run is already in progress, otherwise a
        redirect to ``/reindex``.
    """
    global _reindex_thread

    already_running = _reindex_thread is not None and _reindex_thread.is_alive()
    if already_running:
        return handle_reindex_form()

    def progress(current, total):
        """Publish reindex progress for the status page."""
        set_setting("reindex_progress", f"{current}/{total}")

    def _run():
        """Thread body: run the reindex, ignoring failures, then clear progress."""
        try:
            from embeddings import reindex_all
            reindex_all(progress_callback=progress)
        except Exception:
            pass
        finally:
            # An empty value tells the status page that no run is in progress.
            set_setting("reindex_progress", "")

    worker = threading.Thread(target=_run, daemon=True)
    _reindex_thread = worker
    worker.start()
    return _redirect("/reindex")
# --- Dispatcher ---
def _dispatch_inner(data):
    """Route one request to its handler and return the handler's response.

    GET and POST each have a table of exact paths and a list of path
    prefixes.  Prefix routes other than ``/tags/`` expect an integer id after
    the prefix and answer 400 when it does not parse.  Every POST must pass
    the CSRF check before any route is considered.  Anything unmatched,
    including other HTTP methods, yields 404.

    Args:
        data: Request mapping; reads ``method``, ``path``, ``query``,
            ``body`` and ``gateway_host``.

    Returns:
        The response built by the matched handler or by ``_error``/``_respond``.
    """
    method = data.get("method", "GET")
    path = data.get("path", "/")
    query = data.get("query", {})
    body = data.get("body", {})
    gateway_host = data.get("gateway_host", "")  # read but not used by any route here

    def extract_id(prefix):
        """Return the integer following ``prefix`` in the path, or None."""
        tail = path[len(prefix):]
        try:
            return int(tail)
        except (ValueError, IndexError):
            return None

    def by_id(prefix, handler, *extra):
        """Call ``handler(id, *extra)`` for an id route, or answer 400."""
        ident = extract_id(prefix)
        if ident is None:
            return _error(400)
        return handler(ident, *extra)

    def add_form():
        """GET /add: only the "subscribe" variant is honoured, else "index"."""
        requested = query.get("type", ["index"])[0]
        return handle_add_form(action_type="subscribe" if requested == "subscribe" else "index")

    def tag_page(prefix):
        """GET /tags/<name>: the name is percent-decoded and must be non-empty."""
        tag_name = unquote(path[len(prefix):])
        if not tag_name:
            return _error(400)
        return handle_tag_browse(tag_name, query)

    def reset_style():
        """POST /style/reset: clear the custom template, then show the form."""
        set_setting("custom_template", "")
        return handle_style_form("Template reset to default.")

    # Handlers are wrapped in lambdas so each name is only looked up when its
    # route is actually taken.
    if method == "GET":
        exact_routes = {
            "/": lambda: handle_search(query),
            "/add": add_form,
            "/pages": lambda: handle_pages(query),
            "/bookmark": lambda: handle_bookmark(query),
            "/style": lambda: handle_style_form(),
            "/about": lambda: handle_about(),
            "/export": lambda: handle_export(),
            "/import": lambda: handle_import_form(),
            "/tags": lambda: handle_tags(),
            "/reindex": lambda: handle_reindex_form(),
            "/api/sites": lambda: handle_api_sites(query),
            "/subscriptions": lambda: handle_subscriptions(),
        }
        prefix_routes = (
            ("/edit/", lambda p: by_id(p, handle_edit_form)),
            ("/delete/", lambda p: by_id(p, handle_delete_confirm)),
            ("/tags/", tag_page),
            ("/subscriptions/browse/", lambda p: by_id(p, handle_subscription_browse)),
        )
    elif method == "POST":
        if not _check_csrf(body):
            return _respond("<h1>403 Forbidden</h1><p>Invalid or missing CSRF token.</p>", status=403)
        exact_routes = {
            "/add": lambda: handle_add_submit(body),
            "/pages/bulk": lambda: handle_bulk_action(body),
            "/add/manual": lambda: handle_add_manual_submit(body),
            "/style": lambda: handle_style_submit(body),
            "/style/reset": reset_style,
            "/import": lambda: handle_import_submit(body),
            "/reindex": lambda: handle_reindex_submit(body),
            "/subscriptions/add": lambda: handle_subscription_add(body),
            "/subscriptions/pick": lambda: handle_subscription_pick(body),
            "/subscriptions/syncall": lambda: handle_subscription_syncall(),
        }
        prefix_routes = (
            ("/edit/", lambda p: by_id(p, handle_edit_submit, body)),
            ("/delete/", lambda p: by_id(p, handle_delete)),
            ("/subscriptions/sync/", lambda p: by_id(p, handle_subscription_sync)),
            ("/subscriptions/autosync/", lambda p: by_id(p, handle_subscription_autosync)),
            ("/subscriptions/delete/", lambda p: by_id(p, handle_subscription_delete)),
        )
    else:
        return _error(404)

    # No exact path begins with any of the prefixes, so trying exact matches
    # first is equivalent to a single ordered chain of comparisons.
    exact = exact_routes.get(path)
    if exact is not None:
        return exact()
    for prefix, route in prefix_routes:
        if path.startswith(prefix):
            return route(prefix)
    return _error(404)
def dispatch_request(data):
    """Entry point: set up the CSRF token, dispatch, and add security headers.

    The CSRF token is taken from the ``_csrf`` cookie when it has the shape
    of a token issued here (64 lowercase hex digits); otherwise a new one is
    generated.  It is stored on the request-local object for the handlers
    and sent back in a ``Set-Cookie`` header.  Every response also gets
    ``X-Frame-Options`` and ``X-Content-Type-Options``; responses whose
    ``content_type`` starts with ``text/html`` additionally get a
    Content-Security-Policy.

    Args:
        data: Request mapping; reads ``cookies`` here and is passed on to
            ``_dispatch_inner``.

    Returns:
        The handler's response dict with the extra headers applied.
    """
    cookies = data.get("cookies") or {}
    csrf_token = cookies.get("_csrf", "")
    # The cookie is client-controlled, yet its value is echoed into the
    # Set-Cookie header below and into the hidden form field without escaping.
    # Accept it only if it looks exactly like secrets.token_hex(32) output;
    # anything else is replaced by a fresh token.
    if not isinstance(csrf_token, str) or not re.fullmatch(r"[0-9a-f]{64}", csrf_token):
        csrf_token = secrets.token_hex(32)
    _request_local.csrf_token = csrf_token

    resp = _dispatch_inner(data)

    headers = resp.setdefault("headers", {})
    headers["Set-Cookie"] = f"_csrf={csrf_token}; SameSite=Strict; HttpOnly; Path=/"
    headers["X-Frame-Options"] = "DENY"
    headers["X-Content-Type-Options"] = "nosniff"
    if resp.get("content_type", "").startswith("text/html"):
        headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src * data:; "
            "frame-ancestors 'none'; "
            "form-action 'self'; "
            "base-uri 'self'"
        )
    return resp