This commit is contained in:
blankie 2026-03-24 20:35:20 -07:00
parent 607c99d5a3
commit 3bd8601c63
3 changed files with 526 additions and 0 deletions

524
app.py Normal file
View file

@@ -0,0 +1,524 @@
import json
import sqlite3
import html
import requests
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse, urljoin
from bs4 import BeautifulSoup
# Path of the SQLite database file passed to sqlite3.connect() by get_db() and init_db().
DATABASE = "index.db"
def get_db():
    """Open a fresh connection to the index database.

    Rows are returned as ``sqlite3.Row`` objects so columns can be read by
    name. Closing the connection is the caller's job.
    """
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the schema in ``DATABASE`` if it is not there yet.

    Creates the ``pages``, ``links`` and ``settings`` tables, the
    ``pages_fts`` full-text index, and the triggers that mirror every
    insert, update and delete on ``pages`` into that index. Every statement
    uses ``IF NOT EXISTS``, so calling this repeatedly is harmless.

    The connection is closed even when a statement fails.
    """
    db = sqlite3.connect(DATABASE)
    try:
        db.execute(
            "CREATE TABLE IF NOT EXISTS pages ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  url TEXT UNIQUE NOT NULL,"
            "  title TEXT,"
            "  body TEXT,"
            "  note TEXT DEFAULT ''"
            ")"
        )
        # pages_fts is declared with content=pages, so it holds only the
        # index and has to be kept in sync by the triggers below.
        db.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS pages_fts "
            "USING fts5(title, body, url, note, content=pages, content_rowid=id)"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS links ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  page_id INTEGER NOT NULL,"
            "  url TEXT NOT NULL,"
            "  label TEXT,"
            "  FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS settings ("
            "  key TEXT PRIMARY KEY,"
            "  value TEXT"
            ")"
        )
        # Inserting a row whose first column is 'delete' is the FTS5 command
        # for removing an entry; an update is a delete of the old values
        # followed by an insert of the new ones.
        db.executescript("""
            CREATE TRIGGER IF NOT EXISTS pages_ai AFTER INSERT ON pages BEGIN
                INSERT INTO pages_fts(rowid, title, body, url, note)
                VALUES (new.id, new.title, new.body, new.url, new.note);
            END;
            CREATE TRIGGER IF NOT EXISTS pages_ad AFTER DELETE ON pages BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
                VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
            END;
            CREATE TRIGGER IF NOT EXISTS pages_au AFTER UPDATE ON pages BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
                VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
                INSERT INTO pages_fts(rowid, title, body, url, note)
                VALUES (new.id, new.title, new.body, new.url, new.note);
            END;
        """)
        db.commit()
    finally:
        db.close()
# URL suffixes that fetch_page() leaves out of the links it collects.
SKIP_EXT = (
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".svg",
    ".pdf",
    ".zip",
    ".mp3",
    ".mp4",
    ".css",
    ".js",
    ".ico",
    ".xml",
    ".json",
)
def fetch_page(url, verify=True):
    """Download *url* and return ``(title, body, links)``.

    Args:
        url: Absolute http(s) URL to fetch.
        verify: Passed to ``requests.get``; leave it ``True`` so TLS
            certificates are checked. Pass ``False`` only for a host whose
            certificate is known to be broken.

    Returns:
        A tuple ``(title, body, links)``: the page title (the URL itself
        when the page has no usable ``<title>``), the visible text with
        script/style/nav/footer/header elements removed, and a list of
        ``(href, label)`` pairs for same-host links, labels capped at 200
        characters.

    Raises:
        requests.RequestException: on network errors, timeouts, TLS
            failures or a non-2xx status.
    """
    resp = requests.get(
        url,
        timeout=10,
        headers={"User-Agent": "TinyWeb/1.0"},
        verify=verify,
    )
    resp.raise_for_status()

    # Hand the raw bytes to the parser. When the server declares no charset,
    # resp.text falls back to ISO-8859-1 for text/* and garbles UTF-8 pages;
    # BeautifulSoup can sniff <meta charset> itself. A charset that the
    # header does declare is still honoured.
    content_type = resp.headers.get("Content-Type", "")
    declared = resp.encoding if "charset" in content_type.lower() else None
    soup = BeautifulSoup(resp.content, "html.parser", from_encoding=declared)

    # Extract links before stripping tags: nav/header/footer links count too.
    skipped_namespaces = (
        "/special:", "/talk:", "/user:", "/wikipedia:",
        "/help:", "/portal:", "/file:", "/category:",
    )
    domain = urlparse(url).netloc
    seen = set()
    links = []
    for anchor in soup.find_all("a", href=True):
        href = urljoin(url, anchor["href"]).split("#")[0]
        parsed = urlparse(href)
        if parsed.netloc != domain:
            continue
        if href.lower().endswith(SKIP_EXT):
            continue
        if parsed.query or "action=" in href:
            continue
        path = parsed.path.lower()
        if any(marker in path for marker in skipped_namespaces):
            continue
        if href in seen or href == url:
            continue
        seen.add(href)
        label = anchor.get_text(strip=True) or href
        links.append((href, label[:200]))

    for tag in soup(["script", "style", "nav", "footer", "header"]):
        tag.decompose()

    title = url
    if soup.title and soup.title.string:
        # A whitespace-only <title> would otherwise yield an empty title.
        title = soup.title.string.strip() or url
    body = soup.get_text(separator=" ", strip=True)
    return title, body, links
def snippet(text, query, ctx=80):
    """Return a short excerpt of *text* around the first match of *query*.

    The whole query is looked up first (case-insensitively). If it does not
    occur verbatim, each whitespace-separated term is tried in order, so a
    multi-word search still gets an excerpt around one of its words instead
    of always falling back to the start of the text.

    Args:
        text: Text to excerpt; ``None`` is treated as an empty string.
        query: Search string as typed by the user.
        ctx: Number of characters kept on each side of the match.

    Returns:
        The excerpt, with ``...`` on each side that was cut, or the first
        200 characters of *text* when nothing matches.
    """
    text = text or ""
    query = query or ""
    haystack = text.lower()

    hit = query
    pos = haystack.find(query.lower())
    if pos == -1:
        for raw_term in query.split():
            # Drop FTS punctuation and boolean operators; they are query
            # syntax, not words that occur in the page.
            term = raw_term.strip("\"'()*")
            if not term or term in ("AND", "OR", "NOT"):
                continue
            pos = haystack.find(term.lower())
            if pos != -1:
                hit = term
                break
    if pos == -1:
        return text[:200]

    start = max(0, pos - ctx)
    end = min(len(text), pos + len(hit) + ctx)
    prefix = "..." if start > 0 else ""
    suffix = "..." if end < len(text) else ""
    return prefix + text[start:end] + suffix
def esc(s):
    """Return ``str(s)`` with ``&``, ``<``, ``>`` and both quote characters HTML-escaped."""
    text = str(s)
    return html.escape(text, quote=True)
def get_setting(key, default=""):
    """Return the stored value for *key* from the ``settings`` table.

    Args:
        key: Setting name.
        default: Value returned when no row exists for *key*.

    The connection is closed even if the query raises.
    """
    db = get_db()
    try:
        row = db.execute(
            "SELECT value FROM settings WHERE key = ?", (key,)
        ).fetchone()
    finally:
        db.close()
    return row["value"] if row else default
def set_setting(key, value):
    """Insert or overwrite the ``settings`` row for *key* with *value*.

    The change is committed before returning. The connection is closed even
    if the statement raises; in that case nothing is committed.
    """
    db = get_db()
    try:
        db.execute(
            "INSERT INTO settings (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, value),
        )
        db.commit()
    finally:
        db.close()
def get_site_name():
    """Return the stored ``site_name`` setting, or ``"tinyweb"`` when none is stored."""
    site_name = get_setting("site_name", default="tinyweb")
    return site_name
def wrap_page(body_html):
    """Wrap an HTML fragment in a full document, adding the stored custom CSS.

    Args:
        body_html: Markup placed verbatim inside ``<body>``; the caller is
            responsible for escaping any untrusted text in it.

    Returns:
        The complete HTML document as a string.
    """
    css = get_setting("custom_css")
    if css:
        # A literal "</style>" in the stored CSS would close the element and
        # let the rest be parsed as HTML. Rewriting "</" as "<\/" keeps the
        # text inside the style element; within a CSS string "\/" still
        # reads as "/".
        safe_css = css.replace("</", "<\\/")
        style = f"<style>{safe_css}</style>"
    else:
        style = ""
    return f"<html><head>{style}</head><body>{body_html}</body></html>"
class Handler(BaseHTTPRequestHandler):
    """Request handler for the search UI and the index-management pages.

    GET routes: ``/`` (search), ``/add``, ``/pages``, ``/delete/<id>``,
    ``/edit/<id>``, ``/style``, ``/bookmark``, ``/export``, ``/import``.
    POST routes: ``/add``, ``/edit/<id>``, ``/style``, ``/import``.
    Every other path gets a 404 page.
    """

    # A URL is only fetched and indexed when it starts with one of these.
    _URL_SCHEMES = ("http://", "https://")

    # Full-text query shared by the raw and the quoted-phrase attempts.
    _SEARCH_SQL = (
        "SELECT p.id, p.url, p.title, p.body, p.note "
        "FROM pages_fts f JOIN pages p ON f.rowid = p.id "
        "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50"
    )

    # ------------------------------------------------------------------
    # response helpers
    # ------------------------------------------------------------------

    def respond(self, body, status=200):
        """Send the HTML fragment *body*, wrapped by ``wrap_page``.

        The page is rendered before any header is written, so a failure
        while rendering cannot leave a half-sent response behind.
        """
        payload = wrap_page(body).encode()
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _send_text(self, text):
        """Send a 200 plain-text reply with ``Access-Control-Allow-Origin: *``."""
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(text.encode())

    def _redirect(self, location):
        """Send a 302 redirect to *location*."""
        self.send_response(302)
        self.send_header("Location", location)
        self.end_headers()

    @staticmethod
    def _page_id_from(path):
        """Return the integer after the last ``/`` of *path*, or ``None``."""
        try:
            return int(path.split("/")[-1])
        except ValueError:
            return None

    @staticmethod
    def _store_page(url, title, body, links, note=None):
        """Insert or refresh the page row for *url* and replace its links.

        Args:
            url, title, body: Column values for the ``pages`` row.
            links: Iterable of ``(href, label)`` pairs found on the page.
            note: New note text. ``None`` leaves the note of an existing
                row untouched and stores ``''`` for a new row.

        The page id is read back with a SELECT. ``cursor.lastrowid`` is not
        usable here: when the upsert takes its UPDATE branch no row is
        inserted, so it does not hold the id of the existing page.
        """
        db = get_db()
        try:
            if note is None:
                db.execute(
                    "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, '') "
                    "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body",
                    (url, title, body),
                )
            else:
                db.execute(
                    "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?) "
                    "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, note=excluded.note",
                    (url, title, body, note),
                )
            page_id = db.execute(
                "SELECT id FROM pages WHERE url = ?", (url,)
            ).fetchone()["id"]
            db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
            db.executemany(
                "INSERT INTO links (page_id, url, label) VALUES (?, ?, ?)",
                [(page_id, href, label) for href, label in links],
            )
            db.commit()
        finally:
            # Closing without a commit discards a half-finished update.
            db.close()

    # ------------------------------------------------------------------
    # routing
    # ------------------------------------------------------------------

    def do_GET(self):
        """Dispatch a GET request by path."""
        parsed = urlparse(self.path)
        path = parsed.path
        params = parse_qs(parsed.query)
        if path == "/":
            self.handle_search(params)
        elif path == "/add":
            self.handle_add_form()
        elif path == "/pages":
            self.handle_pages()
        elif path.startswith("/delete/"):
            self.handle_delete(path)
        elif path.startswith("/edit/"):
            self.handle_edit_form(path)
        elif path == "/style":
            self.handle_style_form()
        elif path == "/bookmark":
            self.handle_bookmark(params)
        elif path == "/export":
            self.handle_export()
        elif path == "/import":
            self.handle_import_form()
        else:
            self.respond("<h1>404</h1>", 404)

    def do_POST(self):
        """Read the url-encoded form body and dispatch a POST request by path."""
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            return self.respond("<h1>400</h1>", 400)
        # A negative length would make read() wait for end-of-stream.
        raw = self.rfile.read(max(length, 0))
        params = parse_qs(raw.decode("utf-8", errors="replace"))
        # Route on the path alone, like do_GET, so a query string on the
        # form action does not turn a valid route into a 404.
        path = urlparse(self.path).path
        if path == "/add":
            self.handle_add_submit(params)
        elif path.startswith("/edit/"):
            self.handle_edit_submit(path, params)
        elif path == "/style":
            self.handle_style_submit(params)
        elif path == "/import":
            self.handle_import_submit(params)
        else:
            self.respond("<h1>404</h1>", 404)

    # ------------------------------------------------------------------
    # search
    # ------------------------------------------------------------------

    def _search_pages(self, db, q):
        """Run the full-text search for *q* and return the matching rows.

        *q* is first handed to FTS5 as typed, so query syntax keeps working.
        Input that is not valid query syntax makes SQLite raise
        ``OperationalError``; it is then retried as one quoted phrase, and
        an empty result is returned if that fails as well.
        """
        phrase = '"' + q.replace('"', '""') + '"'
        for candidate in (q, phrase):
            try:
                return db.execute(self._SEARCH_SQL, (candidate,)).fetchall()
            except sqlite3.OperationalError:
                continue
        return []

    @staticmethod
    def _render_results(rows, q):
        """Return the HTML for the search hits in *rows*."""
        if not rows:
            return "<p>No results in your index.</p>"
        parts = []
        for row in rows:
            note_html = ""
            if row["note"]:
                note_html = f'<div class="note"><em>{esc(row["note"])}</em></div>'
            parts.append(
                f'<div class="result">'
                f'<a href="{esc(row["url"])}">{esc(row["title"])}</a><br>'
                f'<small>{esc(row["url"])}</small><br>'
                f'{esc(snippet(row["body"] or "", q))}'
                f'{note_html}'
                f'</div>'
            )
        return "".join(parts)

    @staticmethod
    def _render_trusted(db, rows, q):
        """Return the "trusted sites" block: stored links whose label
        contains a query word, at most 20, skipping URLs already in *rows*.
        """
        words = q.lower().split()
        indexed_urls = {row["url"] for row in rows}
        seen = set()
        trusted = []
        link_rows = db.execute(
            "SELECT l.url, l.label, p.title AS source_title "
            "FROM links l JOIN pages p ON l.page_id = p.id",
        )
        for link in link_rows:
            if link["url"] in indexed_urls or link["url"] in seen:
                continue
            label = (link["label"] or "").lower()
            if any(word in label for word in words):
                seen.add(link["url"])
                trusted.append(link)
                if len(trusted) >= 20:
                    break
        if not trusted:
            return ""
        items = "".join(
            f'<li><a href="{esc(link["url"])}">{esc(link["label"])}</a> '
            f'<small>— from {esc(link["source_title"])}</small></li>'
            for link in trusted
        )
        return (
            f'<details class="trusted">'
            f'<summary>from your trusted sites ({len(trusted)})</summary>'
            f'<ul>{items}</ul>'
            f'</details>'
        )

    def handle_search(self, params):
        """Render the home page: search box, page count and, when the ``q``
        parameter is present, the hits plus matching stored links.
        """
        q = params.get("q", [""])[0].strip()
        name = get_site_name()
        result_html = ""
        trusted_html = ""
        db = get_db()
        try:
            count = db.execute("SELECT count(*) FROM pages").fetchone()[0]
            if q:
                rows = self._search_pages(db, q)
                result_html = self._render_results(rows, q)
                trusted_html = self._render_trusted(db, rows, q)
        finally:
            db.close()
        self.respond(
            f'<h1><a href="/">{esc(name)}</a></h1>'
            f'<form method="get" action="/">'
            f'<input name="q" value="{esc(q)}" placeholder="search your index" size="40">'
            f' <button type="submit">search</button>'
            f'</form>'
            f'<p>{count} page(s) indexed.'
            f' <a href="/add">+ add url</a>'
            f' | <a href="/pages">browse</a>'
            f' | <a href="/style">customize</a></p>'
            f'<hr>{result_html}{trusted_html}'
        )

    # ------------------------------------------------------------------
    # add
    # ------------------------------------------------------------------

    def handle_add_form(self, msg=""):
        """Render the "add url" form. *msg* is inserted as HTML, so callers
        must escape any untrusted text in it.
        """
        self.respond(
            f"<h1>add url</h1>"
            f'<form method="post" action="/add">'
            f'<input name="url" placeholder="https://example.com" size="50"><br><br>'
            f'<input name="note" placeholder="why are you saving this? (optional)" size="50"><br><br>'
            f'<button type="submit">index</button>'
            f"</form>"
            f"<p>{msg}</p>"
            f'<a href="/">back</a>'
        )

    def handle_add_submit(self, params):
        """Fetch and index the submitted URL, then show the form again with
        a success or error message.
        """
        url = params.get("url", [""])[0].strip()
        note = params.get("note", [""])[0].strip()
        if not url:
            return self.handle_add_form("URL is required.")
        if not url.startswith(self._URL_SCHEMES):
            return self.handle_add_form("URL must start with http:// or https://")
        try:
            title, body, links = fetch_page(url)
            self._store_page(url, title, body, links, note)
        except Exception as e:
            # Request boundary: report any fetch/database failure to the
            # user instead of dropping the connection.
            return self.handle_add_form(f"Error: {esc(str(e))}")
        self.handle_add_form(f'Indexed: <a href="{esc(url)}">{esc(title)}</a>')

    # ------------------------------------------------------------------
    # browse / edit / delete
    # ------------------------------------------------------------------

    def handle_pages(self):
        """Render the list of all indexed pages, newest first."""
        db = get_db()
        try:
            rows = db.execute(
                "SELECT id, url, title, note FROM pages ORDER BY id DESC"
            ).fetchall()
        finally:
            db.close()
        parts = []
        for row in rows:
            note_html = f' — <em>{esc(row["note"])}</em>' if row["note"] else ""
            parts.append(
                f'<li>{esc(row["title"])}{note_html} '
                f'<small>(<a href="{esc(row["url"])}">{esc(row["url"])}</a>)</small> '
                f'<a href="/edit/{row["id"]}">edit</a> '
                f'<a href="/delete/{row["id"]}">remove</a></li>'
            )
        items = "".join(parts)
        self.respond(
            f"<h1>indexed pages ({len(rows)})</h1>"
            f"<ul>{items}</ul>"
            f'<p><a href="/export">export</a> | <a href="/import">import</a></p>'
            f'<a href="/">back</a>'
        )

    def handle_edit_form(self, path, msg=""):
        """Render the note editor for the page id at the end of *path*.

        Responds 400 when the id is not an integer and 404 when no such
        page exists.
        """
        page_id = self._page_id_from(path)
        if page_id is None:
            return self.respond("<h1>400</h1>", 400)
        db = get_db()
        try:
            row = db.execute(
                "SELECT id, url, title, note FROM pages WHERE id = ?", (page_id,)
            ).fetchone()
        finally:
            db.close()
        if not row:
            return self.respond("<h1>404</h1>", 404)
        self.respond(
            f"<h1>edit note</h1>"
            f"<p><b>{esc(row['title'])}</b><br>"
            f"<small>{esc(row['url'])}</small></p>"
            f'<form method="post" action="/edit/{row["id"]}">'
            f'<input name="note" value="{esc(row["note"] or "")}" placeholder="why did you save this?" size="50"><br><br>'
            f'<button type="submit">save</button>'
            f"</form>"
            f"<p>{msg}</p>"
            f'<a href="/pages">back</a>'
        )

    def handle_edit_submit(self, path, params):
        """Store the submitted note for the page id in *path*, then redirect
        to ``/pages``. Responds 400 when the id is not an integer.
        """
        page_id = self._page_id_from(path)
        if page_id is None:
            return self.respond("<h1>400</h1>", 400)
        note = params.get("note", [""])[0].strip()
        db = get_db()
        try:
            db.execute("UPDATE pages SET note = ? WHERE id = ?", (note, page_id))
            db.commit()
        finally:
            db.close()
        self._redirect("/pages")

    def handle_delete(self, path):
        """Delete the page id in *path* and its links, then redirect to
        ``/pages``. Responds 400 when the id is not an integer.

        NOTE(review): this runs on GET, so merely following a link (or a
        prefetcher doing so) deletes data; consider moving it to POST.
        """
        page_id = self._page_id_from(path)
        if page_id is None:
            return self.respond("<h1>400</h1>", 400)
        db = get_db()
        try:
            # Links are removed explicitly rather than relying on the
            # ON DELETE CASCADE clause of the links table.
            db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
            db.execute("DELETE FROM pages WHERE id = ?", (page_id,))
            db.commit()
        finally:
            db.close()
        self._redirect("/pages")

    # ------------------------------------------------------------------
    # bookmarklet
    # ------------------------------------------------------------------

    def handle_bookmark(self, params):
        """Index the page given by the ``url`` parameter and answer in plain
        text (``ok: <title>`` or ``error: ...``), always with status 200.

        An existing note on the page is kept. The reply is readable from
        any origin because the bookmarklet calls this from arbitrary sites.
        """
        url = params.get("url", [""])[0].strip()
        if not url or not url.startswith(self._URL_SCHEMES):
            return self._send_text("error: invalid url")
        try:
            title, body, links = fetch_page(url)
            self._store_page(url, title, body, links)
            msg = f"ok: {title}"
        except Exception as e:
            # Request boundary: the bookmarklet shows this text in an alert.
            msg = f"error: {e}"
        self._send_text(msg)

    # ------------------------------------------------------------------
    # export / import
    # ------------------------------------------------------------------

    def handle_export(self):
        """Send url, title and note of every page as a JSON attachment."""
        db = get_db()
        try:
            rows = db.execute(
                "SELECT url, title, note FROM pages ORDER BY id"
            ).fetchall()
        finally:
            db.close()
        data = [
            {"url": row["url"], "title": row["title"], "note": row["note"]}
            for row in rows
        ]
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Disposition", "attachment; filename=tinyweb-export.json")
        self.end_headers()
        self.wfile.write(json.dumps(data, indent=2).encode())

    def handle_import_form(self, msg=""):
        """Render the import form. *msg* is inserted as HTML."""
        self.respond(
            "<h1>import</h1>"
            "<p>Paste the contents of a tinyweb export file (JSON).</p>"
            '<form method="post" action="/import">'
            '<textarea name="data" rows="12" cols="60" placeholder=\'[{"url": "...", "note": "..."}]\'></textarea><br><br>'
            '<button type="submit">import</button>'
            "</form>"
            f"<p>{msg}</p>"
            '<a href="/pages">back</a>'
        )

    def handle_import_submit(self, params):
        """Re-fetch and index every entry of the pasted JSON array.

        Entries without a URL are skipped. Entries that are not objects,
        whose URL is not http(s), or whose fetch/store fails are counted as
        errors; the totals are shown on the import form.
        """
        raw = params.get("data", [""])[0].strip()
        if not raw:
            return self.handle_import_form("Paste JSON data.")
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return self.handle_import_form("Invalid JSON.")
        if not isinstance(data, list):
            return self.handle_import_form("Expected a JSON array.")
        imported = 0
        errors = 0
        for entry in data:
            if not isinstance(entry, dict):
                errors += 1
                continue
            # Exported notes can be null, and hand-written files can hold
            # any JSON type, so only strings are stripped.
            url = entry.get("url")
            note = entry.get("note")
            url = url.strip() if isinstance(url, str) else ""
            note = note.strip() if isinstance(note, str) else ""
            if not url:
                continue
            if not url.startswith(self._URL_SCHEMES):
                errors += 1
                continue
            try:
                title, body, links = fetch_page(url)
                self._store_page(url, title, body, links, note)
            except Exception:
                # Best effort: one bad entry must not abort the import.
                errors += 1
            else:
                imported += 1
        self.handle_import_form(f"Imported {imported} page(s). {errors} error(s).")

    # ------------------------------------------------------------------
    # customization
    # ------------------------------------------------------------------

    def handle_style_form(self, msg=""):
        """Render the customization page: site name, custom CSS and the
        bookmarklet link. *msg* is inserted as HTML.
        """
        css = get_setting("custom_css")
        name = get_site_name()
        # The bookmarklet targets the address the server binds to in the
        # module's __main__ section.
        bookmarklet_js = (
            "javascript:void(fetch('http://localhost:5001/bookmark?url='"
            "+encodeURIComponent(location.href))"
            ".then(r=>r.text()).then(t=>alert(t))"
            ".catch(()=>alert('tinyweb not running')))"
        )
        self.respond(
            "<h1>customize</h1>"
            "<h2>name your search engine</h2>"
            '<form method="post" action="/style">'
            f'<input name="site_name" value="{esc(name)}" placeholder="tinyweb" size="30"><br><br>'
            "<h2>custom css</h2>"
            "<p>Some classes you can target:</p>"
            "<pre>"
            "body - page background, font\n"
            "h1 - page titles\n"
            "input, button - search bar\n"
            "a - links\n"
            ".result - each search result\n"
            ".note - your notes on results\n"
            ".trusted - trusted sites dropdown\n"
            "small - url text\n"
            "ul, li - browse page list"
            "</pre>"
            f'<textarea name="css" rows="16" cols="60">{esc(css)}</textarea><br><br>'
            '<button type="submit">save</button>'
            "</form>"
            "<h2>bookmarklet</h2>"
            "<p>Drag this link to your bookmarks bar. Click it on any page to index it instantly.</p>"
            f'<p><a href="{bookmarklet_js}">+ save to {esc(name)}</a></p>'
            f"<p>{msg}</p>"
            '<a href="/">back</a>'
        )

    def handle_style_submit(self, params):
        """Store the submitted CSS and site name, then show the form again.

        An empty site name is stored as ``"tinyweb"``.
        """
        css = params.get("css", [""])[0]
        name = params.get("site_name", ["tinyweb"])[0].strip()
        set_setting("custom_css", css)
        set_setting("site_name", name or "tinyweb")
        self.handle_style_form("Saved.")
if __name__ == "__main__":
    # Create the schema, then serve until interrupted.
    init_db()
    host, port = "localhost", 5001
    server = HTTPServer((host, port), Handler)
    # Printed only once the socket is bound, so the message is never shown
    # for a server that failed to start.
    print(f"running on http://{host}:{port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; exit without a traceback.
        pass
    finally:
        server.server_close()