tinyweb/db.py
lichenblankie 4899819597 added bookmark auth, CSP, per-session CSRF
- Bookmark endpoint now requires a secret token (stored in settings)
- Style reset moved from GET to POST with CSRF protection
- Open redirect prevention in _redirect() helper
- Import capped at 100 URLs to prevent abuse
- page_tags cleaned up on delete + PRAGMA foreign_keys enabled
- CSP, X-Frame-Options, X-Content-Type-Options on all responses
- CSRF tokens now per-session via double-submit cookie pattern
- Tag names URL-decoded for special characters
- Gateway forwards cookies in request data
2026-06-05 05:29:35 +00:00

255 lines
9 KiB
Python

import socket
import ipaddress
import sqlite3
import requests
from urllib.parse import urlparse, urljoin, parse_qs, urlencode, urlunparse
from bs4 import BeautifulSoup
# Path of the SQLite database file opened by get_db() and init_db().
DATABASE = "index.db"
BLOCKED_NETWORKS = [
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("169.254.0.0/16"),
ipaddress.ip_network("0.0.0.0/8"),
ipaddress.ip_network("::1/128"),
ipaddress.ip_network("fc00::/7"),
ipaddress.ip_network("fe80::/10"),
]
def _validate_url_target(url):
"""Resolve hostname and block private/internal IPs to prevent SSRF."""
parsed = urlparse(url)
hostname = parsed.hostname
port = parsed.port or (443 if parsed.scheme == "https" else 80)
if not hostname:
raise ValueError(f"No hostname in URL: {url}")
try:
addrs = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
except socket.gaierror:
raise ValueError(f"Cannot resolve hostname: {hostname}")
for family, type_, proto, canonname, sockaddr in addrs:
ip = ipaddress.ip_address(sockaddr[0])
for network in BLOCKED_NETWORKS:
if ip in network:
raise ValueError(f"URL resolves to blocked address: {ip}")
# URL suffixes of non-HTML resources; fetch_page() does not collect links
# whose address ends in one of these.
SKIP_EXT = (
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".svg",
    ".pdf",
    ".zip",
    ".mp3",
    ".mp4",
    ".css",
    ".js",
    ".ico",
    ".xml",
    ".json",
)
# Query-parameter names (compared lower-cased) that clean_url() strips.
TRACKING_PARAMS = {
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "msclkid", "mc_cid", "mc_eid", "ref", "ref_src",
    "ref_url", "_ga", "_gl", "yclid", "twclid", "igshid",
}


def clean_url(url):
    """Return *url* with known tracking query parameters removed.

    Parameter names are matched case-insensitively against
    TRACKING_PARAMS. All other parameters are kept, including those with
    an empty value. The query string is re-encoded, so its escaping may be
    normalised even when nothing is removed.

    Args:
        url: URL string to clean.

    Returns:
        The URL with tracking parameters stripped from its query string.
    """
    parsed = urlparse(url)
    # keep_blank_values=True: without it "?q=" or a bare "?flag" would be
    # dropped along with the tracking parameters, changing what the URL means.
    params = parse_qs(parsed.query, keep_blank_values=True)
    cleaned = {k: v for k, v in params.items() if k.lower() not in TRACKING_PARAMS}
    new_query = urlencode(cleaned, doseq=True)
    return urlunparse(parsed._replace(query=new_query))
def get_db():
    """Open a new connection to the SQLite file named by DATABASE.

    The connection has foreign-key enforcement switched on and yields
    sqlite3.Row rows, so columns can be read by name. The caller is
    responsible for closing it.
    """
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    return conn
def init_db():
    """Create the schema in DATABASE if it is missing and apply migrations.

    Creates, when absent:
      * tables pages, links, settings, subscriptions, remote_pages, tags
        and page_tags;
      * FTS5 tables pages_fts and remote_pages_fts, declared with
        content=<table> so they index rows of pages / remote_pages;
      * insert/delete/update triggers that keep those FTS tables in step
        with their content tables.

    It then applies two in-place migrations: renaming the legacy
    subscriptions.url column to dest_hash, and adding remote_pages.tags.

    The connection is opened directly with sqlite3.connect (not get_db)
    and is always closed, even if a statement fails.
    """
    db = sqlite3.connect(DATABASE)
    try:
        db.execute(
            "CREATE TABLE IF NOT EXISTS pages ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " url TEXT UNIQUE NOT NULL,"
            " title TEXT,"
            " body TEXT,"
            " note TEXT DEFAULT ''"
            ")"
        )
        db.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS pages_fts "
            "USING fts5(title, body, url, note, content=pages, content_rowid=id)"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS links ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " page_id INTEGER NOT NULL,"
            " url TEXT NOT NULL,"
            " label TEXT,"
            " FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS settings ("
            " key TEXT PRIMARY KEY,"
            " value TEXT"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS subscriptions ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " dest_hash TEXT UNIQUE NOT NULL,"
            " name TEXT DEFAULT '',"
            " auto_sync INTEGER DEFAULT 0,"
            " last_sync TEXT DEFAULT ''"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS remote_pages ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " subscription_id INTEGER NOT NULL,"
            " url TEXT NOT NULL,"
            " title TEXT,"
            " note TEXT DEFAULT '',"
            " tags TEXT DEFAULT '',"
            " FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE CASCADE,"
            " UNIQUE(subscription_id, url)"
            ")"
        )
        db.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS remote_pages_fts "
            "USING fts5(title, url, note, content=remote_pages, content_rowid=id)"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS tags ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " name TEXT UNIQUE NOT NULL"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS page_tags ("
            " page_id INTEGER NOT NULL,"
            " tag_id INTEGER NOT NULL,"
            " PRIMARY KEY (page_id, tag_id),"
            " FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE,"
            " FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE"
            ")"
        )
        # Triggers mirror every insert/delete/update on pages and
        # remote_pages into the matching FTS table. An update is written as
        # an FTS 'delete' of the old row followed by an insert of the new one.
        db.executescript("""
CREATE TRIGGER IF NOT EXISTS pages_ai AFTER INSERT ON pages BEGIN
INSERT INTO pages_fts(rowid, title, body, url, note)
VALUES (new.id, new.title, new.body, new.url, new.note);
END;
CREATE TRIGGER IF NOT EXISTS pages_ad AFTER DELETE ON pages BEGIN
INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
END;
CREATE TRIGGER IF NOT EXISTS pages_au AFTER UPDATE ON pages BEGIN
INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
INSERT INTO pages_fts(rowid, title, body, url, note)
VALUES (new.id, new.title, new.body, new.url, new.note);
END;
CREATE TRIGGER IF NOT EXISTS remote_pages_ai AFTER INSERT ON remote_pages BEGIN
INSERT INTO remote_pages_fts(rowid, title, url, note)
VALUES (new.id, new.title, new.url, new.note);
END;
CREATE TRIGGER IF NOT EXISTS remote_pages_ad AFTER DELETE ON remote_pages BEGIN
INSERT INTO remote_pages_fts(remote_pages_fts, rowid, title, url, note)
VALUES ('delete', old.id, old.title, old.url, old.note);
END;
CREATE TRIGGER IF NOT EXISTS remote_pages_au AFTER UPDATE ON remote_pages BEGIN
INSERT INTO remote_pages_fts(remote_pages_fts, rowid, title, url, note)
VALUES ('delete', old.id, old.title, old.url, old.note);
INSERT INTO remote_pages_fts(rowid, title, url, note)
VALUES (new.id, new.title, new.url, new.note);
END;
""")
        # Migrate old subscriptions table if needed
        cols = [row[1] for row in db.execute("PRAGMA table_info(subscriptions)").fetchall()]
        if "url" in cols and "dest_hash" not in cols:
            db.execute("ALTER TABLE subscriptions RENAME COLUMN url TO dest_hash")
            db.commit()
        # Migrate remote_pages: add tags column if missing
        rp_cols = [row[1] for row in db.execute("PRAGMA table_info(remote_pages)").fetchall()]
        if "tags" not in rp_cols:
            db.execute("ALTER TABLE remote_pages ADD COLUMN tags TEXT DEFAULT ''")
            db.commit()
        db.commit()
    finally:
        # Release the file handle even when a statement above raised.
        db.close()
def get_setting(key, default=""):
    """Return the stored value for *key* from the settings table.

    Args:
        key: Settings key to look up.
        default: Value returned when no row exists for *key*.

    Returns:
        The row's ``value`` column, or *default* if the key is absent.
    """
    db = get_db()
    try:
        row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
    finally:
        # Close the connection even if the query raises.
        db.close()
    return row["value"] if row else default
def set_setting(key, value):
    """Insert or overwrite the settings row for *key* with *value*.

    Args:
        key: Settings key (primary key of the settings table).
        value: Value to store for that key.
    """
    db = get_db()
    try:
        db.execute(
            "INSERT INTO settings (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, value),
        )
        db.commit()
    finally:
        # Close on failure too, so a half-finished write transaction does
        # not stay attached to a connection nobody holds.
        db.close()
def get_site_name():
    """Return the "site_name" setting, or "tinyweb" when it is not set."""
    site_name = get_setting("site_name", default="tinyweb")
    return site_name
def _get_with_validated_redirects(url, max_redirects=30):
    """GET *url*, running the SSRF check on every redirect hop.

    Redirects are followed by hand so that a public host cannot bounce
    the request to an internal address after the first URL was validated.
    The default hop limit matches requests' own default of 30.
    """
    current = url
    for _ in range(max_redirects + 1):
        _validate_url_target(current)
        resp = requests.get(
            current,
            timeout=10,
            headers={"User-Agent": "TinyWeb/1.0"},
            allow_redirects=False,
        )
        if not resp.is_redirect:
            return resp
        location = resp.headers["Location"]
        resp.close()
        # Location may be relative; resolve it against the URL just requested.
        current = urljoin(current, location)
    raise requests.TooManyRedirects(f"Exceeded {max_redirects} redirects")


def fetch_page(url):
    """Download *url* and extract its title, visible text and same-site links.

    Args:
        url: Absolute URL of the page to fetch.

    Returns:
        A ``(title, body, links)`` tuple. *title* is the ``<title>`` text,
        or *url* when the page has none. *body* is the page text with
        script, style, nav, footer and header elements removed. *links* is
        a list of ``(href, label)`` pairs for unique links on the same
        netloc as *url*, with labels truncated to 200 characters.

    Raises:
        ValueError: If *url* or any redirect target fails the SSRF check.
        requests.RequestException: On network errors, too many redirects,
            or an HTTP error status.

    NOTE(review): the address check and the actual connection resolve the
    hostname separately, so a DNS answer that changes in between is not
    caught here.
    """
    resp = _get_with_validated_redirects(url)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    # extract links before stripping tags
    domain = urlparse(url).netloc
    seen = set()
    links = []
    for a in soup.find_all("a", href=True):
        # Resolve relative links and drop the fragment.
        href = urljoin(url, a["href"]).split("#")[0]
        parsed = urlparse(href)
        if parsed.netloc != domain:
            continue
        if href.lower().endswith(SKIP_EXT):
            continue
        # Skip parameterised/action URLs.
        if parsed.query or "action=" in href:
            continue
        path = parsed.path.lower()
        if any(s in path for s in ("/special:", "/talk:", "/user:", "/wikipedia:", "/help:", "/portal:", "/file:", "/category:")):
            continue
        if href in seen or href == url:
            continue
        seen.add(href)
        label = a.get_text(strip=True) or href
        links.append((href, label[:200]))
    for tag in soup(["script", "style", "nav", "footer", "header"]):
        tag.decompose()
    title = soup.title.string.strip() if soup.title and soup.title.string else url
    body = soup.get_text(separator=" ", strip=True)
    return title, body, links
def index_url(url, note=""):
    """Fetch *url* and store (or refresh) it, with its links, in the index.

    The URL is first passed through clean_url(). If a row for the cleaned
    URL already exists, its title, body and note are overwritten and its
    stored links are replaced.

    Args:
        url: URL of the page to index.
        note: Note text stored with the page.

    Returns:
        The title of the fetched page.

    Raises:
        Whatever fetch_page() raises for an unreachable or rejected URL.
    """
    url = clean_url(url)
    title, body, links = fetch_page(url)
    db = get_db()
    try:
        db.execute(
            "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?) "
            "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, note=excluded.note",
            (url, title, body, note),
        )
        # cursor.lastrowid is only meaningful after a real INSERT. When the
        # upsert takes the DO UPDATE path it does not identify the existing
        # row, so look the id up by the unique url instead.
        page_id = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()[0]
        db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
        db.executemany(
            "INSERT INTO links (page_id, url, label) VALUES (?, ?, ?)",
            [(page_id, href, label) for href, label in links],
        )
        db.commit()
    finally:
        # Close the connection even if a statement above raised.
        db.close()
    return title