From f2e8dd042a29bda47068de4809b9d71ec27e066a Mon Sep 17 00:00:00 2001 From: Derick Phan Date: Thu, 26 Mar 2026 12:00:43 -0700 Subject: [PATCH] Add WAL mode, connection pooling, pagination, and delta sync WAL + pooling: - Enable WAL journal mode for concurrent read/write support - Add connection pool (size 4) with return_db() to reuse connections instead of opening/closing on every request Pagination: - Search results, /pages, and /tags/ now paginate at 50 per page - Prev/next navigation links appear when results exceed one page Delta sync: - Pages table gains last_modified timestamp, set on insert/update - /api/sites accepts ?since= param to return only changed pages - Subscription sync uses last_sync timestamp for incremental fetches - Remote pages upserted instead of delete-all/re-insert - Full sync includes all_urls list for detecting remote deletions Co-Authored-By: Claude Opus 4.6 --- db.py | 69 ++++++++++++++----- handlers.py | 186 ++++++++++++++++++++++++++++++++++++-------------- rns_client.py | 7 +- 3 files changed, 193 insertions(+), 69 deletions(-) diff --git a/db.py b/db.py index 72128a3..0d32b3a 100644 --- a/db.py +++ b/db.py @@ -77,13 +77,35 @@ def clean_url(url): return urlunparse((scheme, netloc, path, "", new_query, "")) +_pool = [] +_pool_lock = __import__("threading").Lock() +_POOL_SIZE = 4 + + def get_db(): - db = sqlite3.connect(DATABASE) + with _pool_lock: + if _pool: + db = _pool.pop() + try: + db.execute("SELECT 1") + return db + except Exception: + pass + db = sqlite3.connect(DATABASE, timeout=10) + db.execute("PRAGMA journal_mode=WAL") db.execute("PRAGMA foreign_keys = ON") db.row_factory = sqlite3.Row return db +def return_db(db): + with _pool_lock: + if len(_pool) < _POOL_SIZE: + _pool.append(db) + else: + db.close() + + def init_db(): db = sqlite3.connect(DATABASE) db.execute( @@ -92,7 +114,8 @@ def init_db(): " url TEXT UNIQUE NOT NULL," " title TEXT," " body TEXT," - " note TEXT DEFAULT ''" + " note TEXT DEFAULT ''," + " last_modified TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%S','now'))" ")" ) db.execute( @@ -196,26 +219,38 @@ def init_db(): db.execute("ALTER TABLE remote_pages ADD COLUMN tags TEXT DEFAULT ''") db.commit() + # Migrate pages: add last_modified column if missing + page_cols = [row[1] for row in db.execute("PRAGMA table_info(pages)").fetchall()] + if "last_modified" not in page_cols: + db.execute("ALTER TABLE pages ADD COLUMN last_modified TEXT DEFAULT ''") + db.execute("UPDATE pages SET last_modified = strftime('%Y-%m-%dT%H:%M:%S','now') WHERE last_modified = ''") + db.commit() + + db.execute("PRAGMA journal_mode=WAL") db.commit() db.close() def get_setting(key, default=""): db = get_db() - row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone() - db.close() - return row["value"] if row else default + try: + row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone() + return row["value"] if row else default + finally: + return_db(db) def set_setting(key, value): db = get_db() - db.execute( - "INSERT INTO settings (key, value) VALUES (?, ?) " - "ON CONFLICT(key) DO UPDATE SET value=excluded.value", - (key, value), - ) - db.commit() - db.close() + try: + db.execute( + "INSERT INTO settings (key, value) VALUES (?, ?) " + "ON CONFLICT(key) DO UPDATE SET value=excluded.value", + (key, value), + ) + db.commit() + finally: + return_db(db) def get_site_name(): @@ -273,10 +308,12 @@ def index_url(url, note=""): title, body, links = fetch_page(url) db = get_db() try: + now = __import__("datetime").datetime.now().strftime("%Y-%m-%dT%H:%M:%S") db.execute( - "INSERT INTO pages (url, title, body, note) VALUES (?, ?, ?, ?) " - "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, note=excluded.note", - (url, title, body, note), + "INSERT INTO pages (url, title, body, note, last_modified) VALUES (?, ?, ?, ?, ?) " + "ON CONFLICT(url) DO UPDATE SET title=excluded.title, body=excluded.body, " + "note=excluded.note, last_modified=excluded.last_modified", + (url, title, body, note, now), ) page_id = db.execute("SELECT id FROM pages WHERE url = ?", (url,)).fetchone()[0] db.execute("DELETE FROM links WHERE page_id = ?", (page_id,)) @@ -287,5 +324,5 @@ def index_url(url, note=""): ) db.commit() finally: - db.close() + return_db(db) return title diff --git a/handlers.py b/handlers.py index c47f670..eeb9b29 100644 --- a/handlers.py +++ b/handlers.py @@ -4,7 +4,7 @@ import threading from datetime import datetime from urllib.parse import unquote -from db import get_db, get_setting, set_setting, get_site_name, index_url, clean_url +from db import get_db, return_db, get_setting, set_setting, get_site_name, index_url, clean_url from templates import esc, snippet, wrap_page, DEFAULT_TEMPLATE from rns_client import fetch_remote_sites @@ -83,6 +83,31 @@ def _error(status): return _respond(f"

{status}

", status) +PER_PAGE = 50 + + +def _paginate(query, key="p"): + try: + page = int(query.get(key, ["1"])[0]) + except (ValueError, IndexError): + page = 1 + return max(1, page) + + +def _page_nav(page, total, base_url): + if total <= PER_PAGE: + return "" + total_pages = (total + PER_PAGE - 1) // PER_PAGE + sep = "&" if "?" in base_url else "?" + parts = [] + if page > 1: + parts.append(f'« prev') + parts.append(f"page {page} of {total_pages}") + if page < total_pages: + parts.append(f'next »') + return f'

{" | ".join(parts)}

' + + # --- Tag helpers --- @@ -96,7 +121,7 @@ def _get_page_tags(page_id, db=None): "WHERE pt.page_id = ? ORDER BY t.name", (page_id,) ).fetchall() if close: - db.close() + return_db(db) return [r["name"] for r in rows] @@ -112,7 +137,7 @@ def _set_page_tags(page_id, tag_string, db=None): db.execute("INSERT OR IGNORE INTO page_tags (page_id, tag_id) VALUES (?, ?)", (page_id, tag_id)) if close: db.commit() - db.close() + return_db(db) # --- Route handlers --- @@ -120,6 +145,8 @@ def _set_page_tags(page_id, tag_string, db=None): def handle_search(query): q = query.get("q", [""])[0].strip() + page = _paginate(query) + offset = (page - 1) * PER_PAGE db = get_db() try: count = db.execute("SELECT count(*) FROM pages").fetchone()[0] @@ -129,14 +156,19 @@ def handle_search(query): trusted_html = "" if q: try: + total_results = db.execute( + "SELECT count(*) FROM pages_fts WHERE pages_fts MATCH ?", + (_sanitize_fts_query(q),), + ).fetchone()[0] rows = db.execute( "SELECT p.id, p.url, p.title, p.body, p.note " "FROM pages_fts f JOIN pages p ON f.rowid = p.id " - "WHERE pages_fts MATCH ? ORDER BY rank LIMIT 50", - (_sanitize_fts_query(q),), + "WHERE pages_fts MATCH ? ORDER BY rank LIMIT ? OFFSET ?", + (_sanitize_fts_query(q), PER_PAGE, offset), ).fetchall() except Exception: rows = [] + total_results = 0 if rows: for r in rows: note_html = "" @@ -225,7 +257,7 @@ def handle_search(query): f'' ) finally: - db.close() + return_db(db) sub_count = "" if q and remote_rows: sub_count = f" + {len(remote_rows)} from subscriptions" @@ -236,7 +268,9 @@ def handle_search(query): f'' f'

{count} pages indexed' f' · + add url

' - f'{result_html}{trusted_html}{remote_html}' + f'{result_html}' + f'{_page_nav(page, total_results, f"/?q={esc(q)}") if q else ""}' + f'{trusted_html}{remote_html}' ) @@ -273,7 +307,7 @@ def handle_add_submit(body): _set_page_tags(row["id"], tags, db) db.commit() finally: - db.close() + return_db(db) return handle_add_form(f'Indexed: {esc(title)}') except ValueError as e: return handle_add_form(f"Error: {esc(str(e))}") @@ -281,10 +315,16 @@ def handle_add_submit(body): return handle_add_form("Error: could not fetch or index that URL.") -def handle_pages(): +def handle_pages(query=None): + page = _paginate(query or {}) + offset = (page - 1) * PER_PAGE db = get_db() try: - rows = db.execute("SELECT id, url, title, note FROM pages ORDER BY id DESC").fetchall() + total = db.execute("SELECT count(*) FROM pages").fetchone()[0] + rows = db.execute( + "SELECT id, url, title, note FROM pages ORDER BY id DESC LIMIT ? OFFSET ?", + (PER_PAGE, offset), + ).fetchall() items = "" for r in rows: note_html = f' — {esc(r["note"])}' if r["note"] else "" @@ -300,10 +340,11 @@ def handle_pages(): f'remove' ) finally: - db.close() + return_db(db) return _respond( - f"

indexed pages ({len(rows)})

" + f"

indexed pages ({total})

" f"
    {items}
" + f'{_page_nav(page, total, "/pages")}' f'

export | import

' f'back' ) @@ -317,7 +358,7 @@ def handle_edit_form(page_id, msg=""): return _error(404) tags = ", ".join(_get_page_tags(page_id, db)) finally: - db.close() + return_db(db) return _respond( f"

edit page

" f"

{esc(row['title'])}
" @@ -342,7 +383,7 @@ def handle_edit_submit(page_id, body): _set_page_tags(page_id, tags, db) db.commit() finally: - db.close() + return_db(db) return _redirect("/pages") @@ -351,7 +392,7 @@ def handle_delete_confirm(page_id): try: row = db.execute("SELECT id, url, title FROM pages WHERE id = ?", (page_id,)).fetchone() finally: - db.close() + return_db(db) if not row: return _error(404) return _respond( @@ -374,7 +415,7 @@ def handle_delete(page_id): db.execute("DELETE FROM pages WHERE id = ?", (page_id,)) db.commit() finally: - db.close() + return_db(db) return _redirect("/pages") @@ -399,7 +440,7 @@ def handle_export(): try: rows = db.execute("SELECT url, title, note FROM pages ORDER BY id").fetchall() finally: - db.close() + return_db(db) data = [{"url": r["url"], "title": r["title"], "note": r["note"]} for r in rows] return _json_response(data, headers={"Content-Disposition": "attachment; filename=tinyweb-export.json"}) @@ -503,7 +544,7 @@ def handle_about(): tag_count = db.execute("SELECT count(DISTINCT tag_id) FROM page_tags").fetchone()[0] sub_count = db.execute("SELECT count(*) FROM subscriptions").fetchone()[0] finally: - db.close() + return_db(db) sharing_html = ( '

This instance shares its index publicly. Subscribe to join the network.

' @@ -556,7 +597,7 @@ def handle_tags(): "GROUP BY t.id ORDER BY t.name" ).fetchall() finally: - db.close() + return_db(db) items = "" for r in rows: items += f'
  • {esc(r["name"])} ({r["cnt"]})
  • ' @@ -567,15 +608,21 @@ def handle_tags(): ) -def handle_tag_browse(tag_name): +def handle_tag_browse(tag_name, query=None): + page = _paginate(query or {}) + offset = (page - 1) * PER_PAGE db = get_db() try: + total = db.execute( + "SELECT count(*) FROM page_tags pt JOIN tags t ON t.id = pt.tag_id WHERE t.name = ?", + (tag_name,), + ).fetchone()[0] rows = db.execute( "SELECT p.id, p.url, p.title, p.note FROM pages p " "JOIN page_tags pt ON p.id = pt.page_id " "JOIN tags t ON t.id = pt.tag_id " - "WHERE t.name = ? ORDER BY p.id DESC", - (tag_name,), + "WHERE t.name = ? ORDER BY p.id DESC LIMIT ? OFFSET ?", + (tag_name, PER_PAGE, offset), ).fetchall() items = "" for r in rows: @@ -587,32 +634,48 @@ def handle_tag_browse(tag_name): f'({esc(r["url"])})' ) finally: - db.close() + return_db(db) return _respond( f'

    tag: {esc(tag_name)}

    ' - f'

    {len(rows)} page(s)

    ' + f'

    {total} page(s)

    ' f'
      {items}
    ' + f'{_page_nav(page, total, f"/tags/{esc(tag_name)}")}' f'all tags | back' ) -def handle_api_sites(): +def handle_api_sites(query=None): if get_setting("sharing_enabled", "0") != "1": return _json_response( {"error": "sharing disabled"}, status=403, headers={"Access-Control-Allow-Origin": "*"}, ) + since = (query or {}).get("since", [""])[0].strip() db = get_db() try: - rows = db.execute("SELECT id, url, title, note FROM pages ORDER BY id DESC").fetchall() + if since: + rows = db.execute( + "SELECT id, url, title, note, last_modified FROM pages " + "WHERE last_modified > ? ORDER BY id DESC", + (since,), + ).fetchall() + else: + rows = db.execute("SELECT id, url, title, note, last_modified FROM pages ORDER BY id DESC").fetchall() sites = [] for r in rows: tags = _get_page_tags(r["id"], db) - sites.append({"url": r["url"], "title": r["title"], "note": r["note"], "tags": tags}) + sites.append({ + "url": r["url"], "title": r["title"], "note": r["note"], + "tags": tags, "last_modified": r["last_modified"] or "", + }) + # Include list of all current URLs so subscriber can detect deletions + all_urls = [r["url"] for r in db.execute("SELECT url FROM pages").fetchall()] if not since else None finally: - db.close() + return_db(db) data = {"name": get_site_name(), "sites": sites} + if all_urls is not None: + data["all_urls"] = all_urls return _json_response(data, headers={"Access-Control-Allow-Origin": "*"}) @@ -621,7 +684,7 @@ def handle_subscriptions(msg=""): try: subs = db.execute("SELECT * FROM subscriptions ORDER BY id DESC").fetchall() finally: - db.close() + return_db(db) items = "" for s in subs: auto_label = "on" if s["auto_sync"] else "off" @@ -688,7 +751,7 @@ def handle_subscription_add(body): ) db.commit() finally: - db.close() + return_db(db) return handle_subscriptions(f"Subscribed to {esc(name or dest_hash)}.") @@ -706,7 +769,7 @@ def handle_subscription_browse(sub_id): (sub_id,), ).fetchall() finally: - db.close() + return_db(db) if remote_rows: sites = [] @@ -778,7 +841,7 @@ def handle_subscription_pick(body): else: urls = body.get("urls", []) finally: - db.close() + return_db(db) if not urls: return handle_subscriptions("No sites selected.") @@ -798,7 +861,7 @@ def handle_subscription_pick(body): _set_page_tags(row["id"], tags_str, db) db.commit() finally: - db.close() + return_db(db) imported += 1 except Exception: errors += 1 @@ -811,33 +874,46 @@ def handle_subscription_sync(sub_id): sub = db.execute("SELECT * FROM subscriptions WHERE id = ?", (sub_id,)).fetchone() if not sub: return handle_subscriptions("Subscription not found.") + # Use last_sync for delta sync if available + since = sub["last_sync"].replace(" ", "T") if sub["last_sync"] else "" try: - data = fetch_remote_sites(sub["dest_hash"]) + data = fetch_remote_sites(sub["dest_hash"], since=since) sites = data.get("sites", []) + all_urls = data.get("all_urls") remote_name = data.get("name", sub["name"]) except PermissionError: return handle_subscriptions("That instance has sharing disabled.") except Exception: return handle_subscriptions("Could not sync with that instance.") - # Clear old remote pages for this subscription and re-insert - db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub_id,)) + # If full sync (all_urls provided), remove pages no longer on remote + if all_urls is not None: + existing = db.execute( + "SELECT id, url FROM remote_pages WHERE subscription_id = ?", (sub_id,) + ).fetchall() + remote_url_set = set(all_urls) + for row in existing: + if row["url"] not in remote_url_set: + db.execute("DELETE FROM remote_pages WHERE id = ?", (row["id"],)) + + # Upsert changed/new pages synced = 0 for s in sites: try: tags_str = ",".join(s.get("tags", [])) db.execute( - "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?)", + "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?) " + "ON CONFLICT(subscription_id, url) DO UPDATE SET title=excluded.title, note=excluded.note, tags=excluded.tags", (sub_id, s["url"], s["title"], s.get("note", ""), tags_str), ) synced += 1 except Exception: pass - now = datetime.now().strftime("%Y-%m-%d %H:%M") + now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") db.execute("UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?", (now, remote_name, sub_id)) db.commit() finally: - db.close() + return_db(db) return handle_subscriptions(f"Synced {synced} site(s) from {esc(remote_name)}.") @@ -847,7 +923,7 @@ def handle_subscription_autosync(sub_id): db.execute("UPDATE subscriptions SET auto_sync = 1 - auto_sync WHERE id = ?", (sub_id,)) db.commit() finally: - db.close() + return_db(db) return _redirect("/subscriptions") @@ -858,7 +934,7 @@ def handle_subscription_delete(sub_id): db.execute("DELETE FROM subscriptions WHERE id = ?", (sub_id,)) db.commit() finally: - db.close() + return_db(db) return _redirect("/subscriptions") @@ -867,32 +943,42 @@ def handle_subscription_syncall(): try: subs = db.execute("SELECT * FROM subscriptions WHERE auto_sync = 1").fetchall() finally: - db.close() + return_db(db) if not subs: return handle_subscriptions("No subscriptions have auto-sync enabled.") total = 0 for sub in subs: try: - data = fetch_remote_sites(sub["dest_hash"]) + since = sub["last_sync"].replace(" ", "T") if sub["last_sync"] else "" + data = fetch_remote_sites(sub["dest_hash"], since=since) sites = data.get("sites", []) + all_urls = data.get("all_urls") remote_name = data.get("name", sub["name"]) db = get_db() try: - db.execute("DELETE FROM remote_pages WHERE subscription_id = ?", (sub["id"],)) + if all_urls is not None: + existing = db.execute( + "SELECT id, url FROM remote_pages WHERE subscription_id = ?", (sub["id"],) + ).fetchall() + remote_url_set = set(all_urls) + for row in existing: + if row["url"] not in remote_url_set: + db.execute("DELETE FROM remote_pages WHERE id = ?", (row["id"],)) for s in sites: try: tags_str = ",".join(s.get("tags", [])) db.execute( - "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?)", + "INSERT INTO remote_pages (subscription_id, url, title, note, tags) VALUES (?, ?, ?, ?, ?) " + "ON CONFLICT(subscription_id, url) DO UPDATE SET title=excluded.title, note=excluded.note, tags=excluded.tags", (sub["id"], s["url"], s["title"], s.get("note", ""), tags_str), ) except Exception: pass - now = datetime.now().strftime("%Y-%m-%d %H:%M") + now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") db.execute("UPDATE subscriptions SET last_sync = ?, name = ? WHERE id = ?", (now, remote_name, sub["id"])) db.commit() finally: - db.close() + return_db(db) total += 1 except Exception: pass @@ -921,7 +1007,7 @@ def _dispatch_inner(data): elif path == "/add": return handle_add_form() elif path == "/pages": - return handle_pages() + return handle_pages(query) elif path.startswith("/edit/"): pid = extract_id("/edit/") return handle_edit_form(pid) if pid is not None else _error(400) @@ -942,9 +1028,9 @@ def _dispatch_inner(data): return handle_tags() elif path.startswith("/tags/"): tag_name = unquote(path[len("/tags/"):]) - return handle_tag_browse(tag_name) if tag_name else _error(400) + return handle_tag_browse(tag_name, query) if tag_name else _error(400) elif path == "/api/sites": - return handle_api_sites() + return handle_api_sites(query) elif path == "/subscriptions": return handle_subscriptions() elif path.startswith("/subscriptions/browse/"): diff --git a/rns_client.py b/rns_client.py index 32eeadc..dbc0af5 100644 --- a/rns_client.py +++ b/rns_client.py @@ -6,11 +6,11 @@ ASPECTS = ["server"] REQUEST_TIMEOUT = 30 -def fetch_remote_sites(dest_hash_hex): +def fetch_remote_sites(dest_hash_hex, since=""): """ Connect to a remote TinyWeb instance over Reticulum and fetch its shared sites. Returns the response dict from /api/sites, or raises - an exception on failure. + an exception on failure. Pass `since` as ISO timestamp for delta sync. """ dest_hash = bytes.fromhex(dest_hash_hex) @@ -48,10 +48,11 @@ def fetch_remote_sites(dest_hash_hex): try: # Request /api/sites + query = {"since": [since]} if since else {} request_data = { "method": "GET", "path": "/api/sites", - "query": {}, + "query": query, "body": {}, "gateway_host": "", }