From 0981c2e0a937147523881d62b2377f929ccf7c5d Mon Sep 17 00:00:00 2001 From: lichenblankie Date: Thu, 26 Mar 2026 10:54:22 -0700 Subject: [PATCH] hardened CSRF, SSRF, FTS5 - CSRF: Generate random token at startup, include as hidden field in all 11 POST forms, validate at top of POST dispatch (returns 403) - SSRF: Block private/internal IP ranges (127/8, 10/8, 172.16/12, 192.168/16, 169.254/16, ::1, fc00::/7) by resolving hostname before fetch. Remove verify=False from requests.get(). - DELETE: Change /delete/ from GET (instant delete) to GET (confirmation page) + POST (actual delete) to prevent accidental deletion from prefetchers/crawlers. - FTS5: Wrap search input in double quotes to neutralize FTS5 operators (AND, OR, NOT, *, column:). Add try/except fallback. --- db.py | 35 ++++++++++++++++++++- handlers.py | 91 ++++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 106 insertions(+), 20 deletions(-) diff --git a/db.py b/db.py index b523c79..c9ea195 100644 --- a/db.py +++ b/db.py @@ -1,3 +1,5 @@ +import socket +import ipaddress import sqlite3 import requests from urllib.parse import urlparse, urljoin, parse_qs, urlencode, urlunparse @@ -5,6 +7,36 @@ from bs4 import BeautifulSoup DATABASE = "index.db" +BLOCKED_NETWORKS = [ + ipaddress.ip_network("127.0.0.0/8"), + ipaddress.ip_network("10.0.0.0/8"), + ipaddress.ip_network("172.16.0.0/12"), + ipaddress.ip_network("192.168.0.0/16"), + ipaddress.ip_network("169.254.0.0/16"), + ipaddress.ip_network("0.0.0.0/8"), + ipaddress.ip_network("::1/128"), + ipaddress.ip_network("fc00::/7"), + ipaddress.ip_network("fe80::/10"), +] + + +def _validate_url_target(url): + """Resolve hostname and block private/internal IPs to prevent SSRF.""" + parsed = urlparse(url) + hostname = parsed.hostname + port = parsed.port or (443 if parsed.scheme == "https" else 80) + if not hostname: + raise ValueError(f"No hostname in URL: {url}") + try: + addrs = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP) + except socket.gaierror: + raise ValueError(f"Cannot resolve hostname: {hostname}") + for family, type_, proto, canonname, sockaddr in addrs: + ip = ipaddress.ip_address(sockaddr[0]) + for network in BLOCKED_NETWORKS: + if ip in network: + raise ValueError(f"URL resolves to blocked address: {ip}") + SKIP_EXT = (".png", ".jpg", ".jpeg", ".gif", ".svg", ".pdf", ".zip", ".mp3", ".mp4", ".css", ".js", ".ico", ".xml", ".json") TRACKING_PARAMS = { @@ -167,7 +199,8 @@ def get_site_name(): def fetch_page(url): - resp = requests.get(url, timeout=10, headers={"User-Agent": "TinyWeb/1.0"}, verify=False) + _validate_url_target(url) + resp = requests.get(url, timeout=10, headers={"User-Agent": "TinyWeb/1.0"}) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") diff --git a/handlers.py b/handlers.py index 4ef8939..0045767 100644 --- a/handlers.py +++ b/handlers.py @@ -1,10 +1,28 @@ import json +import secrets from datetime import datetime from db import get_db, get_setting, set_setting, get_site_name, index_url, clean_url from templates import esc, snippet, wrap_page, DEFAULT_TEMPLATE from rns_client import fetch_remote_sites +_csrf_token = secrets.token_hex(32) + + +def _csrf_field(): + return f'' + + +def _check_csrf(body): + token = body.get("_csrf", [""])[0] + return secrets.compare_digest(token, _csrf_token) + + +def _sanitize_fts_query(query): + """Escape user input for safe use in FTS5 MATCH.""" + escaped = query.replace('"', '""') + return f'"{escaped}"' + def _respond(body_html, status=200, use_default=False): return { @@ -90,12 +108,15 @@ def handle_search(query): result_html = "" trusted_html = "" if q: - rows = db.execute( - "SELECT p.id, p.url, p.title, p.body, p.note " - "FROM pages_fts f JOIN pages p ON f.rowid = p.id " - "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50", - (q,), - ).fetchall() + try: + rows = db.execute( + "SELECT p.id, p.url, p.title, p.body, p.note " + "FROM pages_fts f JOIN pages p ON f.rowid = p.id " + "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50", + (_sanitize_fts_query(q),), + ).fetchall() + except Exception: + rows = [] if rows: for r in rows: note_html = "" @@ -150,14 +171,17 @@ def handle_search(query): ) # search synced pages from subscriptions - remote_rows = db.execute( - "SELECT rp.url, rp.title, rp.note, s.name AS source_name " - "FROM remote_pages_fts rpf " - "JOIN remote_pages rp ON rpf.rowid = rp.id " - "JOIN subscriptions s ON rp.subscription_id = s.id " - "WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50", - (q,), - ).fetchall() + try: + remote_rows = db.execute( + "SELECT rp.url, rp.title, rp.note, s.name AS source_name " + "FROM remote_pages_fts rpf " + "JOIN remote_pages rp ON rpf.rowid = rp.id " + "JOIN subscriptions s ON rp.subscription_id = s.id " + "WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50", + (_sanitize_fts_query(q),), + ).fetchall() + except Exception: + remote_rows = [] remote_html = "" if q and remote_rows: @@ -200,6 +224,7 @@ def handle_add_form(msg=""): return _respond( f"

add url

" f'
' + f'{_csrf_field()}' f'

' f'

' f'

' @@ -271,6 +296,7 @@ def handle_edit_form(page_id, msg=""): f"

{esc(row['title'])}
" f"{esc(row['url'])}

" f'' + f'{_csrf_field()}' f'

' f'

' f'' @@ -291,6 +317,24 @@ def handle_edit_submit(page_id, body): return _redirect("/pages") +def handle_delete_confirm(page_id): + db = get_db() + row = db.execute("SELECT id, url, title FROM pages WHERE id = ?", (page_id,)).fetchone() + db.close() + if not row: + return _error(404) + return _respond( + f"

confirm delete

" + f"

Remove {esc(row['title'])}
" + f"{esc(row['url'])}

" + f'' + f'{_csrf_field()}' + f'' + f"
" + f' cancel' + ) + + def handle_delete(page_id): db = get_db() db.execute("DELETE FROM links WHERE page_id = ?", (page_id,)) @@ -325,6 +369,7 @@ def handle_import_form(msg=""): f"

import

" f"

Paste the contents of a tinyweb export file (JSON).

" f'
' + f'{_csrf_field()}' f'

' f'' f"
" @@ -372,6 +417,7 @@ def handle_style_form(msg="", query=None): f"

customize

" f"

name your search engine

" f'
' + f'{_csrf_field()}' f'

' f"

sharing

" f'