tinyweb-forum/tinyweb_forum/db.py

530 lines
19 KiB
Python

import os
import sqlite3
import threading
from datetime import datetime, timedelta, timezone
# File name of the SQLite database; ForumDB joins it onto its data directory.
FORUM_DB = "forum.db"
# SQLite-backed storage for the forum: threads, posts, upvotes, synced peer
# instances, settings, peer block reports and retractions. Connections are
# handed out by get_db() and given back with return_db(), which keeps a small
# pool of idle connections for reuse.
class ForumDB:
def __init__(self, data_dir):
    """Prepare the connection-pool state and make sure the schema exists.

    ``data_dir`` is the directory that holds the ``FORUM_DB`` file.
    """
    # Upper bound on idle connections kept around by return_db().
    self._POOL_SIZE = 8
    self._pool_lock = threading.Lock()
    self._pool = []
    self.path = os.path.join(data_dir, FORUM_DB)
    self.init_db()
def init_db(self):
    """Create the database file, its tables and its indexes if missing.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    harmless. A ``synced_instances`` table that lacks the ``status`` column
    gets that column added.
    """
    # dirname is "" when the path has no directory part, and
    # os.makedirs("") raises, so only create a directory when there is one.
    parent = os.path.dirname(self.path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    tables = (
        "CREATE TABLE IF NOT EXISTS threads ("
        "  id TEXT PRIMARY KEY,"
        "  title TEXT NOT NULL,"
        "  url TEXT DEFAULT '',"
        "  body TEXT DEFAULT '',"
        "  tags TEXT DEFAULT '',"
        "  author_instance TEXT NOT NULL,"
        "  author_name TEXT DEFAULT '',"
        "  created_at TEXT NOT NULL,"
        "  updated_at TEXT NOT NULL,"
        "  score INTEGER DEFAULT 0"
        ")",
        "CREATE TABLE IF NOT EXISTS posts ("
        "  id TEXT PRIMARY KEY,"
        "  thread_id TEXT NOT NULL,"
        "  parent_id TEXT DEFAULT '',"
        "  body TEXT NOT NULL,"
        "  author_instance TEXT NOT NULL,"
        "  author_name TEXT DEFAULT '',"
        "  created_at TEXT NOT NULL,"
        "  FOREIGN KEY (thread_id) REFERENCES threads(id)"
        ")",
        "CREATE TABLE IF NOT EXISTS upvotes ("
        "  thread_id TEXT NOT NULL,"
        "  instance_hash TEXT NOT NULL,"
        "  PRIMARY KEY (thread_id, instance_hash)"
        ")",
        "CREATE TABLE IF NOT EXISTS synced_instances ("
        "  instance_hash TEXT PRIMARY KEY,"
        "  name TEXT DEFAULT '',"
        "  last_sync TEXT DEFAULT '',"
        "  status TEXT DEFAULT 'active'"
        ")",
        "CREATE TABLE IF NOT EXISTS settings ("
        "  key TEXT PRIMARY KEY,"
        "  value TEXT"
        ")",
        "CREATE TABLE IF NOT EXISTS peer_blocks ("
        "  peer_hash TEXT NOT NULL,"
        "  blocked_hash TEXT NOT NULL,"
        "  PRIMARY KEY (peer_hash, blocked_hash)"
        ")",
        "CREATE TABLE IF NOT EXISTS retracted_content ("
        "  content_id TEXT NOT NULL,"
        "  content_type TEXT NOT NULL,"
        "  author_instance TEXT NOT NULL,"
        "  retracted_at TEXT NOT NULL,"
        "  PRIMARY KEY (content_id, content_type)"
        ")",
    )
    indexes = (
        "CREATE INDEX IF NOT EXISTS idx_posts_thread ON posts(thread_id)",
        "CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)",
        "CREATE INDEX IF NOT EXISTS idx_threads_updated ON threads(updated_at)",
        "CREATE INDEX IF NOT EXISTS idx_threads_created ON threads(created_at)",
    )

    db = sqlite3.connect(self.path)
    try:
        for statement in tables:
            db.execute(statement)

        # Add the column only when it is really absent, instead of running
        # the ALTER unconditionally and swallowing whatever it raises.
        columns = {row[1] for row in db.execute("PRAGMA table_info(synced_instances)")}
        if "status" not in columns:
            db.execute("ALTER TABLE synced_instances ADD COLUMN status TEXT DEFAULT 'active'")

        for statement in indexes:
            db.execute(statement)
        db.commit()
    finally:
        # Close even when a statement fails so the handle is not leaked.
        db.close()
def get_db(self):
    """Return a usable connection, reusing a pooled one when possible.

    The connection has ``row_factory`` set to ``sqlite3.Row``. Callers are
    expected to hand it back with ``return_db`` when done.
    """
    while True:
        # Hold the lock only for the pop; probing happens outside it.
        with self._pool_lock:
            if not self._pool:
                break
            db = self._pool.pop()
        try:
            db.execute("SELECT 1")  # liveness probe
            return db
        except sqlite3.Error:
            # Stale connection: close it instead of silently dropping it.
            try:
                db.close()
            except sqlite3.Error:
                pass

    # A pooled connection is checked out by one thread at a time but may be
    # reused by a different thread later. With the default
    # check_same_thread=True every such reuse would fail the probe above.
    db = sqlite3.connect(self.path, timeout=10, check_same_thread=False)
    db.execute("PRAGMA journal_mode=WAL")
    db.row_factory = sqlite3.Row
    return db
def return_db(self, db):
    """Give a connection back after use.

    Any open transaction is rolled back first. If the rollback itself fails
    the connection is closed (best effort) and not pooled. Otherwise it is
    pooled while there is room, and closed when the pool is full.
    """
    try:
        db.rollback()
    except Exception:
        # The connection is unusable; discard it quietly.
        try:
            db.close()
        except Exception:
            pass
        return

    with self._pool_lock:
        has_room = len(self._pool) < self._POOL_SIZE
        if has_room:
            self._pool.append(db)
            return
        db.close()
def get_setting(self, key, default=""):
    """Look up ``key`` in the settings table; return ``default`` if absent."""
    conn = self.get_db()
    try:
        cursor = conn.execute("SELECT value FROM settings WHERE key = ?", (key,))
        found = cursor.fetchone()
        if not found:
            return default
        return found["value"]
    finally:
        self.return_db(conn)
def set_setting(self, key, value):
    """Store ``value`` under ``key``, overwriting any existing value."""
    upsert_sql = (
        "INSERT INTO settings (key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value=excluded.value"
    )
    conn = self.get_db()
    try:
        conn.execute(upsert_sql, (key, value))
        conn.commit()
    finally:
        self.return_db(conn)
def get_thread(self, thread_id):
    """Fetch the thread row with the given id, or ``None`` if there is none."""
    conn = self.get_db()
    try:
        cursor = conn.execute("SELECT * FROM threads WHERE id = ?", (thread_id,))
        return cursor.fetchone()
    finally:
        self.return_db(conn)
def get_posts(self, thread_id):
    """Return all post rows of ``thread_id``, oldest ``created_at`` first."""
    query = "SELECT * FROM posts WHERE thread_id = ? ORDER BY created_at ASC"
    conn = self.get_db()
    try:
        cursor = conn.execute(query, (thread_id,))
        return cursor.fetchall()
    finally:
        self.return_db(conn)
def get_threads(self, page=1, per_page=20, tag="", search=""):
    """Return one page of threads plus the total number of matches.

    Threads are ordered by ``updated_at`` descending and each row carries an
    extra ``reply_count`` column (number of posts in the thread).

    ``tag`` filters on a substring of the ``tags`` column; ``search`` filters
    on a substring of the title or the body. Both are matched literally.
    ``page`` values below 1 are treated as 1 and a negative ``per_page`` as 0.

    Returns a ``(rows, total)`` tuple.
    """

    def contains(term):
        # Escape LIKE metacharacters so "%" and "_" typed by a user match
        # themselves instead of acting as wildcards.
        escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        return f"%{escaped}%"

    # A negative LIMIT means "no limit" in SQLite; never let that through.
    page = max(1, page)
    per_page = max(0, per_page)
    offset = (page - 1) * per_page

    params = []
    where = []
    if tag:
        where.append("t.tags LIKE ? ESCAPE '\\'")
        params.append(contains(tag))
    if search:
        where.append("(t.title LIKE ? ESCAPE '\\' OR t.body LIKE ? ESCAPE '\\')")
        params.extend([contains(search), contains(search)])
    where_clause = (" WHERE " + " AND ".join(where)) if where else ""

    db = self.get_db()
    try:
        total = db.execute(
            f"SELECT count(*) FROM threads t{where_clause}", params
        ).fetchone()[0]
        rows = db.execute(
            f"SELECT t.*, (SELECT count(*) FROM posts p WHERE p.thread_id = t.id) AS reply_count "
            f"FROM threads t{where_clause} ORDER BY t.updated_at DESC LIMIT ? OFFSET ?",
            params + [per_page, offset],
        ).fetchall()
        return rows, total
    finally:
        self.return_db(db)
def create_thread(self, thread_id, title, url, body, tags, author_instance, author_name, now):
    """Insert a new thread; ``now`` is stored as both ``created_at`` and ``updated_at``."""
    values = (thread_id, title, url, body, tags, author_instance, author_name, now, now)
    conn = self.get_db()
    try:
        conn.execute(
            "INSERT INTO threads (id, title, url, body, tags, author_instance, author_name, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            values,
        )
        conn.commit()
    finally:
        self.return_db(conn)
def create_post(self, post_id, thread_id, parent_id, body, author_instance, author_name, now):
    """Insert a post and set its thread's ``updated_at`` to ``now``, in one commit."""
    post_values = (post_id, thread_id, parent_id, body, author_instance, author_name, now)
    conn = self.get_db()
    try:
        conn.execute(
            "INSERT INTO posts (id, thread_id, parent_id, body, author_instance, author_name, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            post_values,
        )
        # Mark the parent thread as updated by the new reply.
        conn.execute("UPDATE threads SET updated_at = ? WHERE id = ?", (now, thread_id))
        conn.commit()
    finally:
        self.return_db(conn)
def toggle_upvote(self, thread_id, instance_hash):
    """Flip ``instance_hash``'s upvote on ``thread_id`` and adjust the score.

    Returns ``-1`` when an existing upvote was removed and ``1`` when a new
    one was added.
    """
    db = self.get_db()
    try:
        # Attempt the delete first rather than SELECT-then-write: the DELETE
        # is itself a write statement, so the existence check and the
        # change happen in one write transaction. Another connection can no
        # longer slip in the same vote between a check and the INSERT.
        removed = db.execute(
            "DELETE FROM upvotes WHERE thread_id = ? AND instance_hash = ?",
            (thread_id, instance_hash),
        ).rowcount
        if removed:
            delta = -1
        else:
            db.execute(
                "INSERT INTO upvotes (thread_id, instance_hash) VALUES (?, ?)",
                (thread_id, instance_hash),
            )
            delta = 1
        db.execute(
            "UPDATE threads SET score = score + ? WHERE id = ?",
            (delta, thread_id),
        )
        db.commit()
        return delta
    finally:
        self.return_db(db)
def has_upvoted(self, thread_id, instance_hash):
    """Return ``True`` if ``instance_hash`` has an upvote recorded on ``thread_id``."""
    conn = self.get_db()
    try:
        match = conn.execute(
            "SELECT 1 FROM upvotes WHERE thread_id = ? AND instance_hash = ?",
            (thread_id, instance_hash),
        ).fetchone()
        return match is not None
    finally:
        self.return_db(conn)
def get_synced_instances(self):
    """Return every row of the ``synced_instances`` table."""
    conn = self.get_db()
    try:
        cursor = conn.execute("SELECT * FROM synced_instances")
        return cursor.fetchall()
    finally:
        self.return_db(conn)
def add_known_peer(self, instance_hash):
    """Register a discovered peer in the sync list; existing entries are left untouched."""
    insert_sql = "INSERT OR IGNORE INTO synced_instances (instance_hash) VALUES (?)"
    conn = self.get_db()
    try:
        conn.execute(insert_sql, (instance_hash,))
        conn.commit()
    finally:
        self.return_db(conn)
def get_all_known_hashes(self):
    """Return the ``instance_hash`` of every entry in ``synced_instances`` as a list."""
    conn = self.get_db()
    try:
        found = conn.execute("SELECT instance_hash FROM synced_instances").fetchall()
        hashes = []
        for entry in found:
            hashes.append(entry["instance_hash"])
        return hashes
    finally:
        self.return_db(conn)
def upsert_synced_instance(self, instance_hash, name=""):
    """Insert a synced instance, or overwrite the name of an existing one.

    Note that the stored name is replaced by ``name`` even when ``name`` is
    the empty default.
    """
    upsert_sql = (
        "INSERT INTO synced_instances (instance_hash, name) VALUES (?, ?) "
        "ON CONFLICT(instance_hash) DO UPDATE SET name=excluded.name"
    )
    conn = self.get_db()
    try:
        conn.execute(upsert_sql, (instance_hash, name))
        conn.commit()
    finally:
        self.return_db(conn)
def remove_synced_instance(self, instance_hash):
    """Delete the ``synced_instances`` entry for ``instance_hash``, if any."""
    delete_sql = "DELETE FROM synced_instances WHERE instance_hash = ?"
    conn = self.get_db()
    try:
        conn.execute(delete_sql, (instance_hash,))
        conn.commit()
    finally:
        self.return_db(conn)
def update_last_sync(self, instance_hash, now):
    """Set ``last_sync`` to ``now`` for ``instance_hash``; unknown hashes are a no-op."""
    update_sql = "UPDATE synced_instances SET last_sync = ? WHERE instance_hash = ?"
    conn = self.get_db()
    try:
        conn.execute(update_sql, (now, instance_hash))
        conn.commit()
    finally:
        self.return_db(conn)
def set_last_sync(self, instance_hash, timestamp):
    """Alias of ``update_last_sync``: record ``timestamp`` as the last sync time."""
    self.update_last_sync(instance_hash, timestamp)
def get_new_content(self, since):
    """Collect content newer than ``since``.

    Returns a ``(threads, posts, upvote_thread_ids)`` tuple:

    * ``threads`` - rows with ``updated_at > since``, oldest update first;
    * ``posts`` - rows with ``created_at > since``, oldest first;
    * ``upvote_thread_ids`` - one ``thread_id`` per upvote row for which no
      thread with that id has ``updated_at > since`` (so the same id can
      appear several times).
    """
    cutoff = (since,)
    conn = self.get_db()
    try:
        fresh_threads = conn.execute(
            "SELECT * FROM threads WHERE updated_at > ? ORDER BY updated_at ASC",
            cutoff,
        ).fetchall()
        fresh_posts = conn.execute(
            "SELECT * FROM posts WHERE created_at > ? ORDER BY created_at ASC",
            cutoff,
        ).fetchall()
        upvote_rows = conn.execute(
            "SELECT thread_id FROM upvotes u "
            "WHERE NOT EXISTS (SELECT 1 FROM threads t WHERE t.id = u.thread_id AND t.updated_at > ?)",
            cutoff,
        ).fetchall()
        upvoted_ids = []
        for entry in upvote_rows:
            upvoted_ids.append(entry["thread_id"])
        return fresh_threads, fresh_posts, upvoted_ids
    finally:
        self.return_db(conn)
def update_thread(self, thread_id, title, url, body, tags, now):
    """Overwrite a thread's title, url, body and tags and set ``updated_at`` to ``now``."""
    new_values = (title, url, body, tags, now, thread_id)
    conn = self.get_db()
    try:
        conn.execute(
            "UPDATE threads SET title=?, url=?, body=?, tags=?, updated_at=? WHERE id=?",
            new_values,
        )
        conn.commit()
    finally:
        self.return_db(conn)
def merge_thread(self, thread):
    """Merge a thread mapping into the local table.

    ``thread`` is indexed by column name. Nothing happens when a local copy
    exists whose ``updated_at`` is not older than the incoming one.
    Otherwise the thread is inserted, or the existing row's title, url,
    body, tags and ``updated_at`` are overwritten (other columns, including
    the score, keep their local values).
    """
    columns = (
        "id", "title", "url", "body", "tags", "author_instance",
        "author_name", "created_at", "updated_at", "score",
    )
    conn = self.get_db()
    try:
        local = conn.execute(
            "SELECT updated_at FROM threads WHERE id = ?", (thread["id"],)
        ).fetchone()
        if local and local["updated_at"] >= thread["updated_at"]:
            return
        incoming = tuple(thread[column] for column in columns)
        conn.execute(
            "INSERT INTO threads (id, title, url, body, tags, author_instance, author_name, created_at, updated_at, score) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET "
            "title=excluded.title, url=excluded.url, body=excluded.body, "
            "tags=excluded.tags, updated_at=excluded.updated_at",
            incoming,
        )
        conn.commit()
    finally:
        self.return_db(conn)
def merge_post(self, post):
    """Insert a post mapping (indexed by column name) unless its id already exists."""
    columns = (
        "id", "thread_id", "parent_id", "body",
        "author_instance", "author_name", "created_at",
    )
    incoming = tuple(post[column] for column in columns)
    conn = self.get_db()
    try:
        conn.execute(
            "INSERT OR IGNORE INTO posts (id, thread_id, parent_id, body, author_instance, author_name, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            incoming,
        )
        conn.commit()
    finally:
        self.return_db(conn)
def merge_upvote(self, thread_id, instance_hash):
    """Record a synced upvote and bump the thread score if it is new.

    An upvote that is already stored is ignored and leaves the score alone.
    """
    db = self.get_db()
    try:
        inserted = db.execute(
            "INSERT OR IGNORE INTO upvotes (thread_id, instance_hash) VALUES (?, ?)",
            (thread_id, instance_hash),
        ).rowcount
        # rowcount reflects this statement only. Connection.total_changes is
        # cumulative for the connection's lifetime, so on a reused pooled
        # connection it is non-zero even when the INSERT was ignored.
        if inserted > 0:
            db.execute("UPDATE threads SET score = score + 1 WHERE id = ?", (thread_id,))
        db.commit()
    finally:
        self.return_db(db)
def record_peer_block(self, peer_hash, blocked_hash):
    """Store that ``peer_hash`` blocks ``blocked_hash``; duplicates are ignored."""
    insert_sql = "INSERT OR IGNORE INTO peer_blocks (peer_hash, blocked_hash) VALUES (?, ?)"
    conn = self.get_db()
    try:
        conn.execute(insert_sql, (peer_hash, blocked_hash))
        conn.commit()
    finally:
        self.return_db(conn)
def get_peer_block_counts(self):
    """Return a dict mapping each blocked hash to its number of ``peer_blocks`` rows."""
    conn = self.get_db()
    try:
        tallies = conn.execute(
            "SELECT blocked_hash, count(*) as count FROM peer_blocks GROUP BY blocked_hash"
        ).fetchall()
        counts = {}
        for tally in tallies:
            counts[tally["blocked_hash"]] = tally["count"]
        return counts
    finally:
        self.return_db(conn)
def get_peer_block_list(self):
    """Return the distinct blocked hashes found in ``peer_blocks`` as a list."""
    conn = self.get_db()
    try:
        found = conn.execute("SELECT DISTINCT blocked_hash FROM peer_blocks").fetchall()
        blocked = []
        for entry in found:
            blocked.append(entry["blocked_hash"])
        return blocked
    finally:
        self.return_db(conn)
def clear_peer_block(self, blocked_hash):
    """Remove every ``peer_blocks`` row that targets ``blocked_hash``."""
    delete_sql = "DELETE FROM peer_blocks WHERE blocked_hash = ?"
    conn = self.get_db()
    try:
        conn.execute(delete_sql, (blocked_hash,))
        conn.commit()
    finally:
        self.return_db(conn)
def retract_thread(self, thread_id, author_instance, now):
    """Record a thread retraction and blank the thread's content.

    The retraction is stored in ``retracted_content`` (replacing an earlier
    record for the same thread). The thread row keeps its id but its title
    becomes ``[retracted]``, url/body/tags are emptied and the score is reset.
    """
    conn = self.get_db()
    try:
        record = (thread_id, author_instance, now)
        conn.execute(
            "INSERT OR REPLACE INTO retracted_content (content_id, content_type, author_instance, retracted_at) "
            "VALUES (?, 'thread', ?, ?)",
            record,
        )
        conn.execute(
            "UPDATE threads SET title='[retracted]', url='', body='', tags='', score=0 WHERE id=?",
            (thread_id,),
        )
        conn.commit()
    finally:
        self.return_db(conn)
def retract_post(self, post_id, author_instance, now):
    """Record a post retraction and replace the post body with ``[retracted]``.

    An earlier retraction record for the same post is replaced.
    """
    conn = self.get_db()
    try:
        record = (post_id, author_instance, now)
        conn.execute(
            "INSERT OR REPLACE INTO retracted_content (content_id, content_type, author_instance, retracted_at) "
            "VALUES (?, 'post', ?, ?)",
            record,
        )
        conn.execute("UPDATE posts SET body='[retracted]' WHERE id=?", (post_id,))
        conn.commit()
    finally:
        self.return_db(conn)
def merge_retraction(self, content_id, content_type, author_instance, now):
    """Apply a retraction received from elsewhere.

    Does nothing when a retraction for the same content is already stored
    with a ``retracted_at`` that is not older than ``now``. Otherwise the
    retraction is applied only if the content exists locally and its
    ``author_instance`` equals the given one; the record is then stored and
    the content blanked. ``content_type`` values other than ``"thread"`` and
    ``"post"`` change nothing.
    """
    # Per content type: how to find the local author, and how to blank it.
    if content_type == "thread":
        owner_sql = "SELECT author_instance FROM threads WHERE id=?"
        blank_sql = "UPDATE threads SET title='[retracted]', url='', body='', tags='', score=0 WHERE id=?"
    elif content_type == "post":
        owner_sql = "SELECT author_instance FROM posts WHERE id=?"
        blank_sql = "UPDATE posts SET body='[retracted]' WHERE id=?"
    else:
        owner_sql = blank_sql = None

    conn = self.get_db()
    try:
        known = conn.execute(
            "SELECT retracted_at FROM retracted_content WHERE content_id=? AND content_type=?",
            (content_id, content_type),
        ).fetchone()
        if known and known["retracted_at"] >= now:
            return
        if owner_sql is not None:
            owner = conn.execute(owner_sql, (content_id,)).fetchone()
            # Only the original author's instance may retract the content.
            if owner and owner["author_instance"] == author_instance:
                conn.execute(
                    "INSERT OR REPLACE INTO retracted_content VALUES (?, ?, ?, ?)",
                    (content_id, content_type, author_instance, now),
                )
                conn.execute(blank_sql, (content_id,))
        conn.commit()
    finally:
        self.return_db(conn)
def get_retracted_ids(self):
    """Return ``(thread_ids, post_ids)``: two sets of retracted content ids."""
    conn = self.get_db()
    try:
        thread_rows = conn.execute(
            "SELECT content_id FROM retracted_content WHERE content_type='thread'"
        ).fetchall()
        post_rows = conn.execute(
            "SELECT content_id FROM retracted_content WHERE content_type='post'"
        ).fetchall()
        thread_ids = {entry["content_id"] for entry in thread_rows}
        post_ids = {entry["content_id"] for entry in post_rows}
        return thread_ids, post_ids
    finally:
        self.return_db(conn)
def get_raw_retractions(self):
    """Return every retraction record as rows of (content_id, content_type, author_instance, retracted_at)."""
    query = "SELECT content_id, content_type, author_instance, retracted_at FROM retracted_content"
    conn = self.get_db()
    try:
        cursor = conn.execute(query)
        return cursor.fetchall()
    finally:
        self.return_db(conn)
def prune_old_content(self, retention_days):
    """Delete threads not updated within ``retention_days``, and their leftovers.

    A thread is old when its ``updated_at`` sorts before the cutoff string
    ``now (UTC) - retention_days`` formatted as ``YYYY-MM-DDTHH:MM:SS``.
    Posts of old threads, posts whose thread no longer exists, and upvotes
    of missing threads are removed in the same commit.
    """
    # datetime.utcnow() is deprecated; an aware UTC "now" formats to the
    # same string because the format has no timezone field.
    cutoff_time = datetime.now(timezone.utc) - timedelta(days=retention_days)
    cutoff = cutoff_time.strftime("%Y-%m-%dT%H:%M:%S")

    db = self.get_db()
    try:
        # Posts that belong to threads about to be pruned.
        db.execute(
            "DELETE FROM posts WHERE thread_id IN "
            "(SELECT id FROM threads WHERE updated_at < ?)",
            (cutoff,),
        )
        # Posts whose thread is already gone.
        db.execute(
            "DELETE FROM posts WHERE thread_id NOT IN (SELECT id FROM threads)"
        )
        # The old threads themselves.
        db.execute("DELETE FROM threads WHERE updated_at < ?", (cutoff,))
        # Upvotes left pointing at threads that no longer exist.
        db.execute(
            "DELETE FROM upvotes WHERE thread_id NOT IN (SELECT id FROM threads)"
        )
        db.commit()
    finally:
        self.return_db(db)