"""Route handlers and request dispatcher for the tinyweb search app.

Every handler returns a response dict with ``status``, ``content_type``,
``body`` and ``headers`` keys.  ``dispatch_request`` maps an incoming request
dict onto the matching handler.
"""

# NOTE(review): the copy of this file that was reviewed had been flattened by
# an HTML-to-text pass: statements were collapsed onto a few lines and the
# markup inside the page templates was stripped.  Routes, form field names,
# visible text and the CSS class names (.result, .note, .trusted) are taken
# from the surviving code; tag choices, button labels, the sharing checkbox
# label and the bookmarklet script are a best-effort reconstruction.
# Confirm the templates against the original before merging.

import json
import sqlite3
from contextlib import closing
from datetime import datetime

import requests

from db import get_db, get_setting, set_setting, get_site_name, index_url
from templates import esc, snippet, wrap_page

_URL_SCHEMES = ("http://", "https://")
_CORS_ANY_ORIGIN = {"Access-Control-Allow-Origin": "*"}
_REMOTE_TIMEOUT = 5  # seconds, for calls to other instances
_SYNC_STAMP_FORMAT = "%Y-%m-%d %H:%M"
_TRUSTED_LIMIT = 20


# --- Response helpers ---

def _respond(body_html, status=200):
    """Wrap an HTML fragment in the page template and return an HTML response."""
    return {
        "status": status,
        "content_type": "text/html; charset=utf-8",
        "body": wrap_page(body_html),
        "headers": {},
    }


def _redirect(location):
    """Return a 302 response pointing at ``location``."""
    return {
        "status": 302,
        "content_type": "text/html; charset=utf-8",
        "body": "",
        "headers": {"Location": location},
    }


def _json_response(data, status=200, headers=None):
    """Return ``data`` serialized as indented JSON."""
    return {
        "status": status,
        "content_type": "application/json",
        "body": json.dumps(data, indent=2),
        "headers": headers or {},
    }


def _text_response(text, status=200, headers=None):
    """Return ``text`` as a plain-text response."""
    return {
        "status": status,
        "content_type": "text/plain",
        "body": text,
        "headers": headers or {},
    }


def _error(status):
    """Return a minimal HTML page that shows only the status code."""
    return _respond(f"<h1>{status}</h1>", status)


# --- Shared helpers ---

def _first(params, key, default=""):
    """Return the first value stored under ``key`` in a parsed form/query dict.

    Values are lists (one entry per occurrence of the field); a missing key
    or an empty list yields ``default``.
    """
    values = params.get(key) or [default]
    return values[0]


def _is_http_url(url):
    """Return True when ``url`` is a string starting with http:// or https://."""
    return isinstance(url, str) and url.startswith(_URL_SCHEMES)


def _known_urls(db):
    """Return the set of URLs already present in the ``pages`` table."""
    return {row["url"] for row in db.execute("SELECT url FROM pages").fetchall()}


def _get_subscription(sub_id):
    """Return the subscription row with ``sub_id``, or None if there is none."""
    with closing(get_db()) as db:
        return db.execute(
            "SELECT * FROM subscriptions WHERE id = ?", (sub_id,)
        ).fetchone()


class _SharingDisabled(Exception):
    """The remote instance answered /api/sites with 403."""


def _fetch_remote_index(base_url):
    """Fetch ``{base_url}/api/sites`` and return ``(name, sites)``.

    The payload comes from another machine, so it is validated rather than
    trusted: ``name`` is a string ("" when absent or not a string) and
    ``sites`` is a list of dicts with string ``url``/``title``/``note`` keys.
    Entries that are not dicts, or whose URL is not http(s), are dropped; a
    ``javascript:`` URL must never be stored because these URLs are rendered
    as links.

    Raises:
        _SharingDisabled: the remote answered 403.
        requests.RequestException: network or HTTP failure.
        ValueError: the body is not JSON of the expected shape.
    """
    resp = requests.get(f"{base_url}/api/sites", timeout=_REMOTE_TIMEOUT)
    if resp.status_code == 403:
        raise _SharingDisabled(base_url)
    resp.raise_for_status()
    data = resp.json()
    if not isinstance(data, dict):
        raise ValueError("unexpected response format")
    raw_sites = data.get("sites", [])
    if not isinstance(raw_sites, list):
        raise ValueError("unexpected response format")

    sites = []
    for entry in raw_sites:
        if not isinstance(entry, dict):
            continue
        url = entry.get("url")
        if not _is_http_url(url):
            continue
        sites.append({
            "url": url,
            "title": str(entry.get("title") or url),
            "note": str(entry.get("note") or ""),
        })
    name = data.get("name")
    return (name if isinstance(name, str) else ""), sites


def _try_fetch_remote_index(base_url, failure_prefix):
    """Fetch a remote index and turn failures into a user-facing message.

    Returns ``((name, sites), "")`` on success or ``(None, message_html)`` on
    failure; ``message_html`` is already escaped.
    """
    try:
        return _fetch_remote_index(base_url), ""
    except _SharingDisabled:
        return None, "That instance has sharing disabled."
    except (requests.RequestException, ValueError) as exc:
        return None, f"{failure_prefix}: {esc(str(exc))}"


def _remote_label(sub, remote_name):
    """Pick a display name for a remote: its own name, the stored one, or its URL."""
    return remote_name or sub["name"] or sub["url"]


def _store_remote_sites(db, sub, remote_name, sites):
    """Insert unseen remote sites as placeholder pages and stamp the subscription.

    ``sites`` must already be validated (see ``_fetch_remote_index``).
    Commits before returning and returns the number of pages inserted.
    """
    label = _remote_label(sub, remote_name)
    known = _known_urls(db)
    added = 0
    for site in sites:
        if site["url"] in known:
            continue
        try:
            db.execute(
                "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?)",
                (site["url"], site["title"], f"[synced from {label}]", site["note"]),
            )
        except sqlite3.Error:
            # Best effort: one bad row must not abort the whole sync.
            continue
        known.add(site["url"])
        added += 1
    stamp = datetime.now().strftime(_SYNC_STAMP_FORMAT)
    db.execute(
        "UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?",
        (stamp, remote_name or sub["name"], sub["id"]),
    )
    db.commit()
    return added


# --- Route handlers ---

def _trusted_links(db, q, indexed_urls):
    """Return up to ``_TRUSTED_LIMIT`` outgoing links whose label matches ``q``.

    A link matches when any whitespace-separated word of ``q`` occurs in its
    label (case-insensitive).  URLs in ``indexed_urls`` and repeated URLs
    are skipped.
    """
    words = q.lower().split()
    links = db.execute(
        "SELECT l.url, l.label, p.title AS source_title "
        "FROM links l JOIN pages p ON l.page_id = p.id",
    ).fetchall()
    seen = set()
    matches = []
    for link in links:
        url = link["url"]
        if url in indexed_urls or url in seen:
            continue
        label = (link["label"] or "").lower()  # label may be NULL
        if any(word in label for word in words):
            seen.add(url)
            matches.append(link)
            if len(matches) >= _TRUSTED_LIMIT:
                break
    return matches


def _render_result(row, q):
    """Render one search hit as a ``.result`` block."""
    note_html = f'<p class="note">{esc(row["note"])}</p>' if row["note"] else ""
    url = esc(row["url"])
    return (
        '<div class="result">'
        f'<a href="{url}">{esc(row["title"])}</a><br>'
        f'<small>{url}</small><br>'
        f'{esc(snippet(row["body"], q))}'
        f'{note_html}'
        '</div>'
    )


def handle_search(query):
    """Render the home page: search box, index size and, if ``q`` is set, results."""
    q = _first(query, "q").strip()
    name = get_site_name()
    result_html = ""
    trusted_html = ""

    with closing(get_db()) as db:
        count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        if q:
            try:
                rows = db.execute(
                    "SELECT p.id, p.url, p.title, p.body, p.note "
                    "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
                    "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
                    (q,),
                ).fetchall()
            except sqlite3.OperationalError:
                # MATCH parses the text as a full-text query expression, so
                # input such as an unbalanced quote is a syntax error.
                rows = None

            if rows is None:
                result_html = (
                    "<p>Could not understand that search. "
                    "Try removing quotes or special characters.</p>"
                )
            elif rows:
                result_html = "".join(_render_result(r, q) for r in rows)
            else:
                result_html = "<p>No results in your index.</p>"

            # search all linked pages from trusted sites
            indexed_urls = {r["url"] for r in rows or ()}
            trusted = _trusted_links(db, q, indexed_urls)
            if trusted:
                items = "".join(
                    f'<li><a href="{esc(l["url"])}">{esc(l["label"])}</a> '
                    f'— from {esc(l["source_title"])}</li>'
                    for l in trusted
                )
                trusted_html = (
                    '<details class="trusted">'
                    f'<summary>from your trusted sites ({len(trusted)})</summary>'
                    f'<ul>{items}</ul>'
                    '</details>'
                )

    return _respond(
        f'<h1>{esc(name)}</h1>'
        '<form method="get" action="/">'
        f'<input type="text" name="q" value="{esc(q)}" autofocus>'
        ' <button type="submit">search</button>'
        '</form>'
        f'<p>{count} page(s) indexed.'
        ' <a href="/add">+ add url</a>'
        ' | <a href="/pages">browse</a>'
        ' | <a href="/subscriptions">subscriptions</a>'
        ' | <a href="/style">customize</a></p>'
        f'<hr>{result_html}{trusted_html}'
    )


def handle_add_form(msg=""):
    """Render the "add url" form.

    ``msg`` is inserted as HTML; callers must escape any untrusted part.
    """
    return _respond(
        "<h1>add url</h1>"
        '<form method="post" action="/add">'
        '<p><input type="text" name="url" placeholder="https://..." size="60"></p>'
        '<p><input type="text" name="note" placeholder="note (optional)" size="60"></p>'
        '<button type="submit">add</button>'
        "</form>"
        f"<p>{msg}</p>"
        '<a href="/">back</a>'
    )


def handle_add_submit(body):
    """Index the submitted URL and re-render the add form with the outcome."""
    url = _first(body, "url").strip()
    note = _first(body, "note").strip()
    if not url:
        return handle_add_form("URL is required.")
    if not _is_http_url(url):
        return handle_add_form("URL must start with http:// or https://")
    try:
        title = index_url(url, note)
    except Exception as e:  # index_url is opaque here; report whatever it raises
        return handle_add_form(f"Error: {esc(str(e))}")
    return handle_add_form(f'Indexed: {esc(title)}')


def handle_pages():
    """List every indexed page, newest first, with edit/remove links."""
    with closing(get_db()) as db:
        rows = db.execute(
            "SELECT id, url, title, note FROM pages ORDER BY id DESC"
        ).fetchall()

    items = []
    for r in rows:
        note_html = f' — <span class="note">{esc(r["note"])}</span>' if r["note"] else ""
        url = esc(r["url"])
        items.append(
            f'<li><a href="{url}">{esc(r["title"])}</a>{note_html} '
            f'<small>({url})</small> '
            f'<a href="/edit/{r["id"]}">edit</a> '
            f'<a href="/delete/{r["id"]}">remove</a></li>'
        )
    return _respond(
        f"<h1>indexed pages ({len(rows)})</h1>"
        f"<ul>{''.join(items)}</ul>"
        '<p><a href="/export">export</a> | <a href="/import">import</a></p>'
        '<a href="/">back</a>'
    )


def handle_edit_form(page_id, msg=""):
    """Render the note editor for one page; 404 when the page does not exist."""
    with closing(get_db()) as db:
        row = db.execute(
            "SELECT id, url, title, note FROM pages WHERE id = ?", (page_id,)
        ).fetchone()
    if not row:
        return _error(404)
    note = esc(row["note"] or "")
    return _respond(
        "<h1>edit note</h1>"
        f"<p>{esc(row['title'])}<br>"
        f"<small>{esc(row['url'])}</small></p>"
        f'<form method="post" action="/edit/{row["id"]}">'
        f'<p><input type="text" name="note" value="{note}" size="60"></p>'
        '<button type="submit">save</button>'
        "</form>"
        f"<p>{msg}</p>"
        '<a href="/pages">back</a>'
    )


def handle_edit_submit(page_id, body):
    """Store the submitted note for ``page_id`` and go back to the page list."""
    note = _first(body, "note").strip()
    with closing(get_db()) as db:
        db.execute("UPDATE pages SET note = ? WHERE id = ?", (note, page_id))
        db.commit()
    return _redirect("/pages")


def handle_delete(page_id):
    """Delete a page and its recorded links, then go back to the page list."""
    # NOTE(review): the dispatcher routes this on GET, so a crawler or a
    # prefetching browser can trigger deletions; consider moving it to POST.
    with closing(get_db()) as db:
        db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
        db.execute("DELETE FROM pages WHERE id = ?", (page_id,))
        db.commit()
    return _redirect("/pages")


def handle_bookmark(query):
    """Index the URL given in ``?url=`` and answer in plain text.

    The response carries ``Access-Control-Allow-Origin: *`` so it can be read
    from any page.  Failures are reported in the body ("error: ...") with
    status 200.
    """
    url = _first(query, "url").strip()
    if not url or not _is_http_url(url):
        return _text_response("error: invalid url", headers=dict(_CORS_ANY_ORIGIN))
    try:
        title = index_url(url)
        msg = f"ok: {title}"
    except Exception as e:  # index_url is opaque here; report whatever it raises
        msg = f"error: {e}"
    return _text_response(msg, headers=dict(_CORS_ANY_ORIGIN))


def handle_export():
    """Return every page's url/title/note as a downloadable JSON array."""
    with closing(get_db()) as db:
        rows = db.execute("SELECT url, title, note FROM pages ORDER BY id").fetchall()
    data = [{"url": r["url"], "title": r["title"], "note": r["note"]} for r in rows]
    return _json_response(
        data,
        headers={"Content-Disposition": "attachment; filename=tinyweb-export.json"},
    )


def handle_import_form(msg=""):
    """Render the import form.  ``msg`` is inserted as HTML."""
    return _respond(
        "<h1>import</h1>"
        "<p>Paste the contents of a tinyweb export file (JSON).</p>"
        '<form method="post" action="/import">'
        '<p><textarea name="data" rows="12" cols="60"></textarea></p>'
        '<button type="submit">import</button>'
        "</form>"
        f"<p>{msg}</p>"
        '<a href="/pages">back</a>'
    )


def handle_import_submit(body):
    """Index every entry of a pasted export file and report the totals.

    Entries without a URL are skipped silently.  Entries that are not
    objects, or whose URL is not http(s), count as errors.
    """
    raw = _first(body, "data").strip()
    if not raw:
        return handle_import_form("Paste JSON data.")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return handle_import_form("Invalid JSON.")
    if not isinstance(data, list):
        return handle_import_form("Expected a JSON array.")

    imported = 0
    errors = 0
    for entry in data:
        if not isinstance(entry, dict):
            errors += 1
            continue
        # The exporter writes "note": null for pages without a note, so
        # None must be tolerated here.
        url = str(entry.get("url") or "").strip()
        note = str(entry.get("note") or "").strip()
        if not url:
            continue
        if not _is_http_url(url):
            errors += 1
            continue
        try:
            index_url(url, note)
        except Exception:  # index_url is opaque here; count and carry on
            errors += 1
        else:
            imported += 1
    return handle_import_form(f"Imported {imported} page(s). {errors} error(s).")


def handle_style_form(msg="", gateway_host=""):
    """Render the customize page: site name, sharing, custom CSS, bookmarklet.

    ``gateway_host`` is the host the bookmarklet should call back to; it
    falls back to ``localhost:8080`` when empty.  ``msg`` is inserted as HTML.
    """
    css = get_setting("custom_css")
    name = get_site_name()
    sharing = get_setting("sharing_enabled", "0")
    checked = " checked" if sharing == "1" else ""
    host = gateway_host or "localhost:8080"
    # json.dumps yields a safely quoted JS string literal; esc() then makes
    # the whole script safe inside the href attribute.
    endpoint = json.dumps(f"http://{host}/bookmark?url=")
    bookmarklet = (
        f"javascript:void(fetch({endpoint}+encodeURIComponent(location.href))"
        ".then(function(r){return r.text()}).then(alert))"
    )
    return _respond(
        "<h1>customize</h1>"
        "<h2>name your search engine</h2>"
        '<form method="post" action="/style">'
        f'<p><input type="text" name="site_name" value="{esc(name)}"></p>'
        "<h2>sharing</h2>"
        f'<p><label><input type="checkbox" name="sharing_enabled" value="1"{checked}>'
        " let other instances subscribe to your index</label></p>"
        "<h2>custom css</h2>"
        "<p>Some classes you can target:</p>"
        "<pre>"
        "body          - page background, font\n"
        "h1            - page titles\n"
        "input, button - search bar\n"
        "a             - links\n"
        ".result       - each search result\n"
        ".note         - your notes on results\n"
        ".trusted      - trusted sites dropdown\n"
        "small         - url text\n"
        "ul, li        - browse page list"
        "</pre>"
        f'<p><textarea name="css" rows="12" cols="60">{esc(css or "")}</textarea></p>'
        '<button type="submit">save</button>'
        "</form>"
        "<h2>bookmarklet</h2>"
        "<p>Drag this link to your bookmarks bar. "
        "Click it on any page to index it instantly.</p>"
        f'<p><a href="{esc(bookmarklet)}">+ save to {esc(name)}</a></p>'
        f"<p>{msg}</p>"
        '<a href="/">back</a>'
    )


def handle_style_submit(body, gateway_host=""):
    """Save the customize form and re-render it.

    ``gateway_host`` is forwarded to the form so the bookmarklet keeps
    pointing at the real host after saving.
    """
    css = _first(body, "css")
    name = _first(body, "site_name", "tinyweb").strip()
    sharing = "1" if body.get("sharing_enabled") else "0"
    set_setting("custom_css", css)
    set_setting("site_name", name or "tinyweb")
    set_setting("sharing_enabled", sharing)
    return handle_style_form("Saved.", gateway_host=gateway_host)


def handle_api_sites():
    """Serve the index as JSON for other instances; 403 unless sharing is on."""
    if get_setting("sharing_enabled", "0") != "1":
        return _json_response(
            {"error": "sharing disabled"},
            status=403,
            headers=dict(_CORS_ANY_ORIGIN),
        )
    with closing(get_db()) as db:
        rows = db.execute(
            "SELECT url, title, note FROM pages ORDER BY id DESC"
        ).fetchall()
    data = {
        "name": get_site_name(),
        "sites": [{"url": r["url"], "title": r["title"], "note": r["note"]} for r in rows],
    }
    return _json_response(data, headers=dict(_CORS_ANY_ORIGIN))


def handle_subscriptions(msg=""):
    """Render the subscriptions page.  ``msg`` is inserted as HTML."""
    with closing(get_db()) as db:
        subs = db.execute("SELECT * FROM subscriptions ORDER BY id DESC").fetchall()

    rows_html = []
    for s in subs:
        sid = s["id"]
        auto_label = "on" if s["auto_sync"] else "off"
        last = s["last_sync"] or "never"
        rows_html.append(
            '<tr>'
            f'<td>{esc(s["name"] or "unknown")}<br><small>{esc(s["url"])}</small></td>'
            f'<td>{esc(last)}</td>'
            '<td>'
            f'<form method="post" action="/subscriptions/autosync/{sid}">'
            f'<button type="submit">{auto_label}</button></form>'
            '</td>'
            '<td>'
            f'<a href="/subscriptions/browse/{sid}">browse</a> '
            f'<form method="post" action="/subscriptions/sync/{sid}">'
            '<button type="submit">sync</button></form>'
            f'<form method="post" action="/subscriptions/delete/{sid}">'
            '<button type="submit">delete</button></form>'
            '</td>'
            '</tr>'
        )

    table = ""
    if subs:
        table = (
            '<table>'
            '<tr><th>instance</th><th>last sync</th><th>auto-sync</th><th>actions</th></tr>'
            f'{"".join(rows_html)}'
            '</table>'
            '<form method="post" action="/subscriptions/syncall">'
            '<button type="submit">sync all</button></form>'
        )
    return _respond(
        "<h1>subscriptions</h1>"
        '<form method="post" action="/subscriptions/add">'
        '<input type="text" name="url" placeholder="https://..." size="40"> '
        '<button type="submit">subscribe</button>'
        '</form>'
        f'<p>{msg}</p>'
        f'{table}'
        '<p><a href="/">back</a></p>'
    )


def handle_subscription_add(body):
    """Subscribe to another instance after checking that its /api/sites answers."""
    url = _first(body, "url").strip().rstrip("/")
    if not url or not _is_http_url(url):
        return handle_subscriptions("URL must start with http:// or https://")
    remote, error = _try_fetch_remote_index(url, "Could not reach that instance")
    if error:
        return handle_subscriptions(error)
    name = remote[0]
    with closing(get_db()) as db:
        db.execute(
            "INSERT INTO subscriptions (url, name) VALUES (?, ?) "
            "ON CONFLICT(url) DO UPDATE SET name=excluded.name",
            (url, name),
        )
        db.commit()
    return handle_subscriptions(f"Subscribed to {esc(name or url)}.")


def handle_subscription_browse(sub_id):
    """List a subscription's sites, split into new and already-indexed ones."""
    with closing(get_db()) as db:
        sub = db.execute(
            "SELECT * FROM subscriptions WHERE id = ?", (sub_id,)
        ).fetchone()
        if not sub:
            return _error(404)
        local_urls = _known_urls(db)

    remote, error = _try_fetch_remote_index(sub["url"], "Could not fetch sites")
    if error:
        return handle_subscriptions(error)
    sites = remote[1]

    new_items = []
    existing_items = []
    for s in sites:
        url = esc(s["url"])
        title = esc(s["title"])
        if s["url"] in local_urls:
            existing_items.append(
                f'<li><a href="{url}">{title}</a> '
                f'<small>({url})</small> — already indexed</li>'
            )
        else:
            note_html = f' — {esc(s["note"])}' if s["note"] else ""
            new_items.append(
                f'<li><label><input type="checkbox" name="urls" value="{url}"> '
                f'{title} <small>({url})</small>{note_html}</label></li>'
            )
    new_count = len(new_items)

    buttons = ""
    if new_count:
        buttons = (
            '<button type="submit">import selected</button> '
            '<button type="submit" name="import_all" value="1">import all</button>'
        )
    return _respond(
        f'<h1>browsing: {esc(sub["name"] or sub["url"])}</h1>'
        f'<p>{len(sites)} site(s) available, {new_count} new</p>'
        '<form method="post" action="/subscriptions/pick">'
        f'<input type="hidden" name="sub_id" value="{sub["id"]}">'
        f'<ul>{"".join(new_items)}</ul>'
        f'{buttons}'
        '</form>'
        f'<h2>already indexed</h2><ul>{"".join(existing_items)}</ul>'
        '<a href="/subscriptions">back</a>'
    )


def handle_subscription_pick(body):
    """Index the sites ticked on the browse page, or all new ones.

    With ``import_all`` set, the remote list is fetched again and every site
    not yet indexed locally is imported; otherwise the posted ``urls`` are
    used.  Only http(s) URLs are indexed.
    """
    sub_id = _first(body, "sub_id")
    if _first(body, "import_all"):
        sub = _get_subscription(sub_id)
        if not sub:
            return handle_subscriptions("Subscription not found.")
        remote, error = _try_fetch_remote_index(sub["url"], "Error")
        if error:
            return handle_subscriptions(error)
        with closing(get_db()) as db:
            local_urls = _known_urls(db)
        urls = [s["url"] for s in remote[1] if s["url"] not in local_urls]
    else:
        urls = [u for u in (body.get("urls") or []) if _is_http_url(u)]

    if not urls:
        return handle_subscriptions("No sites selected.")

    imported = 0
    errors = 0
    for url in urls:
        try:
            index_url(url)
        except Exception:  # index_url is opaque here; count and carry on
            errors += 1
        else:
            imported += 1
    return handle_subscriptions(f"Imported {imported} page(s). {errors} error(s).")


def handle_subscription_sync(sub_id):
    """Pull one subscription's site list and record the new sites as pages."""
    sub = _get_subscription(sub_id)
    if not sub:
        return handle_subscriptions("Subscription not found.")
    remote, error = _try_fetch_remote_index(sub["url"], "Could not sync")
    if error:
        return handle_subscriptions(error)
    remote_name, sites = remote
    with closing(get_db()) as db:
        synced = _store_remote_sites(db, sub, remote_name, sites)
    label = _remote_label(sub, remote_name)
    return handle_subscriptions(f"Synced {synced} new site(s) from {esc(label)}.")


def handle_subscription_autosync(sub_id):
    """Flip the auto_sync flag of a subscription."""
    with closing(get_db()) as db:
        db.execute(
            "UPDATE subscriptions SET auto_sync = 1 - auto_sync WHERE id = ?",
            (sub_id,),
        )
        db.commit()
    return _redirect("/subscriptions")


def handle_subscription_delete(sub_id):
    """Remove a subscription."""
    with closing(get_db()) as db:
        db.execute("DELETE FROM subscriptions WHERE id = ?", (sub_id,))
        db.commit()
    return _redirect("/subscriptions")


def handle_subscription_syncall():
    """Sync every subscription that has auto-sync enabled.

    A subscription that cannot be fetched or stored is skipped so the rest
    still sync; skipped ones are reported in the message.
    """
    with closing(get_db()) as db:
        subs = db.execute("SELECT * FROM subscriptions WHERE auto_sync = 1").fetchall()
    if not subs:
        return handle_subscriptions("No subscriptions have auto-sync enabled.")

    total = 0
    failed = 0
    for sub in subs:
        remote, error = _try_fetch_remote_index(sub["url"], "Could not sync")
        if error:
            failed += 1
            continue
        remote_name, sites = remote
        try:
            with closing(get_db()) as db:
                _store_remote_sites(db, sub, remote_name, sites)
        except sqlite3.Error:
            failed += 1
            continue
        total += 1

    message = f"Synced {total} subscription(s)."
    if failed:
        message += f" {failed} failed."
    return handle_subscriptions(message)


# --- Dispatcher ---

def _extract_id(path, prefix):
    """Return the integer that follows ``prefix`` in ``path``, or None."""
    try:
        return int(path[len(prefix):])
    except ValueError:
        return None


def dispatch_request(data):
    """Route a request dict to its handler and return the response dict.

    ``data`` may carry ``method`` (default "GET"), ``path`` (default "/"),
    ``query`` and ``body`` (dicts mapping field name to a list of values)
    and ``gateway_host``.  Unknown routes give 404; an id route whose id is
    not an integer gives 400.
    """
    method = data.get("method", "GET")
    path = data.get("path", "/")
    query = data.get("query") or {}
    body = data.get("body") or {}
    gateway_host = data.get("gateway_host", "")

    exact_routes = {
        ("GET", "/"): lambda: handle_search(query),
        ("GET", "/add"): handle_add_form,
        ("GET", "/pages"): handle_pages,
        ("GET", "/bookmark"): lambda: handle_bookmark(query),
        ("GET", "/style"): lambda: handle_style_form(gateway_host=gateway_host),
        ("GET", "/export"): handle_export,
        ("GET", "/import"): handle_import_form,
        ("GET", "/api/sites"): handle_api_sites,
        ("GET", "/subscriptions"): handle_subscriptions,
        ("POST", "/add"): lambda: handle_add_submit(body),
        ("POST", "/style"): lambda: handle_style_submit(body, gateway_host=gateway_host),
        ("POST", "/import"): lambda: handle_import_submit(body),
        ("POST", "/subscriptions/add"): lambda: handle_subscription_add(body),
        ("POST", "/subscriptions/pick"): lambda: handle_subscription_pick(body),
        ("POST", "/subscriptions/syncall"): handle_subscription_syncall,
    }
    handler = exact_routes.get((method, path))
    if handler is not None:
        return handler()

    # Routes of the form <prefix><integer id>.
    id_routes = (
        ("GET", "/edit/", handle_edit_form),
        ("GET", "/delete/", handle_delete),
        ("GET", "/subscriptions/browse/", handle_subscription_browse),
        ("POST", "/edit/", lambda pid: handle_edit_submit(pid, body)),
        ("POST", "/subscriptions/sync/", handle_subscription_sync),
        ("POST", "/subscriptions/autosync/", handle_subscription_autosync),
        ("POST", "/subscriptions/delete/", handle_subscription_delete),
    )
    for route_method, prefix, id_handler in id_routes:
        if method == route_method and path.startswith(prefix):
            ident = _extract_id(path, prefix)
            return id_handler(ident) if ident is not None else _error(400)

    return _error(404)