import json import re import secrets import threading from datetime import datetime from urllib.parse import unquote from db import get_db, return_db, get_setting, set_setting, get_site_name, index_url, clean_url from templates import esc, wrap_page, DEFAULT_TEMPLATE from rns_client import fetch_remote_sites _request_local = threading.local() def _get_csrf_token(): return getattr(_request_local, 'csrf_token', '') def _csrf_field(): return f'' def _check_csrf(body): token = body.get("_csrf", [""])[0] expected = _get_csrf_token() if not expected or not token: return False return secrets.compare_digest(token, expected) _STOPWORDS = frozenset({ "a", "an", "the", "and", "or", "but", "is", "are", "was", "were", "in", "on", "at", "to", "for", "of", "with", "by", "from", "as", "into", "about", "how", "what", "which", "who", "where", "when", "do", "does", "did", "be", "been", "being", "have", "has", "had", "it", "its", "this", "that", "not", "no", "so", "if", "can", "will", "my", "your", "i", "me", "we", "you", "he", "she", "they", }) def _sanitize_fts_query(query): """Escape user input for safe use in FTS5 MATCH. Splits into individual quoted tokens joined by implicit AND, so all words must appear but in any order. Appends * to the last token for prefix matching. Stopwords are dropped to avoid overly strict matching. 
""" words = query.split() if not words: return '""' tokens = [] for i, w in enumerate(words): # Strip FTS5 special characters to prevent injection cleaned = re.sub(r'["\'\(\)\*\+\-\^~]', '', w).strip() if not cleaned: continue if cleaned.lower() in _STOPWORDS: continue if i == len(words) - 1: # Prefix match on the last token for partial word matching tokens.append(f"{cleaned}*") else: tokens.append(f'"{cleaned}"') return " ".join(tokens) if tokens else '""' def _get_bookmark_token(): token = get_setting("bookmark_token") if not token: token = secrets.token_hex(16) set_setting("bookmark_token", token) return token def _respond(body_html, status=200, use_default=False): return { "status": status, "content_type": "text/html; charset=utf-8", "body": wrap_page(body_html, use_default=use_default), "headers": {}, } def _redirect(location): if not location.startswith("/") or location.startswith("//"): location = "/" return { "status": 302, "content_type": "text/html; charset=utf-8", "body": "", "headers": {"Location": location}, } def _json_response(data, status=200, headers=None): return { "status": status, "content_type": "application/json", "body": json.dumps(data, indent=2), "headers": headers or {}, } def _text_response(text, status=200, headers=None): return { "status": status, "content_type": "text/plain", "body": text, "headers": headers or {}, } def _error(status): return _respond(f"
{" | ".join(parts)}
' # --- Tag helpers --- def _get_page_tags(page_id, db=None): close = False if db is None: db = get_db() close = True rows = db.execute( "SELECT t.name FROM tags t JOIN page_tags pt ON t.id = pt.tag_id " "WHERE pt.page_id = ? ORDER BY t.name", (page_id,) ).fetchall() if close: return_db(db) return [r["name"] for r in rows] def _set_page_tags(page_id, tag_string, db=None): close = False if db is None: db = get_db() close = True db.execute("DELETE FROM page_tags WHERE page_id = ?", (page_id,)) for name in (t.strip().lower() for t in tag_string.split(",") if t.strip()): db.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (name,)) tag_id = db.execute("SELECT id FROM tags WHERE name = ?", (name,)).fetchone()["id"] db.execute("INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)", (page_id, tag_id)) if close: db.commit() return_db(db) # --- Route handlers --- def handle_search(query): q = query.get("q", [""])[0].strip() page = _paginate(query) offset = (page - 1) * PER_PAGE db = get_db() try: count = db.execute("SELECT count(*) FROM pages").fetchone()[0] name = get_site_name() result_html = "" trusted_html = "" if q: # BM25 keyword search with column weights: title=10, body=1, url=5, note=3 try: fts_q = _sanitize_fts_query(q) bm25_rows = db.execute( "SELECT p.id, p.url, p.title, p.body, p.note " "FROM pages_fts f JOIN pages p ON f.rowid = p.id " "WHERE pages_fts MATCH ? 
" "ORDER BY bm25(pages_fts, 10.0, 1.0, 5.0, 3.0) LIMIT 100", (fts_q,), ).fetchall() except Exception: bm25_rows = [] # Hybrid search: merge BM25 + semantic via RRF bm25_ids = [r["id"] for r in bm25_rows] chunk_snippets = {} # page_id -> best chunk text if get_setting("semantic_search", "1") == "1": try: from embeddings import hybrid_search use_reranker = get_setting("use_reranker", "1") == "1" fused = hybrid_search(q, bm25_ids, limit=100, db=db, use_reranker=use_reranker) fused_ids = [pid for pid, _ in fused] chunk_snippets = {pid: text for pid, text in fused if text} except Exception: fused_ids = bm25_ids else: fused_ids = bm25_ids total_results = len(fused_ids) page_ids = fused_ids[offset:offset + PER_PAGE] if page_ids: # Fetch rows in fused order placeholders = ",".join("?" * len(page_ids)) all_rows = db.execute( f"SELECT id, url, title, body, note, summary FROM pages WHERE id IN ({placeholders})", page_ids, ).fetchall() row_map = {r["id"]: r for r in all_rows} rows = [row_map[pid] for pid in page_ids if pid in row_map] else: rows = [] if rows: for r in rows: note_html = "" if r["note"]: note_html = f'No results in your index.
" # search all linked pages from trusted sites words = q.lower().split() all_links = db.execute( "SELECT l.url, l.label, p.title AS source_title " "FROM links l JOIN pages p ON l.page_id = p.id", ).fetchall() indexed_urls = set(r["url"] for r in rows) if rows else set() seen = set() trusted = [] for l in all_links: if l["url"] in indexed_urls or l["url"] in seen: continue if any(w in l["label"].lower() for w in words): seen.add(l["url"]) trusted.append(l) if len(trusted) >= 20: break if trusted: items = "" for l in trusted: items += ( f'Subscribe to a friend's TinyWeb instance to sync their index
" f'" f"" f"{msg}
" f'back' ) return _respond( f"Add a site to your index
" f'" f"{msg}
" f'back' f'' ) def handle_add_submit(body): input_type = body.get("input_type", ["url"])[0] url = body.get("url", [""])[0].strip() reticulum_dest = body.get("reticulum_dest", [""])[0].strip().replace("<", "").replace(">", "") note = body.get("note", [""])[0].strip() tags = body.get("tags", [""])[0].strip() if input_type == "url": if not url: return handle_add_form("URL is required.") url = clean_url(url) if not url.startswith(("http://", "https://")): return handle_add_form("URL must start with http:// or https://") else: if not reticulum_dest: return handle_add_form("Reticulum destination hash is required.") if len(reticulum_dest) != 32 or not all(c in "0123456789abcdefABCDEF" for c in reticulum_dest): return handle_add_form("Invalid reticulum destination hash. Must be 32 hex characters.") url = f"reticulum:{reticulum_dest}" try: title = index_url(url, note, reticulum_dest if reticulum_dest else "") if tags: db = get_db() try: row = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone() if row: _set_page_tags(row["id"], tags, db) db.commit() finally: return_db(db) return handle_add_form(f'Indexed: {esc(url)}') except ValueError as e: return handle_add_form(f"Error: {esc(str(e))}") except Exception as e: error_msg = str(e).lower() # Check if it's a block response if "block" in error_msg or "cloudflare" in error_msg or "403" in error_msg: # Show manual entry form for blocked sites return _respond( f"{esc(url)} blocks automated access. " f"You can still save it manually:
" f'" f'back' ) return handle_add_form(f"Error: could not fetch or index that URL. {esc(str(e)[:100])}") def handle_add_manual_submit(body): url = clean_url(body.get("url", [""])[0].strip()) note = body.get("note", [""])[0].strip() tags = body.get("tags", [""])[0].strip() manual_title = body.get("manual_title", [""])[0].strip() manual_desc = body.get("manual_description", [""])[0].strip() if not url: return handle_add_form("URL is required.") if not manual_title or not manual_desc: return handle_add_form("Title and description are required for manual entry.") db = get_db() try: now = __import__("datetime").datetime.now().strftime("%Y-%m-%dT%H:%M:%S") db.execute( "INSERT INTO pages (url, title, body, note, last_modified, summary) VALUES (?, ?, ?, ?, ?, ?) " "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, " "note=excluded.note, last_modified=excluded.last_modified, summary=excluded.summary", (url, manual_title, manual_desc, note, now, manual_desc[:200]), ) # Get the page ID page_id = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()[0] # Add tags if provided if tags: _set_page_tags(page_id, tags, db) db.commit() # Generate embeddings for this page (if semantic search is enabled) if get_setting("semantic_search", "1") == "1": try: from embeddings import store_embeddings # Pass the page_id, title, description, and db connection store_embeddings(page_id, manual_title, manual_desc, db) db.commit() except Exception as e: # Log error but don't fail the whole operation print(f"Error generating embeddings: {e}") return handle_add_form(f'Added manually: {esc(manual_title)}') finally: return_db(db) def handle_pages(query=None): msg = query.get("msg", [""])[0] if query else "" msg_html = f'{esc(msg)}
' if msg else "" page = _paginate(query or {}) offset = (page - 1) * BROWSE_PER_PAGE db = get_db() try: total = db.execute("SELECT count(*) FROM pages").fetchone()[0] rows = db.execute( "SELECT id, url, title, note FROM pages ORDER BY id DESC LIMIT ? OFFSET ?", (BROWSE_PER_PAGE, offset), ).fetchall() items = "" for r in rows: note_html = f' — {esc(r["note"])}' if r["note"] else "" tags = _get_page_tags(r["id"], db) tags_html = "" if tags: tag_links = " ".join(f'[{esc(t)}]' for t in tags) tags_html = f' {tag_links}' items += ( f'{esc(row['title'])}
"
f"{esc(row['url'])}
{msg}
" f'back' )
# NOTE(review): the line above is the tail of a definition whose start lies
# before this span; it is kept exactly as found.


def handle_edit_submit(page_id, body):
    """Apply an edit form to the page with id ``page_id``.

    Reads the first ``title``, ``summary``, ``note`` and ``tags`` values
    from ``body`` (field name -> list of values), updates the row, replaces
    the page's tags in the same transaction and redirects to ``/pages``.
    The connection is returned even when a statement raises.
    """
    title = body.get("title", [""])[0].strip()
    summary = body.get("summary", [""])[0].strip()
    note = body.get("note", [""])[0].strip()
    tags = body.get("tags", [""])[0].strip()
    db = get_db()
    try:
        db.execute(
            "UPDATE pages SET title = ?, summary = ?, note = ? WHERE id = ?",
            (title, summary, note, page_id)
        )
        # Pass the open connection so the tag changes join this transaction.
        _set_page_tags(page_id, tags, db)
        db.commit()
    finally:
        return_db(db)
    return _redirect("/pages")


# NOTE(review): handle_delete_confirm continues past this span; its opening
# statements are reproduced unchanged, ending inside the f-string where the
# line ends.
def handle_delete_confirm(page_id):
    db = get_db()
    try:
        row = db.execute("SELECT id, url, title FROM pages WHERE id = ?", (page_id,)).fetchone()
    finally:
        return_db(db)
    # No such page: answer 404 instead of rendering the confirmation.
    if not row:
        return _error(404)
    return _respond(
        f"Remove {esc(row['title'])}
"
f"{esc(row['url'])}
Paste the contents of a tinyweb export file (JSON).
" f'" f"{msg}
" f'back' ) def handle_import_submit(body): raw = body.get("data", [""])[0].strip() if not raw: return handle_import_form("Paste JSON data.") try: data = json.loads(raw) except json.JSONDecodeError: return handle_import_form("Invalid JSON.") if not isinstance(data, list): return handle_import_form("Expected a JSON array.") MAX_IMPORT = 100 if len(data) > MAX_IMPORT: return handle_import_form(f"Too many entries. Maximum is {MAX_IMPORT}.") imported = 0 errors = 0 for entry in data: url = entry.get("url", "").strip() note = entry.get("note", "").strip() if not url: continue try: index_url(url, note) imported += 1 except Exception: errors += 1 return handle_import_form(f"Imported {imported} page(s). {errors} error(s).") def handle_style_form(msg=""): template = get_setting("custom_template") or DEFAULT_TEMPLATE name = get_site_name() sharing = get_setting("sharing_enabled", "0") checked = " checked" if sharing == "1" else "" semantic = get_setting("semantic_search", "1") semantic_checked = " checked" if semantic == "1" else "" reranker = get_setting("use_reranker", "1") reranker_checked = " checked" if reranker == "1" else "" disabled = "" if semantic == "1" else " disabled" dimmed = ' style="opacity:0.4"' if semantic != "1" else "" return _respond( f"Drag this link to your bookmarks bar. Click it on any page to index it instantly.
" f'' f"{msg}
" f'back', use_default=True, )
# NOTE(review): the line above is the tail of a definition whose start lies
# before this span; it is kept exactly as found.


def handle_style_submit(body):
    """Persist the settings form and re-render it with a "Saved." message.

    ``body`` maps field names to lists of values. Line endings in the
    template are normalized to ``\\n``. A template equal to DEFAULT_TEMPLATE
    (ignoring surrounding whitespace) is stored as an empty string. An empty
    site name falls back to "tinyweb". Each checkbox is stored as "1" when
    its field is present in ``body`` and "0" otherwise.
    """
    template = body.get("template", [""])[0].replace("\r\n", "\n").replace("\r", "\n")
    name = body.get("site_name", ["tinyweb"])[0].strip()
    sharing = "1" if body.get("sharing_enabled") else "0"
    semantic = "1" if body.get("semantic_search") else "0"
    reranker = "1" if body.get("use_reranker") else "0"
    set_setting("custom_template", template if template.strip() != DEFAULT_TEMPLATE.strip() else "")
    set_setting("site_name", name or "tinyweb")
    set_setting("sharing_enabled", sharing)
    set_setting("semantic_search", semantic)
    set_setting("use_reranker", reranker)
    return handle_style_form("Saved.")


# NOTE(review): handle_about continues past this span; its opening statements
# are reproduced unchanged, ending inside the string where the line ends.
def handle_about():
    name = get_site_name()
    dest_hash = get_setting("dest_hash")
    sharing = get_setting("sharing_enabled", "0") == "1"
    db = get_db()
    try:
        page_count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
        tag_count = db.execute("SELECT count(DISTINCT tag_id) FROM page_tags").fetchone()[0]
        sub_count = db.execute("SELECT count(*) FROM subscriptions").fetchone()[0]
    finally:
        return_db(db)
    sharing_html = (
        'This instance shares its index publicly. Subscribe to join the network.
' if sharing else 'This instance is private.
' ) hash_html = "" if dest_hash: hash_html = ( f'To subscribe to this instance, add this destination hash in your TinyWeb:
' f'{esc(dest_hash)}'
)
return _respond(
f'A personal search engine, built for the slow web.
' f'TinyWeb is about taking back the internet. No algorithms, no ads, no tracking. ' f'Just human-curated pages shared freely across a mesh network.
' f'The slow web is a movement for intentionality over speed, ' f'human curation over algorithmic feeds, privacy over surveillance, ' f'and community over corporations. Every page in this index was saved by a person ' f'because they found it valuable — not because an algorithm told them to click.
' f'No tags yet. Add tags when saving or editing pages.
" f'back' ) def handle_tag_browse(tag_name, query=None): page = _paginate(query or {}) offset = (page - 1) * BROWSE_PER_PAGE db = get_db() try: total = db.execute( "SELECT count(*) FROM page_tags pt JOIN tags t ON t.id = pt.tag_id WHERE t.name = ?", (tag_name,), ).fetchone()[0] rows = db.execute( "SELECT p.id, p.url, p.title, p.note FROM pages p " "JOIN page_tags pt ON p.id = pt.page_id " "JOIN tags t ON t.id = pt.tag_id " "WHERE t.name = ? ORDER BY p.id DESC LIMIT ? OFFSET ?", (tag_name, BROWSE_PER_PAGE, offset), ).fetchall() items = "" for r in rows: note_html = f' — {esc(r["note"])}' if r["note"] else "" tags = _get_page_tags(r["id"], db) tag_links = " ".join(f'[{esc(t)}]' for t in tags) items += ( f'{total} page(s)
' f'{msg}
' f'{len(sites)} site(s) available, {new_count} new
' f'' f'Semantic search is disabled. Enable it in settings to use embeddings.
" f'' ) db = get_db() try: total_pages = db.execute("SELECT count(*) FROM pages").fetchone()[0] pages_with_chunks = db.execute( "SELECT count(DISTINCT page_id) FROM chunks WHERE page_id IS NOT NULL" ).fetchone()[0] finally: return_db(db) progress = get_setting("reindex_progress", "") status_html = "" if progress: status_html = f'' elif _reindex_thread and _reindex_thread.is_alive(): status_html = '' return _respond( f"{pages_with_chunks} of {total_pages} pages have embeddings.
" f'{status_html}' f'' f'' ) def handle_reindex_submit(body): global _reindex_thread if _reindex_thread and _reindex_thread.is_alive(): return handle_reindex_form() def _run(): try: from embeddings import reindex_all def progress(current, total): set_setting("reindex_progress", f"{current}/{total}") reindex_all(progress_callback=progress) except Exception: pass finally: set_setting("reindex_progress", "") _reindex_thread = threading.Thread(target=_run, daemon=True) _reindex_thread.start() return _redirect("/reindex") # --- Dispatcher --- def _dispatch_inner(data): method = data.get("method", "GET") path = data.get("path", "/") query = data.get("query", {}) body = data.get("body", {}) gateway_host = data.get("gateway_host", "") def extract_id(prefix): try: return int(path[len(prefix):]) except (ValueError, IndexError): return None if method == "GET": if path == "/": return handle_search(query) elif path == "/add": action_type = query.get("type", ["index"])[0] return handle_add_form(action_type=action_type if action_type == "subscribe" else "index") elif path == "/pages": return handle_pages(query) elif path.startswith("/edit/"): pid = extract_id("/edit/") return handle_edit_form(pid) if pid is not None else _error(400) elif path.startswith("/delete/"): pid = extract_id("/delete/") return handle_delete_confirm(pid) if pid is not None else _error(400) elif path == "/bookmark": return handle_bookmark(query) elif path == "/style": return handle_style_form() elif path == "/about": return handle_about() elif path == "/export": return handle_export() elif path == "/import": return handle_import_form() elif path == "/tags": return handle_tags() elif path.startswith("/tags/"): tag_name = unquote(path[len("/tags/"):]) return handle_tag_browse(tag_name, query) if tag_name else _error(400) elif path == "/reindex": return handle_reindex_form() elif path == "/api/sites": return handle_api_sites(query) elif path == "/subscriptions": return handle_subscriptions() elif 
path.startswith("/subscriptions/browse/"): sid = extract_id("/subscriptions/browse/") return handle_subscription_browse(sid) if sid is not None else _error(400) elif method == "POST": if not _check_csrf(body): return _respond("Invalid or missing CSRF token.
", status=403) if path == "/add": return handle_add_submit(body) elif path == "/add/manual": return handle_add_manual_submit(body) elif path.startswith("/edit/"): pid = extract_id("/edit/") return handle_edit_submit(pid, body) if pid is not None else _error(400) elif path.startswith("/delete/"): pid = extract_id("/delete/") return handle_delete(pid) if pid is not None else _error(400) elif path == "/style": return handle_style_submit(body) elif path == "/style/reset": set_setting("custom_template", "") return handle_style_form("Template reset to default.") elif path == "/import": return handle_import_submit(body) elif path == "/reindex": return handle_reindex_submit(body) elif path == "/subscriptions/add": return handle_subscription_add(body) elif path == "/subscriptions/pick": return handle_subscription_pick(body) elif path.startswith("/subscriptions/sync/"): sid = extract_id("/subscriptions/sync/") return handle_subscription_sync(sid) if sid is not None else _error(400) elif path.startswith("/subscriptions/autosync/"): sid = extract_id("/subscriptions/autosync/") return handle_subscription_autosync(sid) if sid is not None else _error(400) elif path.startswith("/subscriptions/delete/"): sid = extract_id("/subscriptions/delete/") return handle_subscription_delete(sid) if sid is not None else _error(400) elif path == "/subscriptions/syncall": return handle_subscription_syncall() return _error(404) def dispatch_request(data): cookies = data.get("cookies", {}) csrf_token = cookies.get("_csrf", "") if not csrf_token: csrf_token = secrets.token_hex(32) _request_local.csrf_token = csrf_token resp = _dispatch_inner(data) resp.setdefault("headers", {}) resp["headers"]["Set-Cookie"] = f"_csrf={csrf_token}; SameSite=Strict; HttpOnly; Path=/" resp["headers"]["X-Frame-Options"] = "DENY" resp["headers"]["X-Content-Type-Options"] = "nosniff" if resp.get("content_type", "").startswith("text/html"): resp["headers"]["Content-Security-Policy"] = ( "default-src 'self'; " "script-src 
'self' 'unsafe-inline'; " "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; " "font-src 'self' https://fonts.gstatic.com; " "img-src * data:; " "frame-ancestors 'none'; " "form-action 'self'; " "base-uri 'self'" ) return resp