import json
import secrets
import threading
from datetime import datetime
from urllib.parse import unquote
from db import get_db, get_setting, set_setting, get_site_name, index_url, clean_url
from templates import esc, snippet, wrap_page, DEFAULT_TEMPLATE
from rns_client import fetch_remote_sites
# Thread-local holder for request-scoped values. _get_csrf_token() reads its
# `csrf_token` attribute; the code that assigns that attribute is not visible
# in this part of the file.
_request_local = threading.local()
def _get_csrf_token():
    """Return the CSRF token stored on the thread-local, or '' if unset."""
    try:
        return _request_local.csrf_token
    except AttributeError:
        # No token has been stored for the current thread.
        return ''
def _csrf_field():
    """Return a hidden form input carrying the current CSRF token.

    The input is named ``_csrf`` because that is the key ``_check_csrf``
    looks up in the submitted form body.  The token is passed through
    ``esc`` so the attribute value is always well-formed HTML.

    Returns:
        An HTML fragment (str) to embed inside a ``<form>``.  When no token
        is set for the current thread the value attribute is empty, which
        ``_check_csrf`` rejects.
    """
    token = esc(_get_csrf_token())
    return f'<input type="hidden" name="_csrf" value="{token}">'
def _check_csrf(body):
    """Tell whether the submitted form carries the expected CSRF token.

    Args:
        body: mapping of form field name to a list of values; the token is
            read from the first value of the ``_csrf`` field.

    Returns:
        True only when both the submitted and the expected token are
        non-empty and equal; False otherwise.
    """
    token = body.get("_csrf", [""])[0]
    expected = _get_csrf_token()
    if not expected or not token:
        return False
    # compare_digest() raises TypeError when a str argument contains
    # non-ASCII characters. The submitted token is untrusted input, so
    # compare the UTF-8 bytes instead: a bogus token is then simply rejected
    # rather than turning into an unhandled exception.
    return secrets.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))
def _sanitize_fts_query(query):
"""Escape user input for safe use in FTS5 MATCH."""
escaped = query.replace('"', '""')
return f'"{escaped}"'
def _get_bookmark_token():
    """Return the stored ``bookmark_token`` setting, creating it if missing.

    When the setting is empty a new 32-hex-character token is generated,
    saved under ``bookmark_token`` and returned.
    """
    stored = get_setting("bookmark_token")
    if stored:
        return stored
    fresh = secrets.token_hex(16)
    set_setting("bookmark_token", fresh)
    return fresh
def _respond(body_html, status=200, use_default=False):
    """Build an HTML response dict whose body is *body_html* run through wrap_page().

    Args:
        body_html: HTML fragment handed to ``wrap_page``.
        status: HTTP status code placed in the response.
        use_default: forwarded to ``wrap_page`` as its ``use_default`` keyword.

    Returns:
        dict with ``status``, ``content_type``, ``body`` and empty ``headers``.
    """
    page = wrap_page(body_html, use_default=use_default)
    return dict(
        status=status,
        content_type="text/html; charset=utf-8",
        body=page,
        headers={},
    )
def _redirect(location):
if not location.startswith("/") or location.startswith("//"):
location = "/"
return {
"status": 302,
"content_type": "text/html; charset=utf-8",
"body": "",
"headers": {"Location": location},
}
def _json_response(data, status=200, headers=None):
return {
"status": status,
"content_type": "application/json",
"body": json.dumps(data, indent=2),
"headers": headers or {},
}
def _text_response(text, status=200, headers=None):
return {
"status": status,
"content_type": "text/plain",
"body": text,
"headers": headers or {},
}
# Build an error page for `status`: the status code is interpolated into the
# page body passed to _respond() and is also used as the response status.
# NOTE(review): the string literal below spans several lines without triple
# quotes; markup around {status} appears to be missing - confirm against the
# intended template.
def _error(status):
return _respond(f"
{status}
", status)
# --- Tag helpers ---
def _get_page_tags(page_id, db=None):
close = False
if db is None:
db = get_db()
close = True
rows = db.execute(
"SELECT t.name FROM tags t JOIN page_tags pt ON t.id = pt.tag_id "
"WHERE pt.page_id = ? ORDER BY t.name", (page_id,)
).fetchall()
if close:
db.close()
return [r["name"] for r in rows]
def _set_page_tags(page_id, tag_string, db=None):
close = False
if db is None:
db = get_db()
close = True
db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,))
for name in (t.strip().lower() for t in tag_string.split(",") if t.strip()):
db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,))
tag_id = db.execute("SELECT id FROM tags WHERE name = ?", (name,)).fetchone()["id"]
db.execute("INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)", (page_id, tag_id))
if close:
db.commit()
db.close()
# --- Route handlers ---
def handle_search(query):
q = query.get("q", [""])[0].strip()
db = get_db()
try:
count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
name = get_site_name()
result_html = ""
trusted_html = ""
if q:
try:
rows = db.execute(
"SELECT p.id, p.url, p.title, p.body, p.note "
"FROM pages_fts f JOIN pages p ON f.rowid = p.id "
"WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
(_sanitize_fts_query(q),),
).fetchall()
except Exception:
rows = []
if rows:
for r in rows:
note_html = ""
if r["note"]:
note_html = f'
{esc(r["note"])}
'
tags = _get_page_tags(r["id"], db)
tags_html = ""
if tags:
tag_links = " ".join(f'[{esc(t)}]' for t in tags)
tags_html = f'
{tag_links}
'
result_html += (
f'
'
f'{esc(r["title"])} '
f'{esc(r["url"])} '
f'{esc(snippet(r["body"], q))}'
f'{note_html}{tags_html}'
f'
'
)
else:
result_html = "
No results in your index.
"
# search all linked pages from trusted sites
words = q.lower().split()
all_links = db.execute(
"SELECT l.url, l.label, p.title AS source_title "
"FROM links l JOIN pages p ON l.page_id = p.id",
).fetchall()
indexed_urls = set(r["url"] for r in rows) if rows else set()
seen = set()
trusted = []
for l in all_links:
if l["url"] in indexed_urls or l["url"] in seen:
continue
if any(w in l["label"].lower() for w in words):
seen.add(l["url"])
trusted.append(l)
if len(trusted) >= 20:
break
if trusted:
items = ""
for l in trusted:
items += (
f'
'
)
trusted_html = (
f''
f'from your trusted sites ({len(trusted)})'
f'
{items}
'
f''
)
# search synced pages from subscriptions
try:
remote_rows = db.execute(
"SELECT rp.url, rp.title, rp.note, s.name AS source_name "
"FROM remote_pages_fts rpf "
"JOIN remote_pages rp ON rpf.rowid = rp.id "
"JOIN subscriptions s ON rp.subscription_id = s.id "
"WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
(_sanitize_fts_query(q),),
).fetchall()
except Exception:
remote_rows = []
remote_html = ""
if q and remote_rows:
# group by source
by_source = {}
for r in remote_rows:
source = r["source_name"] or "unknown"
by_source.setdefault(source, []).append(r)
for source, items in by_source.items():
source_items = ""
for r in items:
note_html = f' — {esc(r["note"])}' if r["note"] else ""
source_items += (
f'
"
f'back'
)
def handle_add_submit(body):
    """Handle the add-page form: validate the URL, index it and apply tags.

    Args:
        body: mapping of form field name to a list of values; the ``url``,
            ``note`` and ``tags`` fields are read.

    Returns:
        The result of ``handle_add_form`` called with a status message:
        a validation error, the indexed title, or an error description.
    """
    def field(key):
        # First submitted value for the field, stripped of whitespace.
        return body.get(key, [""])[0].strip()

    url = clean_url(field("url"))
    note = field("note")
    tags = field("tags")

    if not url:
        return handle_add_form("URL is required.")
    if not (url.startswith("http://") or url.startswith("https://")):
        return handle_add_form("URL must start with http:// or https://")

    try:
        title = index_url(url, note)
        if tags:
            conn = get_db()
            try:
                page = conn.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()
                if page:
                    _set_page_tags(page["id"], tags, conn)
                    conn.commit()
            finally:
                conn.close()
        return handle_add_form(f'Indexed: {esc(title)}')
    except ValueError as exc:
        # ValueError messages are shown to the user (escaped).
        return handle_add_form(f"Error: {esc(str(exc))}")
    except Exception:
        return handle_add_form("Error: could not fetch or index that URL.")
def handle_pages():
db = get_db()
try:
rows = db.execute("SELECT id, url, title, note FROM pages ORDER BY id DESC").fetchall()
items = ""
for r in rows:
note_html = f' — {esc(r["note"])}' if r["note"] else ""
tags = _get_page_tags(r["id"], db)
tags_html = ""
if tags:
tag_links = " ".join(f'[{esc(t)}]' for t in tags)
tags_html = f' {tag_links}'
items += (
f'
"
f'back',
use_default=True,
)
def handle_style_submit(body):
    """Save the template, site name and sharing flag from the style form.

    Args:
        body: mapping of form field name to a list of values; the
            ``template``, ``site_name`` and ``sharing_enabled`` fields are read.

    Returns:
        The result of ``handle_style_form("Saved.")``.
    """
    submitted_template = body.get("template", [""])[0]
    site_name = body.get("site_name", ["tinyweb"])[0].strip()

    if submitted_template.strip() == DEFAULT_TEMPLATE.strip():
        # A template equal to the default is stored as an empty string.
        custom_template = ""
    else:
        custom_template = submitted_template

    set_setting("custom_template", custom_template)
    set_setting("site_name", site_name if site_name else "tinyweb")
    set_setting("sharing_enabled", "1" if body.get("sharing_enabled") else "0")
    return handle_style_form("Saved.")
# Render the "about" page: the site name, counts of indexed pages, tags in
# use and subscriptions, a line stating whether sharing is enabled, optional
# subscribe instructions showing the destination hash, and static text
# describing the project. The page is returned through _respond().
# NOTE(review): the string literals below span several lines without triple
# quotes; the markup that delimited them appears to be missing from this
# copy - confirm against the intended template.
def handle_about():
name = get_site_name()
dest_hash = get_setting("dest_hash")
sharing = get_setting("sharing_enabled", "0") == "1"
# Collect the three counters shown on the page; the connection is closed
# whether or not a query fails.
db = get_db()
try:
page_count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
tag_count = db.execute("SELECT count(DISTINCT tag_id) FROM page_tags").fetchone()[0]
sub_count = db.execute("SELECT count(*) FROM subscriptions").fetchone()[0]
finally:
db.close()
sharing_html = (
'
This instance shares its index publicly. Subscribe to join the network.
'
if sharing else
'
This instance is private.
'
)
# The subscribe section is only produced when a destination hash is stored.
hash_html = ""
if dest_hash:
hash_html = (
f'
subscribe
'
f'
To subscribe to this instance, add this destination hash in your TinyWeb:
'
f'
{esc(dest_hash)}
'
)
return _respond(
f'
{esc(name)}
'
f'
A personal search engine, built for the slow web.
'
f'
TinyWeb is about taking back the internet. No algorithms, no ads, no tracking. '
f'Just human-curated pages shared freely across a mesh network.
'
f'
'
f'
{page_count} page(s) indexed
'
f'
{tag_count} tag(s)
'
f'
{sub_count} subscription(s)
'
f'
'
f'{sharing_html}'
f'{hash_html}'
f'
what is the slow web?
'
f'
The slow web is a movement for intentionality over speed, '
f'human curation over algorithmic feeds, privacy over surveillance, '
f'and community over corporations. Every page in this index was saved by a person '
f'because they found it valuable — not because an algorithm told them to click.
'
f'
how it works
'
f'
'
f'
Save pages you find valuable with the bookmarklet or /add
'
f'
Search your personal index — queries never leave your machine
'
f'
Subscribe to friends over Reticulum — encrypted, decentralized, works without the internet
'
f'
Tag and organize your collection into curated lists
'
)
def handle_tags():
db = get_db()
try:
rows = db.execute(
"SELECT t.name, COUNT(pt.page_id) AS cnt FROM tags t "
"JOIN page_tags pt ON t.id = pt.tag_id "
"GROUP BY t.id ORDER BY t.name"
).fetchall()
finally:
db.close()
items = ""
for r in rows:
items += f'
No tags yet. Add tags when saving or editing pages.
"
f'back'
)
def handle_tag_browse(tag_name):
db = get_db()
try:
rows = db.execute(
"SELECT p.id, p.url, p.title, p.note FROM pages p "
"JOIN page_tags pt ON p.id = pt.page_id "
"JOIN tags t ON t.id = pt.tag_id "
"WHERE t.name = ? ORDER BY p.id DESC",
(tag_name,),
).fetchall()
items = ""
for r in rows:
note_html = f' — {esc(r["note"])}' if r["note"] else ""
tags = _get_page_tags(r["id"], db)
tag_links = " ".join(f'[{esc(t)}]' for t in tags)
items += (
f'
'
f'{table}'
f' back'
)
def handle_subscription_add(body):
    """Subscribe to a remote instance identified by its destination hash.

    The hash is validated, the remote instance is contacted through
    ``fetch_remote_sites`` to confirm it is reachable and to learn its name,
    and the subscription is inserted (or its name refreshed when the hash is
    already subscribed).

    Args:
        body: mapping of form field name to a list of values; the
            ``dest_hash`` field is read. Surrounding whitespace and any
            "<" / ">" characters are removed before validation.

    Returns:
        The result of ``handle_subscriptions`` called with a status message
        describing success or the reason for failure.
    """
    dest_hash = body.get("dest_hash", [""])[0].strip().replace("<", "").replace(">", "")
    if not dest_hash or len(dest_hash) != 32:
        return handle_subscriptions("Enter a valid 32-character destination hash.")
    # Check the characters explicitly rather than parsing with int(..., 16):
    # int() also accepts a sign, a "0x" prefix, underscores, surrounding
    # whitespace and non-ASCII digits, so strings that are not 32 hex digits
    # would pass.
    if any(ch not in "0123456789abcdefABCDEF" for ch in dest_hash):
        return handle_subscriptions("Invalid destination hash (must be hex).")
    try:
        data = fetch_remote_sites(dest_hash)
        name = data.get("name", "")
    except PermissionError:
        return handle_subscriptions("That instance has sharing disabled.")
    except Exception:
        # Any other failure while contacting the instance is reported as
        # unreachable rather than propagated.
        return handle_subscriptions("Could not reach that instance.")
    db = get_db()
    try:
        db.execute(
            "INSERT INTO subscriptions (dest_hash, name) VALUES (?, ?) "
            "ON CONFLICT(dest_hash) DO UPDATE SET name=excluded.name",
            (dest_hash, name),
        )
        db.commit()
    finally:
        db.close()
    return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.")
def handle_subscription_browse(sub_id):
db = get_db()
try:
sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone()
if not sub:
return _error(404)
local_urls = set(r["url"] for r in db.execute("SELECT url FROM pages").fetchall())
# Use locally synced data if available, otherwise fetch live
remote_rows = db.execute(
"SELECT url, title, note, tags FROM remote_pages WHERE subscription_id = ?",
(sub_id,),
).fetchall()
finally:
db.close()
if remote_rows:
sites = []
for r in remote_rows:
tags = [t for t in r["tags"].split(",") if t] if r["tags"] else []
sites.append({"url": r["url"], "title": r["title"], "note": r["note"], "tags": tags})
else:
try:
data = fetch_remote_sites(sub["dest_hash"])
sites = data.get("sites", [])
except PermissionError:
return handle_subscriptions("That instance has sharing disabled.")
except Exception:
return handle_subscriptions("Could not fetch sites from that instance.")
new_items = ""
existing_items = ""
new_count = 0
for s in sites:
if s["url"] in local_urls:
existing_items += (
f'