"""URL normalisation, page fetching and SQLite-backed indexing helpers."""

import ipaddress
import logging
import re
import socket
import sqlite3
import threading
from contextlib import closing, suppress
from datetime import datetime, timezone
from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse, urlunparse

import requests
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

DATABASE = "index.db"

# Networks that fetches must never reach (loopback, private, link-local, ...).
BLOCKED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
    # IPv6 unspecified address: the IPv6 counterpart of the 0.0.0.0/8 entry.
    ipaddress.ip_network("::/128"),
]


def _validate_url_target(url):
    """Resolve hostname and block private/internal IPs to prevent SSRF.

    Every address returned by ``getaddrinfo`` is checked against
    ``BLOCKED_NETWORKS``. IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are
    additionally checked as their embedded IPv4 address, so they cannot be
    used to sidestep the IPv4 entries.

    Args:
        url: Absolute URL about to be fetched.

    Raises:
        ValueError: If the URL has no hostname, the hostname cannot be
            resolved, or any resolved address lies in a blocked network.
    """
    parsed = urlparse(url)
    hostname = parsed.hostname
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    if not hostname:
        raise ValueError(f"No hostname in URL: {url}")
    try:
        addrs = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve hostname: {hostname}") from exc
    for *_, sockaddr in addrs:
        # Drop any "%scope" suffix so the address parses on every Python version.
        ip = ipaddress.ip_address(sockaddr[0].split("%", 1)[0])
        mapped = getattr(ip, "ipv4_mapped", None)
        candidates = (ip,) if mapped is None else (ip, mapped)
        for candidate in candidates:
            if any(candidate in network for network in BLOCKED_NETWORKS):
                raise ValueError(f"URL resolves to blocked address: {ip}")


# Link targets with these extensions are never recorded as page links.
SKIP_EXT = (".png", ".jpg", ".jpeg", ".gif", ".svg", ".pdf", ".zip",
            ".mp3", ".mp4", ".css", ".js", ".ico", ".xml", ".json")

# Query parameters removed by clean_url() (compared case-insensitively).
TRACKING_PARAMS = {
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "msclkid", "mc_cid", "mc_eid",
    "ref", "ref_src", "ref_url",
    "_ga", "_gl", "yclid", "twclid", "igshid",
}


def clean_url(url):
    """Return a canonical form of *url* suitable for use as a unique key.

    The scheme is upgraded to https for http/https URLs, the host is
    lower-cased with a leading ``www.`` removed, default ports are dropped,
    trailing slashes are stripped (the root path stays ``/``), tracking
    parameters are removed, the remaining query parameters are sorted, and
    the fragment is discarded. Parameters with blank values are dropped
    because ``parse_qs`` is called with its default ``keep_blank_values``.

    Args:
        url: The URL to normalise.

    Returns:
        The normalised URL string.
    """
    parsed = urlparse(url)
    is_web = parsed.scheme in ("http", "https")
    # Prefer https
    scheme = "https" if is_web else parsed.scheme

    # Normalize hostname: lowercase, strip www.
    hostname = (parsed.hostname or "").lower()
    if hostname.startswith("www."):
        hostname = hostname[4:]
    # urlparse strips the brackets from IPv6 literals; restore them so the
    # rebuilt netloc stays parseable once a port is appended.
    if ":" in hostname:
        hostname = f"[{hostname}]"

    # Preserve explicit non-default ports. Port 80 is judged against the
    # ORIGINAL scheme: after the https upgrade, "http://host:80" must become
    # "https://host", not "https://host:80".
    port = parsed.port
    if is_web and (port == 443 or (parsed.scheme == "http" and port == 80)):
        port = None
    netloc = f"{hostname}:{port}" if port else hostname

    # Strip trailing slash (keep root "/" as-is)
    path = parsed.path.rstrip("/") or "/"

    # Remove tracking params and sort remaining for consistent ordering
    params = parse_qs(parsed.query)
    cleaned = sorted(
        ((key, sorted(values)) for key, values in params.items()
         if key.lower() not in TRACKING_PARAMS),
        key=lambda item: item[0],
    )
    new_query = urlencode(cleaned, doseq=True, quote_via=quote)
    return urlunparse((scheme, netloc, path, "", new_query, ""))


# Idle connections available for reuse; guarded by _pool_lock.
_pool = []
_pool_lock = threading.Lock()
_POOL_SIZE = 4


def get_db():
    """Return a usable SQLite connection, reusing a pooled one when possible.

    A pooled connection is probed with ``SELECT 1``; if the probe fails the
    connection is closed (best-effort) and a fresh one is opened instead.

    Returns:
        A ``sqlite3.Connection`` with ``row_factory`` set to ``sqlite3.Row``.
        Hand it back with ``return_db`` when done.
    """
    with _pool_lock:
        pooled = _pool.pop() if _pool else None
    if pooled is not None:
        try:
            pooled.execute("SELECT 1")
            return pooled
        except sqlite3.Error:
            # Unusable connection: release it rather than leaking the handle.
            with suppress(sqlite3.Error):
                pooled.close()
    db = sqlite3.connect(DATABASE, timeout=10)
    db.execute("PRAGMA journal_mode=WAL")
    db.execute("PRAGMA foreign_keys = ON")
    db.row_factory = sqlite3.Row
    return db


def return_db(db):
    """Give a connection back to the pool, or close it if the pool is full.

    Any uncommitted transaction is rolled back first, so work abandoned by a
    failed caller cannot be committed by the next borrower of the connection.

    Args:
        db: A connection previously obtained from ``get_db``.
    """
    try:
        db.rollback()
    except sqlite3.Error:
        # A connection that cannot roll back is not safe to reuse.
        with suppress(sqlite3.Error):
            db.close()
        return
    with _pool_lock:
        if len(_pool) < _POOL_SIZE:
            _pool.append(db)
            return
    db.close()


def init_db():
    """Create all tables, FTS indexes and triggers, and run column migrations.

    Every statement uses ``IF NOT EXISTS`` or checks ``PRAGMA table_info``
    before altering a table. The connection is closed even when a statement
    fails.
    """
    with closing(sqlite3.connect(DATABASE)) as db:
        db.execute(
            "CREATE TABLE IF NOT EXISTS pages ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  url TEXT UNIQUE NOT NULL,"
            "  title TEXT,"
            "  body TEXT,"
            "  note TEXT DEFAULT '',"
            "  last_modified TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%S','now'))"
            ")"
        )
        db.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS pages_fts "
            "USING fts5(title, body, url, note, content=pages, content_rowid=id)"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS links ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  page_id INTEGER NOT NULL,"
            "  url TEXT NOT NULL,"
            "  label TEXT,"
            "  FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS settings ("
            "  key TEXT PRIMARY KEY,"
            "  value TEXT"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS subscriptions ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  dest_hash TEXT UNIQUE NOT NULL,"
            "  name TEXT DEFAULT '',"
            "  auto_sync INTEGER DEFAULT 0,"
            "  last_sync TEXT DEFAULT ''"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS remote_pages ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  subscription_id INTEGER NOT NULL,"
            "  url TEXT NOT NULL,"
            "  title TEXT,"
            "  note TEXT DEFAULT '',"
            "  tags TEXT DEFAULT '',"
            "  FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE CASCADE,"
            "  UNIQUE(subscription_id, url)"
            ")"
        )
        db.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS remote_pages_fts "
            "USING fts5(title, url, note, content=remote_pages, content_rowid=id)"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS tags ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  name TEXT UNIQUE NOT NULL"
            ")"
        )
        db.execute(
            "CREATE TABLE IF NOT EXISTS page_tags ("
            "  page_id INTEGER NOT NULL,"
            "  tag_id INTEGER NOT NULL,"
            "  PRIMARY KEY (page_id, tag_id),"
            "  FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE,"
            "  FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE"
            ")"
        )
        # Triggers keep the external-content FTS tables in step with their
        # source tables on insert, delete and update.
        db.executescript("""
            CREATE TRIGGER IF NOT EXISTS pages_ai AFTER INSERT ON pages BEGIN
                INSERT INTO pages_fts(rowid, title, body, url, note)
                VALUES (new.id, new.title, new.body, new.url, new.note);
            END;
            CREATE TRIGGER IF NOT EXISTS pages_ad AFTER DELETE ON pages BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
                VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
            END;
            CREATE TRIGGER IF NOT EXISTS pages_au AFTER UPDATE ON pages BEGIN
                INSERT INTO pages_fts(pages_fts, rowid, title, body, url, note)
                VALUES ('delete', old.id, old.title, old.body, old.url, old.note);
                INSERT INTO pages_fts(rowid, title, body, url, note)
                VALUES (new.id, new.title, new.body, new.url, new.note);
            END;
            CREATE TRIGGER IF NOT EXISTS remote_pages_ai AFTER INSERT ON remote_pages BEGIN
                INSERT INTO remote_pages_fts(rowid, title, url, note)
                VALUES (new.id, new.title, new.url, new.note);
            END;
            CREATE TRIGGER IF NOT EXISTS remote_pages_ad AFTER DELETE ON remote_pages BEGIN
                INSERT INTO remote_pages_fts(remote_pages_fts, rowid, title, url, note)
                VALUES ('delete', old.id, old.title, old.url, old.note);
            END;
            CREATE TRIGGER IF NOT EXISTS remote_pages_au AFTER UPDATE ON remote_pages BEGIN
                INSERT INTO remote_pages_fts(remote_pages_fts, rowid, title, url, note)
                VALUES ('delete', old.id, old.title, old.url, old.note);
                INSERT INTO remote_pages_fts(rowid, title, url, note)
                VALUES (new.id, new.title, new.url, new.note);
            END;
        """)

        # Migrate old subscriptions table if needed
        cols = [row[1] for row in db.execute("PRAGMA table_info(subscriptions)").fetchall()]
        if "url" in cols and "dest_hash" not in cols:
            db.execute("ALTER TABLE subscriptions RENAME COLUMN url TO dest_hash")
            db.commit()

        # Migrate remote_pages: add tags column if missing
        rp_cols = [row[1] for row in db.execute("PRAGMA table_info(remote_pages)").fetchall()]
        if "tags" not in rp_cols:
            db.execute("ALTER TABLE remote_pages ADD COLUMN tags TEXT DEFAULT ''")
            db.commit()

        # Migrate pages: add last_modified column if missing
        page_cols = [row[1] for row in db.execute("PRAGMA table_info(pages)").fetchall()]
        if "last_modified" not in page_cols:
            db.execute("ALTER TABLE pages ADD COLUMN last_modified TEXT DEFAULT ''")
            db.execute(
                "UPDATE pages SET last_modified = strftime('%Y-%m-%dT%H:%M:%S','now') "
                "WHERE last_modified = ''"
            )
            db.commit()

        # Migrate pages: add summary column if missing
        if "summary" not in page_cols:
            db.execute("ALTER TABLE pages ADD COLUMN summary TEXT DEFAULT ''")
            db.commit()

        # Chunks table for semantic search embeddings
        db.execute(
            "CREATE TABLE IF NOT EXISTS chunks ("
            "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
            "  page_id INTEGER,"
            "  remote_page_id INTEGER,"
            "  chunk_index INTEGER NOT NULL,"
            "  chunk_text TEXT NOT NULL,"
            "  embedding BLOB NOT NULL,"
            "  FOREIGN KEY (page_id) REFERENCES pages(id) ON DELETE CASCADE,"
            "  FOREIGN KEY (remote_page_id) REFERENCES remote_pages(id) ON DELETE CASCADE"
            ")"
        )
        db.execute("CREATE INDEX IF NOT EXISTS idx_chunks_page ON chunks(page_id)")
        db.execute("CREATE INDEX IF NOT EXISTS idx_chunks_remote ON chunks(remote_page_id)")

        db.execute("PRAGMA journal_mode=WAL")
        db.commit()


def get_setting(key, default=""):
    """Return the stored value for *key*, or *default* when no row exists."""
    db = get_db()
    try:
        row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
        return row["value"] if row else default
    finally:
        return_db(db)


def set_setting(key, value):
    """Insert or overwrite the settings row for *key* and commit."""
    db = get_db()
    try:
        db.execute(
            "INSERT INTO settings (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, value),
        )
        db.commit()
    finally:
        return_db(db)


def get_site_name():
    """Return the ``site_name`` setting, defaulting to ``"tinyweb"``."""
    return get_setting("site_name", "tinyweb")


_USER_AGENT = "TinyWeb/1.0"
_FETCH_TIMEOUT = 10  # seconds, applied to every individual request
_MAX_REDIRECTS = 5

# Same-site links whose path contains one of these markers are skipped.
_SKIP_PATH_MARKERS = (
    "/special:", "/talk:", "/user:", "/wikipedia:",
    "/help:", "/portal:", "/file:", "/category:",
)


def _http_get(url):
    """Issue one GET for *url* without letting requests follow redirects."""
    return requests.get(
        url,
        timeout=_FETCH_TIMEOUT,
        headers={"User-Agent": _USER_AGENT},
        allow_redirects=False,
    )


def _get_following_redirects(url):
    """Fetch *url*, validating each redirect hop; return (response, final_url)."""
    _validate_url_target(url)
    resp = _http_get(url)
    remaining = _MAX_REDIRECTS
    # Follow redirects manually, re-validating each target
    while resp.is_redirect:
        location = resp.headers.get("Location")
        if not location:
            break
        if remaining == 0:
            # A 3xx passes raise_for_status(); without this the redirect stub
            # itself would be parsed and indexed as the page.
            raise requests.TooManyRedirects(
                f"Exceeded {_MAX_REDIRECTS} redirects fetching {url}",
                response=resp,
            )
        remaining -= 1
        url = urljoin(url, location)
        _validate_url_target(url)
        resp = _http_get(url)
    resp.raise_for_status()
    return resp, url


def _extract_links(soup, url):
    """Collect unique same-site (href, label) pairs from the anchors in *soup*."""
    domain = urlparse(url).netloc
    seen = set()
    links = []
    for anchor in soup.find_all("a", href=True):
        try:
            href = urljoin(url, anchor["href"]).split("#")[0]
            parsed = urlparse(href)
        except ValueError:
            # A single malformed href must not abort indexing the whole page.
            continue
        if parsed.netloc != domain:
            continue
        if href.lower().endswith(SKIP_EXT):
            continue
        if parsed.query or "action=" in href:
            continue
        path = parsed.path.lower()
        if any(marker in path for marker in _SKIP_PATH_MARKERS):
            continue
        if href in seen or href == url:
            continue
        seen.add(href)
        label = anchor.get_text(strip=True) or href
        links.append((href, label[:200]))
    return links


def _extract_meta_description(soup):
    """Return the meta description, falling back to og:description, else ''."""
    for attrs in ({"name": "description"}, {"property": "og:description"}):
        tag = soup.find("meta", attrs=attrs)
        if tag and tag.get("content"):
            description = tag["content"].strip()
            if description:
                return description
    return ""


def fetch_page(url):
    """Download *url* and extract its title, text, same-site links and description.

    Redirects are followed manually (at most ``_MAX_REDIRECTS`` hops) so that
    every hop is checked by ``_validate_url_target`` before it is requested.

    Args:
        url: Absolute URL to fetch.

    Returns:
        A tuple ``(title, body, links, meta_desc)`` where ``links`` is a list
        of ``(href, label)`` pairs and ``meta_desc`` may be an empty string.

    Raises:
        ValueError: If the URL or any redirect target fails validation.
        requests.TooManyRedirects: If the redirect limit is exceeded.
        requests.RequestException: For other request failures, including
            ``HTTPError`` on a 4xx/5xx final response.
    """
    resp, url = _get_following_redirects(url)
    soup = BeautifulSoup(resp.text, "html.parser")

    # Links and meta tags must be read before chrome elements are removed.
    links = _extract_links(soup, url)
    meta_desc = _extract_meta_description(soup)

    for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
        tag.decompose()

    title = ""
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    # Fall back to the URL for a missing OR whitespace-only <title>.
    title = title or url
    body = soup.get_text(separator=" ", strip=True)
    return title, body, links, meta_desc


_SENTENCE_SPLIT_RE = re.compile(r'(?<=[.!?])\s+')
_WHITESPACE_RE = re.compile(r'\s+')
_NOISE_RE = re.compile(
    r'arrow-|fedilink|message-square|link-external|'
    r'skip to|cookie|subscribe|sign up|log in|'
    r'privacy policy|terms of|©|\bads?\b',
    re.IGNORECASE,
)


def _looks_like_prose(sentence):
    """Return True when *sentence* passes the length, alphabet and noise filters."""
    if len(sentence) < 40:
        return False
    if len(sentence.split()) < 7:
        return False
    # Skip if mostly non-alpha (icons, arrows, encoded chars)
    alpha_chars = sum(1 for c in sentence if c.isalpha() or c == ' ')
    if alpha_chars < len(sentence) * 0.6:
        return False
    # Skip nav/menu patterns
    if sentence.count('|') > 2 or sentence.count('·') > 2 or sentence.count('►') > 0:
        return False
    # Skip UI debris
    return not _NOISE_RE.search(sentence)


def _generate_summary(title, body):
    """Generate a summary from body text using centroid extractive method.

    Filters out UI debris, embeds remaining sentences, finds the one
    closest to the centroid (most representative of the page).

    Args:
        title: Page title. Currently unused; kept for interface stability.
        body: Plain text of the page.

    Returns:
        A summary string. At most 200 characters when built from sentences,
        or up to 160 characters plus ``"..."`` when no sentence qualifies.
    """
    sentences = [
        candidate
        for candidate in (raw.strip() for raw in _SENTENCE_SPLIT_RE.split(body))
        if _looks_like_prose(candidate)
    ]

    if not sentences:
        # Last resort: take the first chunk of body, whitespace-collapsed
        clean = _WHITESPACE_RE.sub(' ', body).strip()
        return clean[:160] + "..." if len(clean) > 160 else clean

    if len(sentences) == 1:
        return sentences[0][:200]

    try:
        from embeddings import embed
        import numpy as np

        embs = embed(sentences[:50])  # cap to avoid embedding too many
        centroid = embs.mean(axis=0, keepdims=True)
        centroid = centroid / max(np.linalg.norm(centroid), 1e-12)
        scores = (embs @ centroid.T).flatten()
        best_idx = int(np.argmax(scores))
        result = sentences[best_idx]
        # Try to add a second sentence if it fits
        if best_idx + 1 < len(sentences):
            follower = sentences[best_idx + 1]
            if len(result) + len(follower) + 1 <= 200:
                result += " " + follower
        return result[:200]
    except Exception:
        # Embeddings are optional: fall back to the first usable sentence.
        logger.debug("Embedding-based summary failed; using first sentence", exc_info=True)
        return sentences[0][:200]


def index_url(url, note=""):
    """Fetch *url*, store or refresh its page row and links, and return the title.

    The URL is normalised with ``clean_url`` first. The summary is the page's
    meta description when present, otherwise one generated from the body.
    Embedding generation runs after the commit and is best-effort.

    Args:
        url: URL to index.
        note: Note stored with the page. On re-index it replaces the
            existing note.

    Returns:
        The page title as returned by ``fetch_page``.

    Raises:
        ValueError: If the URL fails target validation.
        requests.RequestException: If fetching the page fails.
    """
    url = clean_url(url)
    title, body, links, meta_desc = fetch_page(url)

    # Use meta description if available, otherwise generate from body
    summary = meta_desc if meta_desc else _generate_summary(title, body)

    db = get_db()
    try:
        # UTC, matching the strftime(...,'now') values written by the schema
        # default and the migration in init_db().
        now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        db.execute(
            "INSERT INTO pages (url, title, body, note, last_modified, summary) VALUES (?, ?, ?, ?, ?, ?) "
            "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, "
            "note=excluded.note, last_modified=excluded.last_modified, summary=excluded.summary",
            (url, title, body, note, now, summary),
        )
        page_id = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()[0]

        # Replace the page's outgoing links wholesale.
        db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
        db.executemany(
            "INSERT INTO links (page_id, url, label) VALUES (?, ?, ?)",
            [(page_id, href, label) for href, label in links],
        )
        db.commit()

        try:
            from embeddings import store_embeddings
            store_embeddings(page_id, title, body, db)
        except Exception:
            # embedding generation is best-effort; keep the failure discoverable
            logger.debug("Embedding generation failed for %s", url, exc_info=True)
    finally:
        return_db(db)
    return title