tinyweb/app.py
Derick Phan c30fccb7a1
Bind to 0.0.0.0 and use dynamic Host header for bookmarklet
Makes the server accessible from other devices on the network
instead of only localhost. The bookmarklet now uses the Host header
from the request so it works regardless of how the server is accessed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 20:45:14 -07:00

524 lines
19 KiB
Python

import json
import sqlite3
import html
import requests
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse, urljoin
from bs4 import BeautifulSoup
# Path of the SQLite database file opened by get_db() and init_db().
DATABASE = "index.db"
def get_db():
    """Open a new connection to ``DATABASE`` whose rows support access by column name.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DATABASE)
    # sqlite3.Row lets callers write row["column"] as well as row[0].
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the schema in ``DATABASE`` if it does not exist yet.

    Creates the ``pages``, ``links`` and ``settings`` tables, the
    ``pages_fts`` FTS5 index (an external-content table backed by ``pages``)
    and the three triggers that mirror inserts, deletes and updates on
    ``pages`` into that index. Every statement uses ``IF NOT EXISTS``, so the
    function can be called on each start-up.

    The connection is closed even when one of the statements raises.
    """
    db = sqlite3.connect(DATABASE)
    try:
        db.execute(
            "CREATE TABLE IF NOT EXISTS pages ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " url TEXT UNIQUE NOT NULL,"
            " title TEXT,"
            " body TEXT,"
            " note TEXT DEFAULT ''"
            ")"
        )
        db.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS pages_fts "
            "USING fts5(title, body, url, note, content=pages, content_rowid=id)"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS links ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " page_id INTEGER NOT NULL,"
            " url TEXT NOT NULL,"
            " label TEXT,"
            " FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS settings ("
            " key TEXT PRIMARY KEY,"
            " value TEXT"
            ")"
        )
        # An external-content FTS5 table is not updated automatically; these
        # triggers keep it in step with the pages table. Removing a row from
        # the index is done by inserting the special 'delete' command row.
        db.executescript("""
            CREATE TRIGGER IF NOT EXISTS pages_ai AFTER INSERT ON pages BEGIN
                INSERT INTO pages_fts(rowid, title, body, url, note)
                VALUES (new.id, new.title, new.body, new.url, new.note);
            END;
            CREATE TRIGGER IF NOT EXISTS pages_ad AFTER DELETE ON pages BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
                VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
            END;
            CREATE TRIGGER IF NOT EXISTS pages_au AFTER UPDATE ON pages BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
                VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
                INSERT INTO pages_fts(rowid, title, body, url, note)
                VALUES (new.id, new.title, new.body, new.url, new.note);
            END;
        """)
        db.commit()
    finally:
        db.close()
# URL suffixes that fetch_page() does not record as links (matched
# case-insensitively against the end of the resolved URL).
SKIP_EXT = (".png", ".jpg", ".jpeg", ".gif", ".svg", ".pdf", ".zip", ".mp3", ".mp4", ".css", ".js", ".ico", ".xml", ".json")
def fetch_page(url):
    """Download ``url`` and return ``(title, body, links)``.

    ``title`` is the stripped ``<title>`` text, or ``url`` when the page has
    none. ``body`` is the page text with script, style, nav, footer and
    header elements removed. ``links`` is a list of ``(href, label)`` tuples
    for same-host anchors, de-duplicated, with labels cut to 200 characters.

    Raises whatever ``requests`` raises for network failures, timeouts or a
    non-success HTTP status.
    """
    # NOTE(review): verify=False turns off TLS certificate verification for
    # every fetch -- confirm this is intentional.
    response = requests.get(
        url,
        timeout=10,
        headers={"User-Agent": "TinyWeb/1.0"},
        verify=False,
    )
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    # Anchors are collected before the clean-up below, because that clean-up
    # removes nav/header/footer elements together with any links inside them.
    own_host = urlparse(url).netloc
    excluded_sections = (
        "/special:", "/talk:", "/user:", "/wikipedia:",
        "/help:", "/portal:", "/file:", "/category:",
    )
    recorded = set()
    links = []
    for anchor in soup.find_all("a", href=True):
        # Resolve relative hrefs against the requested URL and drop the fragment.
        target = urljoin(url, anchor["href"]).split("#")[0]
        parts = urlparse(target)
        lowered_path = parts.path.lower()
        skip = (
            parts.netloc != own_host
            or target.lower().endswith(SKIP_EXT)
            or parts.query
            or "action=" in target
            or any(marker in lowered_path for marker in excluded_sections)
            or target in recorded
            or target == url
        )
        if skip:
            continue
        recorded.add(target)
        text = anchor.get_text(strip=True)
        # Anchors without visible text are labelled with their own URL.
        links.append((target, (text or target)[:200]))

    for element in soup(["script", "style", "nav", "footer", "header"]):
        element.decompose()

    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    else:
        title = url
    body = soup.get_text(separator=" ", strip=True)
    return title, body, links
def snippet(text, query, ctx=80):
    """Return an excerpt of ``text`` centred on the first match of ``query``.

    The match is case-insensitive. The whole query is tried first; when it
    does not occur verbatim (common for multi-word full-text queries, whose
    words need not be adjacent in the page), the first individual query term
    that occurs in the text is used instead. ``ctx`` characters of context
    are kept on each side, and ``...`` marks a cut at either end.

    When nothing matches, the first 200 characters of ``text`` are returned.
    An empty or ``None`` ``text`` yields an empty string.
    """
    if not text:
        return ""
    haystack = text.lower()
    needle = query.lower()
    pos = haystack.find(needle)
    match_len = len(query)
    if pos == -1:
        for term in needle.split():
            # Drop FTS decoration such as quotes and prefix stars.
            term = term.strip('"*')
            if not term:
                continue
            pos = haystack.find(term)
            if pos != -1:
                match_len = len(term)
                break
    if pos == -1:
        return text[:200]
    start = max(0, pos - ctx)
    end = min(len(text), pos + match_len + ctx)
    prefix = "..." if start > 0 else ""
    suffix = "..." if end < len(text) else ""
    return prefix + text[start:end] + suffix
def esc(s):
    """Return ``s`` converted to ``str`` and HTML-escaped (quotes included)."""
    as_text = str(s)
    return html.escape(as_text)
def get_setting(key, default=""):
    """Return the stored value for ``key`` from the settings table.

    ``default`` is returned when no row exists for ``key``.
    """
    connection = get_db()
    cursor = connection.execute("SELECT value FROM settings WHERE key = ?", (key,))
    found = cursor.fetchone()
    connection.close()
    if not found:
        return default
    return found["value"]
def set_setting(key, value):
    """Store ``value`` under ``key`` in the settings table, replacing any existing value."""
    upsert_sql = (
        "INSERT INTO settings (key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value=excluded.value"
    )
    connection = get_db()
    connection.execute(upsert_sql, (key, value))
    connection.commit()
    connection.close()
def get_site_name():
    """Return the configured site name, or ``"tinyweb"`` when none is stored."""
    site_name = get_setting("site_name", "tinyweb")
    return site_name
def wrap_page(body_html):
    """Wrap ``body_html`` in a minimal HTML document, adding the custom CSS if set.

    ``body_html`` is inserted as-is; callers are responsible for escaping it.
    The stored ``custom_css`` setting is placed inside a ``<style>`` element.
    Because that setting is user-supplied, every ``</`` in it is rewritten to
    ``<\\/`` so the CSS cannot close the ``<style>`` element early and inject
    markup into the page. Inside CSS, ``\\/`` is just an escaped slash, so
    strings keep their meaning.
    """
    css = get_setting("custom_css")
    if css:
        safe_css = css.replace("</", "<\\/")
        style = f"<style>{safe_css}</style>"
    else:
        style = ""
    return f"<html><head>{style}</head><body>{body_html}</body></html>"
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the tinyweb personal search index.

    GET routes render the HTML pages (search, add, browse, edit, customize,
    import), serve the JSON export, delete pages and answer the bookmarklet.
    POST routes accept the matching form submissions.

    NOTE(review): ``/delete/<id>`` and ``/bookmark`` change state on a GET
    request; kept as-is because existing links and the bookmarklet rely on it.
    """

    # ------------------------------------------------------------------
    # response helpers
    # ------------------------------------------------------------------

    def respond(self, body, status=200):
        """Send ``body`` (an HTML fragment) wrapped by ``wrap_page`` with ``status``."""
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()
        self.wfile.write(wrap_page(body).encode())

    def _send_text(self, msg):
        """Send ``msg`` as a plain-text 200 response readable from any origin."""
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(msg.encode())

    def _redirect(self, location):
        """Send a 302 redirect to ``location``."""
        self.send_response(302)
        self.send_header("Location", location)
        self.end_headers()

    # ------------------------------------------------------------------
    # storage helper shared by add / bookmark / import
    # ------------------------------------------------------------------

    @staticmethod
    def _store_page(url, title, body, links, note=None):
        """Insert or refresh the page at ``url`` and replace its recorded links.

        With ``note=None`` a new page gets an empty note and an existing
        page keeps the note it already has; otherwise ``note`` overwrites it.

        The page id is looked up by URL after the upsert: ``cursor.lastrowid``
        is not updated when the upsert takes the UPDATE path, so relying on
        it attaches links to the wrong page when a URL is re-indexed.
        The connection is closed even if a statement raises; uncommitted
        changes are then discarded.
        """
        db = get_db()
        try:
            if note is None:
                db.execute(
                    "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, '') "
                    "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body",
                    (url, title, body),
                )
            else:
                db.execute(
                    "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?) "
                    "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, note=excluded.note",
                    (url, title, body, note),
                )
            page_id = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()[0]
            db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
            db.executemany(
                "INSERT INTO links (page_id, url, label) VALUES (?, ?, ?)",
                [(page_id, href, label) for href, label in links],
            )
            db.commit()
        finally:
            db.close()

    # ------------------------------------------------------------------
    # routing
    # ------------------------------------------------------------------

    def do_GET(self):
        """Dispatch a GET request on its path; unknown paths get a 404 page."""
        parsed = urlparse(self.path)
        path = parsed.path
        params = parse_qs(parsed.query)
        if path == "/":
            self.handle_search(params)
        elif path == "/add":
            self.handle_add_form()
        elif path == "/pages":
            self.handle_pages()
        elif path.startswith("/delete/"):
            self.handle_delete(path)
        elif path.startswith("/edit/"):
            self.handle_edit_form(path)
        elif path == "/style":
            self.handle_style_form()
        elif path == "/bookmark":
            self.handle_bookmark(params)
        elif path == "/export":
            self.handle_export()
        elif path == "/import":
            self.handle_import_form()
        else:
            self.respond("<h1>404</h1>", 404)

    def do_POST(self):
        """Parse the url-encoded form body and dispatch on the request path.

        A missing ``Content-Length`` is treated as an empty body. A
        non-numeric or negative value is answered with 400: a negative
        length would make the read block until the client disconnects.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            return self.respond("<h1>400</h1>", 400)
        if length < 0:
            return self.respond("<h1>400</h1>", 400)
        body = self.rfile.read(length).decode()
        params = parse_qs(body)
        if self.path == "/add":
            self.handle_add_submit(params)
        elif self.path.startswith("/edit/"):
            self.handle_edit_submit(self.path, params)
        elif self.path == "/style":
            self.handle_style_submit(params)
        elif self.path == "/import":
            self.handle_import_submit(params)
        else:
            self.respond("<h1>404</h1>", 404)

    # ------------------------------------------------------------------
    # search
    # ------------------------------------------------------------------

    @staticmethod
    def _trusted_links_html(db, q, rows):
        """Return the "trusted sites" block for query ``q``, or ``""`` if empty.

        Lists up to 20 distinct recorded links whose label contains any query
        word and whose URL is not already among the full-text hits ``rows``.
        """
        words = q.lower().split()
        all_links = db.execute(
            "SELECT l.url, l.label, p.title AS source_title "
            "FROM links l JOIN pages p ON l.page_id = p.id",
        ).fetchall()
        indexed_urls = {r["url"] for r in rows}
        seen = set()
        trusted = []
        for link in all_links:
            if link["url"] in indexed_urls or link["url"] in seen:
                continue
            label = (link["label"] or "").lower()
            if any(w in label for w in words):
                seen.add(link["url"])
                trusted.append(link)
                if len(trusted) >= 20:
                    break
        if not trusted:
            return ""
        items = "".join(
            f'<li><a href="{esc(link["url"])}">{esc(link["label"])}</a> '
            f'<small>— from {esc(link["source_title"])}</small></li>'
            for link in trusted
        )
        return (
            f'<details class="trusted">'
            f'<summary>from your trusted sites ({len(trusted)})</summary>'
            f'<ul>{items}</ul>'
            f'</details>'
        )

    def handle_search(self, params):
        """Render the home page: search box, page count and results for ``q``.

        Full-text hits come from ``pages_fts`` (at most 50, ordered by rank).
        A query the full-text engine cannot parse is reported to the user
        instead of aborting the request.
        """
        q = params.get("q", [""])[0].strip()
        result_html = ""
        trusted_html = ""
        db = get_db()
        try:
            count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
            name = get_site_name()
            if q:
                try:
                    rows = db.execute(
                        "SELECT p.id, p.url, p.title, p.body, p.note "
                        "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
                        "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
                        (q,),
                    ).fetchall()
                except sqlite3.OperationalError:
                    # Raised for queries that are not valid full-text syntax.
                    rows = []
                    result_html = "<p>Could not parse that search query.</p>"
                else:
                    if rows:
                        parts = []
                        for r in rows:
                            note_html = ""
                            if r["note"]:
                                note_html = f'<div class="note"><em>{esc(r["note"])}</em></div>'
                            parts.append(
                                f'<div class="result">'
                                f'<a href="{esc(r["url"])}">{esc(r["title"])}</a><br>'
                                f'<small>{esc(r["url"])}</small><br>'
                                f'{esc(snippet(r["body"], q))}'
                                f'{note_html}'
                                f'</div>'
                            )
                        result_html = "".join(parts)
                    else:
                        result_html = "<p>No results in your index.</p>"
                # search all linked pages from trusted sites
                trusted_html = self._trusted_links_html(db, q, rows)
        finally:
            db.close()
        self.respond(
            f'<h1><a href="/">{esc(name)}</a></h1>'
            f'<form method="get" action="/">'
            f'<input name="q" value="{esc(q)}" placeholder="search your index" size="40">'
            f' <button type="submit">search</button>'
            f'</form>'
            f'<p>{count} page(s) indexed.'
            f' <a href="/add">+ add url</a>'
            f' | <a href="/pages">browse</a>'
            f' | <a href="/style">customize</a></p>'
            f'<hr>{result_html}{trusted_html}'
        )

    # ------------------------------------------------------------------
    # add
    # ------------------------------------------------------------------

    def handle_add_form(self, msg=""):
        """Render the "add url" form; ``msg`` is inserted as raw HTML below it."""
        self.respond(
            f"<h1>add url</h1>"
            f'<form method="post" action="/add">'
            f'<input name="url" placeholder="https://example.com" size="50"><br><br>'
            f'<input name="note" placeholder="why are you saving this? (optional)" size="50"><br><br>'
            f'<button type="submit">index</button>'
            f"</form>"
            f"<p>{msg}</p>"
            f'<a href="/">back</a>'
        )

    def handle_add_submit(self, params):
        """Fetch and index the submitted URL, then re-render the form with the outcome."""
        url = params.get("url", [""])[0].strip()
        note = params.get("note", [""])[0].strip()
        if not url:
            return self.handle_add_form("URL is required.")
        if not url.startswith(("http://", "https://")):
            return self.handle_add_form("URL must start with http:// or https://")
        try:
            title, body, links = fetch_page(url)
            self._store_page(url, title, body, links, note)
        except Exception as e:
            # Request boundary: any fetch or storage failure is shown to the user.
            return self.handle_add_form(f"Error: {esc(str(e))}")
        self.handle_add_form(f'Indexed: <a href="{esc(url)}">{esc(title)}</a>')

    # ------------------------------------------------------------------
    # browse / edit / delete
    # ------------------------------------------------------------------

    def handle_pages(self):
        """Render the list of indexed pages, newest first, with edit/remove links."""
        db = get_db()
        try:
            rows = db.execute("SELECT id, url, title, note FROM pages ORDER BY id DESC").fetchall()
        finally:
            db.close()
        parts = []
        for r in rows:
            note_html = f' — <em>{esc(r["note"])}</em>' if r["note"] else ""
            parts.append(
                f'<li>{esc(r["title"])}{note_html} '
                f'<small>(<a href="{esc(r["url"])}">{esc(r["url"])}</a>)</small> '
                f'<a href="/edit/{r["id"]}">edit</a> '
                f'<a href="/delete/{r["id"]}">remove</a></li>'
            )
        items = "".join(parts)
        self.respond(
            f"<h1>indexed pages ({len(rows)})</h1>"
            f"<ul>{items}</ul>"
            f'<p><a href="/export">export</a> | <a href="/import">import</a></p>'
            f'<a href="/">back</a>'
        )

    def handle_edit_form(self, path, msg=""):
        """Render the note editor for the page id at the end of ``path``.

        Responds 400 when the id is not an integer and 404 when no such page exists.
        """
        try:
            page_id = int(path.split("/")[-1])
        except ValueError:
            return self.respond("<h1>400</h1>", 400)
        db = get_db()
        try:
            row = db.execute("SELECT id, url, title, note FROM pages WHERE id = ?", (page_id,)).fetchone()
        finally:
            db.close()
        if not row:
            return self.respond("<h1>404</h1>", 404)
        self.respond(
            f"<h1>edit note</h1>"
            f"<p><b>{esc(row['title'])}</b><br>"
            f"<small>{esc(row['url'])}</small></p>"
            f'<form method="post" action="/edit/{row["id"]}">'
            f'<input name="note" value="{esc(row["note"])}" placeholder="why did you save this?" size="50"><br><br>'
            f'<button type="submit">save</button>'
            f"</form>"
            f"<p>{msg}</p>"
            f'<a href="/pages">back</a>'
        )

    def handle_edit_submit(self, path, params):
        """Save the submitted note for the page id in ``path`` and redirect to /pages."""
        try:
            page_id = int(path.split("/")[-1])
        except ValueError:
            return self.respond("<h1>400</h1>", 400)
        note = params.get("note", [""])[0].strip()
        db = get_db()
        try:
            db.execute("UPDATE pages SET note = ? WHERE id = ?", (note, page_id))
            db.commit()
        finally:
            db.close()
        self._redirect("/pages")

    def handle_delete(self, path):
        """Delete the page id in ``path`` together with its links, then redirect to /pages."""
        try:
            page_id = int(path.split("/")[-1])
        except ValueError:
            return self.respond("<h1>400</h1>", 400)
        db = get_db()
        try:
            # Links are removed explicitly rather than relying on the
            # ON DELETE CASCADE declared in the schema.
            db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
            db.execute("DELETE FROM pages WHERE id = ?", (page_id,))
            db.commit()
        finally:
            db.close()
        self._redirect("/pages")

    # ------------------------------------------------------------------
    # bookmarklet
    # ------------------------------------------------------------------

    def handle_bookmark(self, params):
        """Index the ``url`` query parameter and reply with a plain-text status.

        The reply is always a 200 whose text starts with ``ok:`` or
        ``error:``. An existing page keeps its note.
        """
        url = params.get("url", [""])[0].strip()
        if not url or not url.startswith(("http://", "https://")):
            return self._send_text("error: invalid url")
        try:
            title, body, links = fetch_page(url)
            self._store_page(url, title, body, links)
            msg = f"ok: {title}"
        except Exception as e:
            # Request boundary: report the failure text to the bookmarklet.
            msg = f"error: {e}"
        self._send_text(msg)

    # ------------------------------------------------------------------
    # export / import
    # ------------------------------------------------------------------

    def handle_export(self):
        """Send every page's url, title and note as a downloadable JSON array."""
        db = get_db()
        try:
            rows = db.execute("SELECT url, title, note FROM pages ORDER BY id").fetchall()
        finally:
            db.close()
        data = [{"url": r["url"], "title": r["title"], "note": r["note"]} for r in rows]
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Disposition", "attachment; filename=tinyweb-export.json")
        self.end_headers()
        self.wfile.write(json.dumps(data, indent=2).encode())

    def handle_import_form(self, msg=""):
        """Render the import form; ``msg`` is inserted as raw HTML below it."""
        self.respond(
            f"<h1>import</h1>"
            f"<p>Paste the contents of a tinyweb export file (JSON).</p>"
            f'<form method="post" action="/import">'
            f'<textarea name="data" rows="12" cols="60" placeholder=\'[{{"url": "...", "note": "..."}}]\'></textarea><br><br>'
            f'<button type="submit">import</button>'
            f"</form>"
            f"<p>{msg}</p>"
            f'<a href="/pages">back</a>'
        )

    def handle_import_submit(self, params):
        """Re-fetch and index every entry of the pasted JSON array.

        Entries that are not JSON objects, and entries whose fetch or storage
        fails, are counted as errors. Entries without a URL are skipped. A
        ``null`` url or note is treated as empty.
        """
        raw = params.get("data", [""])[0].strip()
        if not raw:
            return self.handle_import_form("Paste JSON data.")
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return self.handle_import_form("Invalid JSON.")
        if not isinstance(data, list):
            return self.handle_import_form("Expected a JSON array.")
        imported = 0
        errors = 0
        for entry in data:
            if not isinstance(entry, dict):
                errors += 1
                continue
            url = str(entry.get("url") or "").strip()
            note = str(entry.get("note") or "").strip()
            if not url:
                continue
            try:
                title, body, links = fetch_page(url)
                self._store_page(url, title, body, links, note)
            except Exception:
                # Best-effort import: one bad entry must not abort the rest.
                errors += 1
            else:
                imported += 1
        self.handle_import_form(f"Imported {imported} page(s). {errors} error(s).")

    # ------------------------------------------------------------------
    # customize
    # ------------------------------------------------------------------

    def handle_style_form(self, msg=""):
        """Render the customize page: site name, custom CSS and the bookmarklet.

        The bookmarklet targets the ``Host`` header of the current request,
        falling back to ``localhost:5001`` when the header is absent.
        """
        css = get_setting("custom_css")
        name = get_site_name()
        host = esc(self.headers.get("Host", "localhost:5001"))
        self.respond(
            f"<h1>customize</h1>"
            f"<h2>name your search engine</h2>"
            f'<form method="post" action="/style">'
            f'<input name="site_name" value="{esc(name)}" placeholder="tinyweb" size="30"><br><br>'
            f"<h2>custom css</h2>"
            f"<p>Some classes you can target:</p>"
            f"<pre>"
            f"body - page background, font\n"
            f"h1 - page titles\n"
            f"input, button - search bar\n"
            f"a - links\n"
            f".result - each search result\n"
            f".note - your notes on results\n"
            f".trusted - trusted sites dropdown\n"
            f"small - url text\n"
            f"ul, li - browse page list"
            f"</pre>"
            f'<textarea name="css" rows="16" cols="60">{esc(css)}</textarea><br><br>'
            f'<button type="submit">save</button>'
            f"</form>"
            f"<h2>bookmarklet</h2>"
            f"<p>Drag this link to your bookmarks bar. Click it on any page to index it instantly.</p>"
            f'<p><a href="javascript:void(fetch(\'http://{host}/bookmark?url=\'+encodeURIComponent(location.href)).then(r=>r.text()).then(t=>alert(t)).catch(()=>alert(\'tinyweb not running\')))">+ save to {esc(name)}</a></p>'
            f"<p>{msg}</p>"
            f'<a href="/">back</a>'
        )

    def handle_style_submit(self, params):
        """Persist the submitted CSS and site name, then re-render the customize page.

        An empty site name is stored as ``tinyweb``.
        """
        css = params.get("css", [""])[0]
        name = params.get("site_name", ["tinyweb"])[0].strip()
        set_setting("custom_css", css)
        set_setting("site_name", name or "tinyweb")
        self.handle_style_form("Saved.")
if __name__ == "__main__":
    # Make sure the schema exists, then serve on every interface, port 5001.
    init_db()
    print("running on http://0.0.0.0:5001")
    bind_address = ("0.0.0.0", 5001)
    server = HTTPServer(bind_address, Handler)
    server.serve_forever()