Add security hardening: CSRF, SSRF, FTS5, and DELETE via POST

- CSRF: Generate random token at startup, include as hidden field in
  all 11 POST forms, validate at top of POST dispatch (returns 403)
- SSRF: Block private/internal IP ranges (127/8, 10/8, 172.16/12,
  192.168/16, 169.254/16, ::1, fc00::/7) by resolving hostname before
  fetch. Remove verify=False from requests.get().
- DELETE: Change /delete/<id> from GET (instant delete) to GET
  (confirmation page) + POST (actual delete) to prevent accidental
  deletion from prefetchers/crawlers.
- FTS5: Wrap search input in double quotes to neutralize FTS5
  operators (AND, OR, NOT, *, column:). Add try/except fallback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Derick Phan 2026-03-26 10:54:22 -07:00
parent 9c4ed9ac9e
commit 9ddecf71db
No known key found for this signature in database
2 changed files with 106 additions and 20 deletions

View file

@ -1,10 +1,28 @@
import json
import secrets
from datetime import datetime
from db import get_db, get_setting, set_setting, get_site_name, index_url, clean_url
from templates import esc, snippet, wrap_page, DEFAULT_TEMPLATE
from rns_client import fetch_remote_sites
_csrf_token = secrets.token_hex(32)
def _csrf_field():
return f'<input type="hidden" name="_csrf" value="{_csrf_token}">'
def _check_csrf(body):
token = body.get("_csrf", [""])[0]
return secrets.compare_digest(token, _csrf_token)
def _sanitize_fts_query(query):
"""Escape user input for safe use in FTS5 MATCH."""
escaped = query.replace('"', '""')
return f'"{escaped}"'
def _respond(body_html, status=200, use_default=False):
return {
@ -90,12 +108,15 @@ def handle_search(query):
result_html = ""
trusted_html = ""
if q:
rows = db.execute(
"SELECT p.id, p.url, p.title, p.body, p.note "
"FROM pages_fts f JOIN pages p ON f.rowid = p.id "
"WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
(q,),
).fetchall()
try:
rows = db.execute(
"SELECT p.id, p.url, p.title, p.body, p.note "
"FROM pages_fts f JOIN pages p ON f.rowid = p.id "
"WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50",
(_sanitize_fts_query(q),),
).fetchall()
except Exception:
rows = []
if rows:
for r in rows:
note_html = ""
@ -150,14 +171,17 @@ def handle_search(query):
)
# search synced pages from subscriptions
remote_rows = db.execute(
"SELECT rp.url, rp.title, rp.note, s.name AS source_name "
"FROM remote_pages_fts rpf "
"JOIN remote_pages rp ON rpf.rowid = rp.id "
"JOIN subscriptions s ON rp.subscription_id = s.id "
"WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
(q,),
).fetchall()
try:
remote_rows = db.execute(
"SELECT rp.url, rp.title, rp.note, s.name AS source_name "
"FROM remote_pages_fts rpf "
"JOIN remote_pages rp ON rpf.rowid = rp.id "
"JOIN subscriptions s ON rp.subscription_id = s.id "
"WHERE remote_pages_fts MATCH ? ORDER BY rank LIMIT 50",
(_sanitize_fts_query(q),),
).fetchall()
except Exception:
remote_rows = []
remote_html = ""
if q and remote_rows:
@ -200,6 +224,7 @@ def handle_add_form(msg=""):
return _respond(
f"<h1>add url</h1>"
f'<form method="post" action="/add">'
f'{_csrf_field()}'
f'<input name="url" placeholder="https://example.com" size="50"><br><br>'
f'<input name="note" placeholder="why are you saving this? (optional)" size="50"><br><br>'
f'<input name="tags" placeholder="tags (comma-separated, e.g. solarpunk, mesh)" size="50"><br><br>'
@ -271,6 +296,7 @@ def handle_edit_form(page_id, msg=""):
f"<p><b>{esc(row['title'])}</b><br>"
f"<small>{esc(row['url'])}</small></p>"
f'<form method="post" action="/edit/{row["id"]}">'
f'{_csrf_field()}'
f'<input name="note" value="{esc(row["note"])}" placeholder="why did you save this?" size="50"><br><br>'
f'<input name="tags" value="{esc(tags)}" placeholder="tags (comma-separated)" size="50"><br><br>'
f'<button type="submit">save</button>'
@ -291,6 +317,24 @@ def handle_edit_submit(page_id, body):
return _redirect("/pages")
def handle_delete_confirm(page_id):
db = get_db()
row = db.execute("SELECT id, url, title FROM pages WHERE id = ?", (page_id,)).fetchone()
db.close()
if not row:
return _error(404)
return _respond(
f"<h1>confirm delete</h1>"
f"<p>Remove <b>{esc(row['title'])}</b><br>"
f"<small>{esc(row['url'])}</small></p>"
f'<form method="post" action="/delete/{row["id"]}">'
f'{_csrf_field()}'
f'<button type="submit">yes, delete</button>'
f"</form>"
f' <a href="/pages">cancel</a>'
)
def handle_delete(page_id):
db = get_db()
db.execute("DELETE FROM links WHERE page_id = ?", (page_id,))
@ -325,6 +369,7 @@ def handle_import_form(msg=""):
f"<h1>import</h1>"
f"<p>Paste the contents of a tinyweb export file (JSON).</p>"
f'<form method="post" action="/import">'
f'{_csrf_field()}'
f'<textarea name="data" rows="12" cols="60" placeholder=\'[{{"url": "...", "note": "..."}}]\'></textarea><br><br>'
f'<button type="submit">import</button>'
f"</form>"
@ -372,6 +417,7 @@ def handle_style_form(msg="", query=None):
f"<h1>customize</h1>"
f"<h2>name your search engine</h2>"
f'<form method="post" action="/style">'
f'{_csrf_field()}'
f'<input name="site_name" value="{esc(name)}" placeholder="tinyweb" size="30"><br><br>'
f"<h2>sharing</h2>"
f'<label><input type="checkbox" name="sharing_enabled" value="1"{checked}>'
@ -530,14 +576,14 @@ def handle_subscriptions(msg=""):
f'<td>{esc(last)}</td>'
f'<td>'
f'<form method="post" action="/subscriptions/autosync/{s["id"]}" style="display:inline">'
f'<button>auto-sync: {auto_label}</button></form>'
f'{_csrf_field()}<button>auto-sync: {auto_label}</button></form>'
f'</td>'
f'<td>'
f'<a href="/subscriptions/browse/{s["id"]}">browse</a> '
f'<form method="post" action="/subscriptions/sync/{s["id"]}" style="display:inline">'
f'<button>sync now</button></form> '
f'{_csrf_field()}<button>sync now</button></form> '
f'<form method="post" action="/subscriptions/delete/{s["id"]}" style="display:inline">'
f'<button>remove</button></form>'
f'{_csrf_field()}<button>remove</button></form>'
f'</td>'
f'</tr>'
)
@ -547,11 +593,12 @@ def handle_subscriptions(msg=""):
f'<table><tr><th>instance</th><th>last sync</th><th>auto-sync</th><th>actions</th></tr>'
f'{items}</table>'
f'<form method="post" action="/subscriptions/syncall">'
f'<button>sync all</button></form>'
f'{_csrf_field()}<button>sync all</button></form>'
)
return _respond(
f"<h1>subscriptions</h1>"
f'<form method="post" action="/subscriptions/add">'
f'{_csrf_field()}'
f'<input name="dest_hash" placeholder="destination hash" size="40"> '
f'<button>subscribe</button>'
f'</form>'
@ -646,6 +693,7 @@ def handle_subscription_browse(sub_id):
f'<h1>browsing: {esc(sub["name"] or sub["dest_hash"])}</h1>'
f'<p>{len(sites)} site(s) available, {new_count} new</p>'
f'<form method="post" action="/subscriptions/pick">'
f'{_csrf_field()}'
f'<input type="hidden" name="sub_id" value="{sub_id}">'
f'<ul>{new_items}</ul>'
f'{buttons}'
@ -811,7 +859,7 @@ def dispatch_request(data):
return handle_edit_form(pid) if pid is not None else _error(400)
elif path.startswith("/delete/"):
pid = extract_id("/delete/")
return handle_delete(pid) if pid is not None else _error(400)
return handle_delete_confirm(pid) if pid is not None else _error(400)
elif path == "/bookmark":
return handle_bookmark(query)
elif path == "/style":
@ -835,11 +883,16 @@ def dispatch_request(data):
sid = extract_id("/subscriptions/browse/")
return handle_subscription_browse(sid) if sid is not None else _error(400)
elif method == "POST":
if not _check_csrf(body):
return _respond("<h1>403 Forbidden</h1><p>Invalid or missing CSRF token.</p>", status=403)
if path == "/add":
return handle_add_submit(body)
elif path.startswith("/edit/"):
pid = extract_id("/edit/")
return handle_edit_submit(pid, body) if pid is not None else _error(400)
elif path.startswith("/delete/"):
pid = extract_id("/delete/")
return handle_delete(pid) if pid is not None else _error(400)
elif path == "/style":
return handle_style_submit(body)
elif path == "/import":