"""Request handlers and dispatcher for the TinyWeb personal search engine.

Every handler returns a response dict with the keys ``status``,
``content_type``, ``body`` and ``headers``; ``dispatch_request`` routes an
incoming request dict to the matching handler.

NOTE(review): the copy of this file under review had been flattened onto a
few physical lines and the HTML tags inside its string literals had been
stripped. The markup below was rebuilt from the visible text and from the
form field names / routes that the handlers and the dispatcher read
(``_csrf``, ``q``, ``url``, ``note``, ``tags``, ``data``, ``template``,
``site_name``, ``sharing_enabled``, ``dest_hash``, ``sub_id``, ``urls``,
``import_all``). Element choices, CSS class names, button captions, the
"how it works" list and the bookmarklet script are reconstructions --
confirm them against version control before shipping.
"""

import json
import logging
import secrets
from contextlib import closing
from datetime import datetime

from db import get_db, get_setting, set_setting, get_site_name, index_url, clean_url
from templates import esc, snippet, wrap_page, DEFAULT_TEMPLATE
from rns_client import fetch_remote_sites

logger = logging.getLogger(__name__)

# One token per process; every rendered form embeds it and every POST must
# echo it back (see _check_csrf).
_csrf_token = secrets.token_hex(32)

_CORS_ANY = {"Access-Control-Allow-Origin": "*"}
_URL_SCHEMES = ("http://", "https://")


def _first(params, key, default=""):
    """Return the first value stored under *key* in a ``{name: [values]}`` map."""
    values = params.get(key)
    return values[0] if values else default


def _csrf_field():
    """Return the hidden form input that carries the CSRF token."""
    return f'<input type="hidden" name="_csrf" value="{_csrf_token}">'


def _check_csrf(body):
    """Return True when the posted ``_csrf`` value matches the process token."""
    token = _first(body, "_csrf")
    if not isinstance(token, str):
        return False
    # compare_digest rejects non-ASCII str arguments with TypeError, so
    # compare bytes to keep a hostile token from turning into a 500.
    return secrets.compare_digest(token.encode("utf-8"), _csrf_token.encode("ascii"))


def _sanitize_fts_query(query):
    """Escape user input for safe use in FTS5 MATCH."""
    escaped = query.replace('"', '""')
    return f'"{escaped}"'


def _respond(body_html, status=200, use_default=False):
    """Wrap *body_html* in the page template and return an HTML response."""
    return {
        "status": status,
        "content_type": "text/html; charset=utf-8",
        "body": wrap_page(body_html, use_default=use_default),
        "headers": {},
    }


def _redirect(location):
    """Return a 302 response pointing at *location*."""
    return {
        "status": 302,
        "content_type": "text/html; charset=utf-8",
        "body": "",
        "headers": {"Location": location},
    }


def _json_response(data, status=200, headers=None):
    """Return *data* serialized as an indented JSON response."""
    return {
        "status": status,
        "content_type": "application/json",
        "body": json.dumps(data, indent=2),
        "headers": headers or {},
    }


def _text_response(text, status=200, headers=None):
    """Return *text* as a plain-text response."""
    return {
        "status": status,
        "content_type": "text/plain",
        "body": text,
        "headers": headers or {},
    }


def _error(status):
    """Return a minimal HTML page that shows only the status code."""
    return _respond(f"<h2>{status}</h2>", status)


def _is_web_url(url):
    """Return True when *url* is a non-empty http:// or https:// URL."""
    return bool(url) and url.startswith(_URL_SCHEMES)


# --- Tag helpers ---

def _get_page_tags(page_id, db=None):
    """Return the sorted tag names attached to *page_id*.

    When *db* is omitted a connection is opened and closed for this call.
    """
    if db is None:
        with closing(get_db()) as own_db:
            return _get_page_tags(page_id, own_db)
    rows = db.execute(
        "SELECT t.name FROM tags t JOIN page_tags pt ON t.id = pt.tag_id "
        "WHERE pt.page_id = ? ORDER BY t.name",
        (page_id,),
    ).fetchall()
    return [r["name"] for r in rows]


def _set_page_tags(page_id, tag_string, db=None):
    """Replace the tags of *page_id* with the comma-separated *tag_string*.

    Names are stripped and lower-cased. When *db* is supplied the caller is
    responsible for committing; otherwise this call commits and closes its
    own connection.
    """
    if db is None:
        with closing(get_db()) as own_db:
            _set_page_tags(page_id, tag_string, own_db)
            own_db.commit()
        return
    db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
    cleaned = (t.strip().lower() for t in (tag_string or "").split(","))
    # dict.fromkeys drops duplicates while keeping first-seen order.
    for name in dict.fromkeys(t for t in cleaned if t):
        db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,))
        tag_id = db.execute("SELECT id FROM tags WHERE name = ?", (name,)).fetchone()["id"]
        db.execute(
            "INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)",
            (page_id, tag_id),
        )


def _tag_links(tags):
    """Render tag names as space-separated ``[tag]`` links to /tags/<name>."""
    return " ".join(f'<a href="/tags/{esc(t)}">[{esc(t)}]</a>' for t in tags)


# --- Search helpers ---

def _fts_rows(db, sql, q):
    """Run an FTS MATCH query for *q*; return [] if the query fails."""
    try:
        return db.execute(sql, (_sanitize_fts_query(q),)).fetchall()
    except Exception:
        # Deliberate best-effort: a failing full-text query must not take
        # the search page down, but leave a trace for the operator.
        logger.warning("full-text query failed", exc_info=True)
        return []


def _render_local_results(db, rows, q):
    """Render hits from the local index, or the 'no results' notice."""
    if not rows:
        return "<p>No results in your index.</p>"
    parts = []
    for r in rows:
        note_html = f'<div class="note">{esc(r["note"])}</div>' if r["note"] else ""
        tags = _get_page_tags(r["id"], db)
        tags_html = f'<div class="tags">{_tag_links(tags)}</div>' if tags else ""
        parts.append(
            f'<div class="result">'
            f'<a href="{esc(r["url"])}">{esc(r["title"])}</a><br>'
            f'<span class="url">{esc(r["url"])}</span><br>'
            f'{esc(snippet(r["body"], q))}'
            f'{note_html}{tags_html}'
            f'</div>'
        )
    return "".join(parts)


def _render_trusted_links(db, rows, q, limit=20):
    """Render outgoing links of indexed pages whose label contains a query word."""
    words = q.lower().split()
    all_links = db.execute(
        "SELECT l.url, l.label, p.title AS source_title "
        "FROM links l JOIN pages p ON l.page_id = p.id",
    ).fetchall()
    skip = {r["url"] for r in rows}  # already shown as local hits
    trusted = []
    for link in all_links:
        if link["url"] in skip:
            continue
        label = (link["label"] or "").lower()
        if any(w in label for w in words):
            skip.add(link["url"])
            trusted.append(link)
            if len(trusted) >= limit:
                break
    if not trusted:
        return ""
    items = "".join(
        f'<li><a href="{esc(link["url"])}">{esc(link["label"])}</a> '
        f'— from {esc(link["source_title"])}</li>'
        for link in trusted
    )
    return (
        f'<details>'
        f'<summary>from your trusted sites ({len(trusted)})</summary>'
        f'<ul>{items}</ul>'
        f'</details>'
    )


def _render_remote_results(remote_rows):
    """Render hits from synced subscription pages, grouped by source name."""
    by_source = {}
    for r in remote_rows:
        by_source.setdefault(r["source_name"] or "unknown", []).append(r)
    sections = []
    for source, hits in by_source.items():
        lines = []
        for r in hits:
            note_html = f' — {esc(r["note"])}' if r["note"] else ""
            lines.append(
                f'<li><a href="{esc(r["url"])}">{esc(r["title"])}</a>'
                f'{note_html} (<span class="url">{esc(r["url"])}</span>)</li>'
            )
        sections.append(
            f'<details>'
            f'<summary>from {esc(source)} ({len(hits)})</summary>'
            f'<ul>{"".join(lines)}</ul>'
            f'</details>'
        )
    return "".join(sections)


# --- Route handlers ---

def handle_search(query):
    """Render the search page; with a ``q`` parameter, also the results.

    Results come from three places: the local full-text index, link labels
    found on indexed pages, and pages synced from subscriptions.
    """
    q = _first(query, "q").strip()
    name = get_site_name()
    result_html = trusted_html = remote_html = ""
    remote_rows = []
    with closing(get_db()) as db:
        count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        if q:
            rows = _fts_rows(
                db,
                "SELECT p.id, p.url, p.title, p.body, p.note "
                "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
                "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
                q,
            )
            result_html = _render_local_results(db, rows, q)
            trusted_html = _render_trusted_links(db, rows, q)
            remote_rows = _fts_rows(
                db,
                "SELECT rp.url, rp.title, rp.note, s.name AS source_name "
                "FROM remote_pages_fts rpf "
                "JOIN remote_pages rp ON rpf.rowid = rp.id "
                "JOIN subscriptions s ON rp.subscription_id = s.id "
                "WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
                q,
            )
            remote_html = _render_remote_results(remote_rows)
    sub_count = f" + {len(remote_rows)} from subscriptions" if remote_rows else ""
    return _respond(
        f'<form method="get" action="/">'
        f'<input type="text" name="q" value="{esc(q)}" placeholder="search {esc(name)}">'
        f' <input type="submit" value="search">'
        f'</form>'
        f'<p>{count} pages indexed{sub_count}'
        f' · <a href="/add">+ add url</a></p>'
        f'{result_html}{trusted_html}{remote_html}'
    )


def handle_add_form(msg=""):
    """Render the 'add url' form. *msg* is inserted as-is (callers escape it)."""
    return _respond(
        f"<h2>add url</h2>"
        f'<form method="post" action="/add">'
        f'{_csrf_field()}'
        f'<p><input type="text" name="url" placeholder="https://..."></p>'
        f'<p><input type="text" name="note" placeholder="note (optional)"></p>'
        f'<p><input type="text" name="tags" placeholder="tags, comma separated"></p>'
        f'<input type="submit" value="index">'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/">back</a>'
    )


def handle_add_submit(body):
    """Index the posted URL, attach its tags, and re-render the add form."""
    url = clean_url(_first(body, "url").strip())
    note = _first(body, "note").strip()
    tags = _first(body, "tags").strip()
    if not url:
        return handle_add_form("URL is required.")
    if not _is_web_url(url):
        return handle_add_form("URL must start with http:// or https://")
    try:
        title = index_url(url, note)
        if tags:
            with closing(get_db()) as db:
                row = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                if row:
                    _set_page_tags(row["id"], tags, db)
                    db.commit()
    except Exception as e:
        return handle_add_form(f"Error: {esc(str(e))}")
    return handle_add_form(f'Indexed: {esc(title)}')


def handle_pages():
    """List every indexed page, newest first, with edit/remove links."""
    with closing(get_db()) as db:
        rows = db.execute("SELECT id, url, title, note FROM pages ORDER BY id DESC").fetchall()
        lines = []
        for r in rows:
            note_html = f' — {esc(r["note"])}' if r["note"] else ""
            tags = _get_page_tags(r["id"], db)
            tags_html = f' {_tag_links(tags)}' if tags else ""
            lines.append(
                f'<li><a href="{esc(r["url"])}">{esc(r["title"])}</a>{note_html}{tags_html} '
                f'(<span class="url">{esc(r["url"])}</span>) '
                f'<a href="/edit/{r["id"]}">edit</a> '
                f'<a href="/delete/{r["id"]}">remove</a></li>'
            )
    return _respond(
        f"<h2>indexed pages ({len(rows)})</h2>"
        f"<ul>{''.join(lines)}</ul>"
        f'<p><a href="/export">export</a> | <a href="/import">import</a></p>'
        f'<a href="/">back</a>'
    )


def handle_edit_form(page_id, msg=""):
    """Render the note/tags editor for *page_id*; 404 if the page is unknown."""
    with closing(get_db()) as db:
        row = db.execute(
            "SELECT id, url, title, note FROM pages WHERE id = ?", (page_id,)
        ).fetchone()
        if not row:
            return _error(404)
        tags = ", ".join(_get_page_tags(page_id, db))
    return _respond(
        f"<h2>edit page</h2>"
        f"<p><strong>{esc(row['title'])}</strong><br>"
        f"{esc(row['url'])}</p>"
        f'<form method="post" action="/edit/{row["id"]}">'
        f'{_csrf_field()}'
        f'<p><input type="text" name="note" value="{esc(row["note"] or "")}"></p>'
        f'<p><input type="text" name="tags" value="{esc(tags)}"></p>'
        f'<input type="submit" value="save">'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/pages">back</a>'
    )


def handle_edit_submit(page_id, body):
    """Store the posted note and tags for *page_id*, then go back to /pages."""
    note = _first(body, "note").strip()
    tags = _first(body, "tags").strip()
    with closing(get_db()) as db:
        exists = db.execute("SELECT 1 FROM pages WHERE id = ?", (page_id,)).fetchone()
        if not exists:
            # Without this guard tag rows would be written for a missing page.
            return _error(404)
        db.execute("UPDATE pages SET note = ? WHERE id = ?", (note, page_id))
        _set_page_tags(page_id, tags, db)
        db.commit()
    return _redirect("/pages")


def handle_delete_confirm(page_id):
    """Ask for confirmation before removing *page_id*; 404 if unknown."""
    with closing(get_db()) as db:
        row = db.execute(
            "SELECT id, url, title FROM pages WHERE id = ?", (page_id,)
        ).fetchone()
    if not row:
        return _error(404)
    return _respond(
        f"<h2>confirm delete</h2>"
        f"<p>Remove <strong>{esc(row['title'])}</strong><br>"
        f"{esc(row['url'])}</p>"
        f'<form method="post" action="/delete/{row["id"]}">'
        f'{_csrf_field()}'
        f'<input type="submit" value="remove">'
        f"</form>"
        f' <a href="/pages">cancel</a>'
    )


def handle_delete(page_id):
    """Delete *page_id* together with its links and tag assignments."""
    with closing(get_db()) as db:
        db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
        # Tag assignments are removed explicitly, like links, so that tag
        # counts do not keep including a page that no longer exists.
        db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
        db.execute("DELETE FROM pages WHERE id = ?", (page_id,))
        db.commit()
    return _redirect("/pages")


def handle_bookmark(query):
    """Index the ``url`` query parameter and report the result as plain text.

    NOTE(review): this endpoint changes state on GET and answers with a
    wildcard CORS header; that appears to be what the bookmarklet relies
    on, but it also lets any web page trigger indexing.
    """
    url = clean_url(_first(query, "url").strip())
    if not _is_web_url(url):
        return _text_response("error: invalid url", headers=dict(_CORS_ANY))
    try:
        title = index_url(url)
        msg = f"ok: {title}"
    except Exception as e:
        msg = f"error: {e}"
    return _text_response(msg, headers=dict(_CORS_ANY))


def handle_export():
    """Return all pages (url, title, note) as a downloadable JSON array."""
    with closing(get_db()) as db:
        rows = db.execute("SELECT url, title, note FROM pages ORDER BY id").fetchall()
    data = [{"url": r["url"], "title": r["title"], "note": r["note"]} for r in rows]
    return _json_response(
        data,
        headers={"Content-Disposition": "attachment; filename=tinyweb-export.json"},
    )


def handle_import_form(msg=""):
    """Render the JSON import form. *msg* is inserted as-is."""
    return _respond(
        f"<h2>import</h2>"
        f"<p>Paste the contents of a tinyweb export file (JSON).</p>"
        f'<form method="post" action="/import">'
        f'{_csrf_field()}'
        f'<p><textarea name="data" rows="12" cols="60"></textarea></p>'
        f'<input type="submit" value="import">'
        f"</form>"
        f"<p>{msg}</p>"
        f'<a href="/pages">back</a>'
    )


def handle_import_submit(body):
    """Index every URL of a pasted export file and report the counts.

    Entries without a URL are skipped silently; entries that are not
    objects, have a non-http(s) URL, or fail to index count as errors.
    """
    raw = _first(body, "data").strip()
    if not raw:
        return handle_import_form("Paste JSON data.")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return handle_import_form("Invalid JSON.")
    if not isinstance(data, list):
        return handle_import_form("Expected a JSON array.")
    imported = 0
    errors = 0
    for entry in data:
        if not isinstance(entry, dict):
            errors += 1
            continue
        # handle_export writes null for a missing note, so both fields may
        # be None (or another non-string) rather than absent.
        url = str(entry.get("url") or "").strip()
        note = str(entry.get("note") or "").strip()
        if not url:
            continue
        try:
            url = clean_url(url)
            if not _is_web_url(url):
                raise ValueError("unsupported URL scheme")
            index_url(url, note)
            imported += 1
        except Exception:
            errors += 1
    return handle_import_form(f"Imported {imported} page(s). {errors} error(s).")


def _bookmarklet_href(gateway_host):
    """Build the ``javascript:`` URL that posts the current page to /bookmark.

    NOTE(review): the original script was lost; this version assumes
    *gateway_host* is either a full origin or a bare ``host[:port]``
    reachable over http -- confirm against the gateway.
    """
    origin = (gateway_host or "").rstrip("/")
    if origin and "://" not in origin:
        origin = f"http://{origin}"
    # json.dumps yields a safely quoted JavaScript string literal.
    endpoint = json.dumps(f"{origin}/bookmark?url=")
    return (
        "javascript:(function(){"
        f"fetch({endpoint}+encodeURIComponent(location.href))"
        ".then(function(r){return r.text()})"
        ".then(function(t){alert(t)})"
        ".catch(function(e){alert('error: '+e)});"
        "})()"
    )


def handle_style_form(msg="", query=None, gateway_host=""):
    """Render the customize page (site name, sharing, template, bookmarklet).

    A ``reset`` key in *query* clears the custom template first.
    NOTE(review): that reset is a state change on GET without a CSRF check.
    """
    if query and "reset" in query:
        set_setting("custom_template", "")
        msg = "Template reset to default."
    template = get_setting("custom_template") or DEFAULT_TEMPLATE
    name = get_site_name()
    sharing = get_setting("sharing_enabled", "0")
    checked = " checked" if sharing == "1" else ""
    placeholder = esc("{{content}}")
    return _respond(
        f"<h2>customize</h2>"
        f"<h3>name your search engine</h3>"
        f'<form method="post" action="/style">'
        f'{_csrf_field()}'
        f'<p><input type="text" name="site_name" value="{esc(name)}"></p>'
        f"<h3>sharing</h3>"
        f'<p><label><input type="checkbox" name="sharing_enabled" value="1"{checked}>'
        f" share my index with subscribers</label></p>"
        f"<h3>custom html</h3>"
        f"<p>Edit the full page template. Use {placeholder} "
        f"where page content should appear.</p>"
        f'<p><textarea name="template" rows="20" cols="80">{esc(template)}</textarea></p>'
        f'<input type="submit" value="save">'
        f"</form>"
        f"<h3>bookmarklet</h3>"
        f"<p>Drag this link to your bookmarks bar. Click it on any page to index it instantly.</p>"
        f'<p><a href="{esc(_bookmarklet_href(gateway_host))}">+ save to {esc(name)}</a></p>'
        f"<p>{msg}</p>"
        f'<a href="/">back</a>',
        use_default=True,
    )


def handle_style_submit(body, gateway_host=""):
    """Persist template, site name and sharing flag, then re-render the form."""
    template = _first(body, "template")
    name = _first(body, "site_name", "tinyweb").strip()
    sharing = "1" if body.get("sharing_enabled") else "0"
    # Storing "" for an unmodified template keeps the default in effect.
    is_default = template.strip() == DEFAULT_TEMPLATE.strip()
    set_setting("custom_template", "" if is_default else template)
    set_setting("site_name", name or "tinyweb")
    set_setting("sharing_enabled", sharing)
    return handle_style_form("Saved.", gateway_host=gateway_host)


def handle_about():
    """Render the about page with index statistics and subscribe details."""
    name = get_site_name()
    dest_hash = get_setting("dest_hash")
    sharing = get_setting("sharing_enabled", "0") == "1"
    with closing(get_db()) as db:
        page_count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        tag_count = db.execute("SELECT count(DISTINCT tag_id) FROM page_tags").fetchone()[0]
        sub_count = db.execute("SELECT count(*) FROM subscriptions").fetchone()[0]
    sharing_html = (
        '<p>This instance shares its index publicly. Subscribe to join the network.</p>'
        if sharing else
        '<p>This instance is private.</p>'
    )
    hash_html = ""
    if dest_hash:
        hash_html = (
            f'<h3>subscribe</h3>'
            f'<p>To subscribe to this instance, add this destination hash in your TinyWeb:</p>'
            f'<p><code>{esc(dest_hash)}</code></p>'
        )
    return _respond(
        f'<h2>{esc(name)}</h2>'
        f'<p>A personal search engine, built for the slow web.</p>'
        f'<p>TinyWeb is about taking back the internet. No algorithms, no ads, no tracking. '
        f'Just human-curated pages shared freely across a mesh network.</p>'
        f'<ul>'
        f'<li>{page_count} pages indexed</li>'
        f'<li>{tag_count} tags</li>'
        f'<li>{sub_count} subscriptions</li>'
        f'</ul>'
        f'{sharing_html}'
        f'{hash_html}'
        f'<h3>what is the slow web?</h3>'
        f'<p>The slow web is a movement for intentionality over speed, '
        f'human curation over algorithmic feeds, privacy over surveillance, '
        f'and community over corporations. Every page in this index was saved by a person '
        f'because they found it valuable — not because an algorithm told them to click.</p>'
        f'<h3>how it works</h3>'
        # NOTE(review): the original list text was lost; these items only
        # restate what the handlers in this module do.
        f'<ul>'
        f'<li>Save a page and its text becomes searchable.</li>'
        f'<li>Add notes and tags to organize what you saved.</li>'
        f'<li>Subscribe to other instances to search and import their pages.</li>'
        f'</ul>'
        f'<p><a href="/">search</a> | <a href="/pages">browse</a> | <a href="/tags">tags</a></p>'
    )


def handle_tags():
    """List every tag in use together with its page count."""
    with closing(get_db()) as db:
        rows = db.execute(
            "SELECT t.name, COUNT(pt.page_id) AS cnt FROM tags t "
            "JOIN page_tags pt ON t.id = pt.tag_id "
            "GROUP BY t.id ORDER BY t.name"
        ).fetchall()
    items = "".join(
        f'<li><a href="/tags/{esc(r["name"])}">{esc(r["name"])}</a> ({r["cnt"]})</li>'
        for r in rows
    )
    # Build the list section on its own: adjacent string literals bind
    # tighter than `if/else`, so an inline conditional would swallow the
    # heading or the back link.
    listing = (
        f"<ul>{items}</ul>"
        if items else
        "<p>No tags yet. Add tags when saving or editing pages.</p>"
    )
    return _respond(
        f"<h2>tags</h2>"
        f"{listing}"
        f'<a href="/">back</a>'
    )


def handle_tag_browse(tag_name):
    """List the pages carrying *tag_name*, newest first."""
    with closing(get_db()) as db:
        rows = db.execute(
            "SELECT p.id, p.url, p.title, p.note FROM pages p "
            "JOIN page_tags pt ON p.id = pt.page_id "
            "JOIN tags t ON t.id = pt.tag_id "
            "WHERE t.name = ? ORDER BY p.id DESC",
            (tag_name,),
        ).fetchall()
        lines = []
        for r in rows:
            note_html = f' — {esc(r["note"])}' if r["note"] else ""
            tag_links = _tag_links(_get_page_tags(r["id"], db))
            lines.append(
                f'<li><a href="{esc(r["url"])}">{esc(r["title"])}</a>{note_html} {tag_links} '
                f'(<span class="url">{esc(r["url"])}</span>)</li>'
            )
    return _respond(
        f'<h2>tag: {esc(tag_name)}</h2>'
        f'<p>{len(rows)} page(s)</p>'
        f'<ul>{"".join(lines)}</ul>'
        f'<a href="/tags">all tags</a> | <a href="/">back</a>'
    )


def handle_api_sites():
    """Return the shared index as JSON, or 403 when sharing is disabled."""
    if get_setting("sharing_enabled", "0") != "1":
        return _json_response(
            {"error": "sharing disabled"},
            status=403,
            headers=dict(_CORS_ANY),
        )
    with closing(get_db()) as db:
        rows = db.execute("SELECT id, url, title, note FROM pages ORDER BY id DESC").fetchall()
        sites = [
            {
                "url": r["url"],
                "title": r["title"],
                "note": r["note"],
                "tags": _get_page_tags(r["id"], db),
            }
            for r in rows
        ]
    data = {"name": get_site_name(), "sites": sites}
    return _json_response(data, headers=dict(_CORS_ANY))


# --- Subscriptions ---

def handle_subscriptions(msg=""):
    """Render the subscription list with its per-row actions.

    *msg* is inserted as-is; callers escape any untrusted parts.
    """
    with closing(get_db()) as db:
        subs = db.execute("SELECT * FROM subscriptions ORDER BY id DESC").fetchall()
    csrf = _csrf_field()
    rows_html = []
    for s in subs:
        sid = s["id"]
        auto_label = "on" if s["auto_sync"] else "off"
        last = s["last_sync"] or "never"
        rows_html.append(
            f'<tr>'
            f'<td>{esc(s["name"] or "unknown")}<br><code>{esc(s["dest_hash"])}</code></td>'
            f'<td>{esc(last)}</td>'
            f'<td>'
            f'<form method="post" action="/subscriptions/autosync/{sid}">'
            f'{csrf}<input type="submit" value="{auto_label}"></form>'
            f'</td>'
            f'<td>'
            f'<a href="/subscriptions/browse/{sid}">browse</a> '
            f'<form method="post" action="/subscriptions/sync/{sid}">'
            f'{csrf}<input type="submit" value="sync"></form>'
            f'<form method="post" action="/subscriptions/delete/{sid}">'
            f'{csrf}<input type="submit" value="remove"></form>'
            f'</td>'
            f'</tr>'
        )
    table = ""
    if subs:
        table = (
            f'<table>'
            f'<tr><th>instance</th><th>last sync</th><th>auto-sync</th><th>actions</th></tr>'
            f'{"".join(rows_html)}'
            f'</table>'
            f'<form method="post" action="/subscriptions/syncall">'
            f'{csrf}<input type="submit" value="sync all"></form>'
        )
    return _respond(
        f"<h2>subscriptions</h2>"
        f'<form method="post" action="/subscriptions/add">'
        f'{csrf}'
        f'<input type="text" name="dest_hash" placeholder="destination hash"> '
        f'<input type="submit" value="subscribe">'
        f'</form>'
        f'<p>{msg}</p>'
        f'{table}'
        f'<a href="/">back</a>'
    )


def handle_subscription_add(body):
    """Validate the posted destination hash, contact it, and subscribe."""
    dest_hash = _first(body, "dest_hash").strip().replace("<", "").replace(">", "")
    if not dest_hash or len(dest_hash) != 32:
        return handle_subscriptions("Enter a valid 32-character destination hash.")
    try:
        int(dest_hash, 16)
    except ValueError:
        return handle_subscriptions("Invalid destination hash (must be hex).")
    try:
        data = fetch_remote_sites(dest_hash)
        name = data.get("name", "")
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception as e:
        return handle_subscriptions(f"Could not reach that instance: {esc(str(e))}")
    with closing(get_db()) as db:
        db.execute(
            "INSERT INTO subscriptions (dest_hash, name) VALUES (?, ?) "
            "ON CONFLICT(dest_hash) DO UPDATE SET name=excluded.name",
            (dest_hash, name),
        )
        db.commit()
    return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.")


def _usable_sites(sites):
    """Keep only the dict entries of a remote site list that carry a URL."""
    return [s for s in (sites or []) if isinstance(s, dict) and s.get("url")]


def handle_subscription_browse(sub_id):
    """Show a subscription's sites, split into new and already indexed.

    Synced rows are used when present; otherwise the instance is queried live.
    """
    with closing(get_db()) as db:
        sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
        if not sub:
            return _error(404)
        local_urls = {r["url"] for r in db.execute("SELECT url FROM pages").fetchall()}
        remote_rows = db.execute(
            "SELECT url, title, note, tags FROM remote_pages WHERE subscription_id = ?",
            (sub_id,),
        ).fetchall()
    if remote_rows:
        sites = [
            {
                "url": r["url"],
                "title": r["title"],
                "note": r["note"],
                "tags": [t for t in r["tags"].split(",") if t] if r["tags"] else [],
            }
            for r in remote_rows
        ]
    else:
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            # Remote data is untrusted: drop entries that are not usable.
            sites = _usable_sites(data.get("sites", []))
        except PermissionError:
            return handle_subscriptions("That instance has sharing disabled.")
        except Exception as e:
            return handle_subscriptions(f"Could not fetch sites: {esc(str(e))}")

    new_lines = []
    existing_lines = []
    for s in sites:
        url = s["url"]
        title = s.get("title") or url
        if url in local_urls:
            existing_lines.append(
                f'<li><a href="{esc(url)}">{esc(title)}</a> '
                f'(<span class="url">{esc(url)}</span>) — already indexed</li>'
            )
            continue
        note_html = f' — {esc(s["note"])}' if s.get("note") else ""
        tags = s.get("tags")
        tags_html = ""
        if isinstance(tags, list) and tags:
            tags_html = " " + " ".join(f'[{esc(str(t))}]' for t in tags)
        new_lines.append(
            f'<li><label><input type="checkbox" name="urls" value="{esc(url)}"> '
            f'{esc(title)}</label>{note_html}{tags_html} '
            f'(<span class="url">{esc(url)}</span>)</li>'
        )
    new_count = len(new_lines)
    buttons = ""
    if new_count:
        buttons = (
            '<input type="submit" value="import selected"> '
            '<input type="submit" name="import_all" value="import all new">'
        )
    return _respond(
        f'<h2>browsing: {esc(sub["name"] or sub["dest_hash"])}</h2>'
        f'<p>{len(sites)} site(s) available, {new_count} new</p>'
        f'<form method="post" action="/subscriptions/pick">'
        f'{_csrf_field()}'
        f'<input type="hidden" name="sub_id" value="{sub["id"]}">'
        f'<ul>{"".join(new_lines)}</ul>'
        f'{buttons}'
        f'</form>'
        f'<h3>already indexed</h3>'
        f'<ul>{"".join(existing_lines)}</ul>'
        f'<a href="/subscriptions">back</a>'
    )


def handle_subscription_pick(body):
    """Import the selected (or all new) sites of a subscription.

    "Import all" is computed from the synced ``remote_pages`` rows, so it
    finds nothing for a subscription that was only browsed live.
    """
    sub_id = _first(body, "sub_id")
    import_all = _first(body, "import_all")
    with closing(get_db()) as db:
        remote_rows = db.execute(
            "SELECT url, tags FROM remote_pages WHERE subscription_id = ?", (sub_id,)
        ).fetchall()
        # url -> comma-separated tags, reused below to tag imported pages
        remote_tags = {r["url"]: r["tags"] for r in remote_rows}
        if import_all:
            local_urls = {r["url"] for r in db.execute("SELECT url FROM pages").fetchall()}
            urls = [r["url"] for r in remote_rows if r["url"] not in local_urls]
        else:
            urls = body.get("urls", [])
    if not urls:
        return handle_subscriptions("No sites selected.")
    imported = 0
    errors = 0
    for url in urls:
        try:
            # Same scheme rule as the add form; the list is form/remote input.
            if not _is_web_url(url):
                raise ValueError("unsupported URL scheme")
            index_url(url)
            tags_str = remote_tags.get(url, "")
            if tags_str:
                with closing(get_db()) as db:
                    row = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                    if row:
                        _set_page_tags(row["id"], tags_str, db)
                        db.commit()
            imported += 1
        except Exception:
            errors += 1
    return handle_subscriptions(f"Imported {imported} page(s). {errors} error(s).")


def _store_remote_sites(db, sub, data):
    """Replace the synced pages of *sub* with those in *data*.

    Also stamps ``last_sync`` and refreshes the stored name. Does not
    commit. Returns ``(stored_count, remote_name)``.
    """
    sub_id = sub["id"]
    remote_name = data.get("name") or sub["name"]
    db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub_id,))
    stored = 0
    for site in _usable_sites(data.get("sites", [])):
        tags = site.get("tags")
        tags_str = ",".join(str(t) for t in tags) if isinstance(tags, list) else ""
        try:
            db.execute(
                "INSERT INTO remote_pages (subscription_id, url, title, note, tags) "
                "VALUES (?, ?, ?, ?, ?)",
                (sub_id, site["url"], site.get("title") or site["url"],
                 site.get("note") or "", tags_str),
            )
            stored += 1
        except Exception:
            # Deliberate best-effort: one bad row from a remote peer must
            # not abort the rest of the sync.
            logger.warning("skipping remote page %r", site.get("url"), exc_info=True)
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    db.execute(
        "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
        (now, remote_name, sub_id),
    )
    return stored, remote_name


def handle_subscription_sync(sub_id):
    """Fetch one subscription's sites and replace its synced copy."""
    with closing(get_db()) as db:
        sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
    if not sub:
        return handle_subscriptions("Subscription not found.")
    try:
        data = fetch_remote_sites(sub["dest_hash"])
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception as e:
        return handle_subscriptions(f"Could not sync: {esc(str(e))}")
    try:
        with closing(get_db()) as db:
            synced, remote_name = _store_remote_sites(db, sub, data)
            db.commit()
    except Exception as e:
        return handle_subscriptions(f"Could not sync: {esc(str(e))}")
    shown = remote_name or sub["dest_hash"]
    return handle_subscriptions(f"Synced {synced} site(s) from {esc(shown)}.")


def handle_subscription_autosync(sub_id):
    """Toggle the auto-sync flag of a subscription."""
    with closing(get_db()) as db:
        db.execute("UPDATE subscriptions SET auto_sync = 1 - auto_sync WHERE id = ?", (sub_id,))
        db.commit()
    return _redirect("/subscriptions")


def handle_subscription_delete(sub_id):
    """Remove a subscription and its synced pages."""
    with closing(get_db()) as db:
        db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub_id,))
        db.execute("DELETE FROM subscriptions WHERE id = ?", (sub_id,))
        db.commit()
    return _redirect("/subscriptions")


def handle_subscription_syncall():
    """Sync every subscription that has auto-sync enabled."""
    with closing(get_db()) as db:
        subs = db.execute("SELECT * FROM subscriptions WHERE auto_sync = 1").fetchall()
    if not subs:
        return handle_subscriptions("No subscriptions have auto-sync enabled.")
    total = 0
    for sub in subs:
        try:
            data = fetch_remote_sites(sub["dest_hash"])
            with closing(get_db()) as db:
                _store_remote_sites(db, sub, data)
                db.commit()
            total += 1
        except Exception:
            # One unreachable peer should not stop the others from syncing.
            logger.warning("sync failed for subscription %s", sub["id"], exc_info=True)
    return handle_subscriptions(f"Synced {total} subscription(s).")


# --- Dispatcher ---

def dispatch_request(data):
    """Route a request dict to its handler and return the response dict.

    *data* is read for ``method``, ``path``, ``query``, ``body`` and
    ``gateway_host``. Every POST must carry a valid CSRF token. Unknown
    routes and methods yield 404; a malformed numeric id yields 400.
    """
    method = data.get("method", "GET")
    path = data.get("path", "/")
    query = data.get("query", {})
    body = data.get("body", {})
    gateway_host = data.get("gateway_host", "")

    def extract_id(prefix):
        """Return the integer following *prefix* in the path, else None."""
        raw = path[len(prefix):]
        # Plain ASCII digits only: int() alone would also accept signs,
        # surrounding whitespace and underscores.
        return int(raw) if raw.isascii() and raw.isdigit() else None

    if method == "GET":
        if path == "/":
            return handle_search(query)
        elif path == "/add":
            return handle_add_form()
        elif path == "/pages":
            return handle_pages()
        elif path.startswith("/edit/"):
            pid = extract_id("/edit/")
            return handle_edit_form(pid) if pid is not None else _error(400)
        elif path.startswith("/delete/"):
            pid = extract_id("/delete/")
            return handle_delete_confirm(pid) if pid is not None else _error(400)
        elif path == "/bookmark":
            return handle_bookmark(query)
        elif path == "/style":
            return handle_style_form(query=query, gateway_host=gateway_host)
        elif path == "/about":
            return handle_about()
        elif path == "/export":
            return handle_export()
        elif path == "/import":
            return handle_import_form()
        elif path == "/tags":
            return handle_tags()
        elif path.startswith("/tags/"):
            tag_name = path[len("/tags/"):]
            return handle_tag_browse(tag_name) if tag_name else _error(400)
        elif path == "/api/sites":
            return handle_api_sites()
        elif path == "/subscriptions":
            return handle_subscriptions()
        elif path.startswith("/subscriptions/browse/"):
            sid = extract_id("/subscriptions/browse/")
            return handle_subscription_browse(sid) if sid is not None else _error(400)
    elif method == "POST":
        if not _check_csrf(body):
            return _respond(
                "<h2>403 Forbidden</h2><p>Invalid or missing CSRF token.</p>",
                status=403,
            )
        if path == "/add":
            return handle_add_submit(body)
        elif path.startswith("/edit/"):
            pid = extract_id("/edit/")
            return handle_edit_submit(pid, body) if pid is not None else _error(400)
        elif path.startswith("/delete/"):
            pid = extract_id("/delete/")
            return handle_delete(pid) if pid is not None else _error(400)
        elif path == "/style":
            return handle_style_submit(body, gateway_host=gateway_host)
        elif path == "/import":
            return handle_import_submit(body)
        elif path == "/subscriptions/add":
            return handle_subscription_add(body)
        elif path == "/subscriptions/pick":
            return handle_subscription_pick(body)
        elif path.startswith("/subscriptions/sync/"):
            sid = extract_id("/subscriptions/sync/")
            return handle_subscription_sync(sid) if sid is not None else _error(400)
        elif path.startswith("/subscriptions/autosync/"):
            sid = extract_id("/subscriptions/autosync/")
            return handle_subscription_autosync(sid) if sid is not None else _error(400)
        elif path.startswith("/subscriptions/delete/"):
            sid = extract_id("/subscriptions/delete/")
            return handle_subscription_delete(sid) if sid is not None else _error(400)
        elif path == "/subscriptions/syncall":
            return handle_subscription_syncall()

    return _error(404)