525 lines
19 KiB
Python
525 lines
19 KiB
Python
import sqlite3
|
|
import os
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
|
|
FORUM_DB = "forum.db"
|
|
|
|
|
|
class ForumDB:
|
|
def __init__(self, data_dir):
|
|
self.path = os.path.join(data_dir, FORUM_DB)
|
|
self._pool = []
|
|
self._pool_lock = threading.Lock()
|
|
self._POOL_SIZE = 8
|
|
self.init_db()
|
|
|
|
def init_db(self):
|
|
os.makedirs(os.path.dirname(self.path), exist_ok=True)
|
|
db = sqlite3.connect(self.path)
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS threads ("
|
|
" id TEXT PRIMARY KEY,"
|
|
" title TEXT NOT NULL,"
|
|
" url TEXT DEFAULT '',"
|
|
" body TEXT DEFAULT '',"
|
|
" tags TEXT DEFAULT '',"
|
|
" author_instance TEXT NOT NULL,"
|
|
" author_name TEXT DEFAULT '',"
|
|
" created_at TEXT NOT NULL,"
|
|
" updated_at TEXT NOT NULL,"
|
|
" score INTEGER DEFAULT 0"
|
|
")"
|
|
)
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS posts ("
|
|
" id TEXT PRIMARY KEY,"
|
|
" thread_id TEXT NOT NULL,"
|
|
" parent_id TEXT DEFAULT '',"
|
|
" body TEXT NOT NULL,"
|
|
" author_instance TEXT NOT NULL,"
|
|
" author_name TEXT DEFAULT '',"
|
|
" created_at TEXT NOT NULL,"
|
|
" FOREIGN KEY (thread_id) REFERENCES threads(id)"
|
|
")"
|
|
)
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS upvotes ("
|
|
" thread_id TEXT NOT NULL,"
|
|
" instance_hash TEXT NOT NULL,"
|
|
" PRIMARY KEY (thread_id, instance_hash)"
|
|
")"
|
|
)
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS synced_instances ("
|
|
" instance_hash TEXT PRIMARY KEY,"
|
|
" name TEXT DEFAULT '',"
|
|
" last_sync TEXT DEFAULT ''"
|
|
")"
|
|
)
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS settings ("
|
|
" key TEXT PRIMARY KEY,"
|
|
" value TEXT"
|
|
")"
|
|
)
|
|
db.execute("CREATE INDEX IF NOT EXISTS idx_posts_thread ON posts(thread_id)")
|
|
db.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)")
|
|
db.execute("CREATE INDEX IF NOT EXISTS idx_threads_updated ON threads(updated_at)")
|
|
db.execute("CREATE INDEX IF NOT EXISTS idx_threads_created ON threads(created_at)")
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS peer_blocks ("
|
|
" peer_hash TEXT NOT NULL,"
|
|
" blocked_hash TEXT NOT NULL,"
|
|
" PRIMARY KEY (peer_hash, blocked_hash)"
|
|
")"
|
|
)
|
|
db.execute(
|
|
"CREATE TABLE IF NOT EXISTS retracted_content ("
|
|
" content_id TEXT NOT NULL,"
|
|
" content_type TEXT NOT NULL,"
|
|
" author_instance TEXT NOT NULL,"
|
|
" retracted_at TEXT NOT NULL,"
|
|
" PRIMARY KEY (content_id, content_type)"
|
|
")"
|
|
)
|
|
db.commit()
|
|
db.close()
|
|
|
|
def get_db(self):
|
|
with self._pool_lock:
|
|
if self._pool:
|
|
db = self._pool.pop()
|
|
try:
|
|
db.execute("SELECT 1")
|
|
return db
|
|
except Exception:
|
|
pass
|
|
db = sqlite3.connect(self.path, timeout=10)
|
|
db.execute("PRAGMA journal_mode=WAL")
|
|
db.row_factory = sqlite3.Row
|
|
return db
|
|
|
|
def return_db(self, db):
|
|
try:
|
|
db.rollback()
|
|
except Exception:
|
|
try:
|
|
db.close()
|
|
except Exception:
|
|
pass
|
|
return
|
|
with self._pool_lock:
|
|
if len(self._pool) < self._POOL_SIZE:
|
|
self._pool.append(db)
|
|
else:
|
|
db.close()
|
|
|
|
def get_setting(self, key, default=""):
|
|
db = self.get_db()
|
|
try:
|
|
row = db.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
|
|
return row["value"] if row else default
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def set_setting(self, key, value):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT INTO settings (key, value) VALUES (?, ?) "
|
|
"ON CONFLICT(key) DO UPDATE SET value=excluded.value",
|
|
(key, value),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_thread(self, thread_id):
|
|
db = self.get_db()
|
|
try:
|
|
return db.execute(
|
|
"SELECT * FROM threads WHERE id = ?", (thread_id,)
|
|
).fetchone()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_posts(self, thread_id):
|
|
db = self.get_db()
|
|
try:
|
|
return db.execute(
|
|
"SELECT * FROM posts WHERE thread_id = ? ORDER BY created_at ASC",
|
|
(thread_id,),
|
|
).fetchall()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_threads(self, page=1, per_page=20, tag="", search=""):
|
|
db = self.get_db()
|
|
try:
|
|
offset = (page - 1) * per_page
|
|
params = []
|
|
where = []
|
|
if tag:
|
|
where.append("t.tags LIKE ?")
|
|
params.append(f"%{tag}%")
|
|
if search:
|
|
where.append("(t.title LIKE ? OR t.body LIKE ?)")
|
|
params.extend([f"%{search}%", f"%{search}%"])
|
|
where_clause = (" WHERE " + " AND ".join(where)) if where else ""
|
|
total = db.execute(
|
|
f"SELECT count(*) FROM threads t{where_clause}", params
|
|
).fetchone()[0]
|
|
rows = db.execute(
|
|
f"SELECT t.*, (SELECT count(*) FROM posts p WHERE p.thread_id = t.id) AS reply_count "
|
|
f"FROM threads t{where_clause} ORDER BY t.updated_at DESC LIMIT ? OFFSET ?",
|
|
params + [per_page, offset],
|
|
).fetchall()
|
|
return rows, total
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def create_thread(self, thread_id, title, url, body, tags, author_instance, author_name, now):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT INTO threads (id, title, url, body, tags, author_instance, author_name, created_at, updated_at) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(thread_id, title, url, body, tags, author_instance, author_name, now, now),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def create_post(self, post_id, thread_id, parent_id, body, author_instance, author_name, now):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT INTO posts (id, thread_id, parent_id, body, author_instance, author_name, created_at) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(post_id, thread_id, parent_id, body, author_instance, author_name, now),
|
|
)
|
|
db.execute("UPDATE threads SET updated_at = ? WHERE id = ?", (now, thread_id))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def toggle_upvote(self, thread_id, instance_hash):
|
|
db = self.get_db()
|
|
try:
|
|
row = db.execute(
|
|
"SELECT 1 FROM upvotes WHERE thread_id = ? AND instance_hash = ?",
|
|
(thread_id, instance_hash),
|
|
).fetchone()
|
|
if row:
|
|
db.execute(
|
|
"DELETE FROM upvotes WHERE thread_id = ? AND instance_hash = ?",
|
|
(thread_id, instance_hash),
|
|
)
|
|
db.execute("UPDATE threads SET score = score - 1 WHERE id = ?", (thread_id,))
|
|
delta = -1
|
|
else:
|
|
db.execute(
|
|
"INSERT INTO upvotes (thread_id, instance_hash) VALUES (?, ?)",
|
|
(thread_id, instance_hash),
|
|
)
|
|
db.execute("UPDATE threads SET score = score + 1 WHERE id = ?", (thread_id,))
|
|
delta = 1
|
|
db.commit()
|
|
return delta
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def has_upvoted(self, thread_id, instance_hash):
|
|
db = self.get_db()
|
|
try:
|
|
return db.execute(
|
|
"SELECT 1 FROM upvotes WHERE thread_id = ? AND instance_hash = ?",
|
|
(thread_id, instance_hash),
|
|
).fetchone() is not None
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_synced_instances(self):
|
|
db = self.get_db()
|
|
try:
|
|
return db.execute("SELECT * FROM synced_instances").fetchall()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def add_known_peer(self, instance_hash):
|
|
"""Add a discovered peer to the sync list (auto-discovery)."""
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT OR IGNORE INTO synced_instances (instance_hash) VALUES (?)",
|
|
(instance_hash,),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_all_known_hashes(self):
|
|
"""Get all known instance hashes for peer discovery gossip."""
|
|
db = self.get_db()
|
|
try:
|
|
return [r["instance_hash"] for r in db.execute(
|
|
"SELECT instance_hash FROM synced_instances"
|
|
).fetchall()]
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def upsert_synced_instance(self, instance_hash, name=""):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT INTO synced_instances (instance_hash, name) VALUES (?, ?) "
|
|
"ON CONFLICT(instance_hash) DO UPDATE SET name=excluded.name",
|
|
(instance_hash, name),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def remove_synced_instance(self, instance_hash):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute("DELETE FROM synced_instances WHERE instance_hash = ?", (instance_hash,))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def update_last_sync(self, instance_hash, now):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"UPDATE synced_instances SET last_sync = ? WHERE instance_hash = ?",
|
|
(now, instance_hash),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def set_last_sync(self, instance_hash, timestamp):
|
|
self.update_last_sync(instance_hash, timestamp)
|
|
|
|
def get_new_content(self, since):
|
|
db = self.get_db()
|
|
try:
|
|
threads = db.execute(
|
|
"SELECT * FROM threads WHERE updated_at > ? ORDER BY updated_at ASC",
|
|
(since,),
|
|
).fetchall()
|
|
posts = db.execute(
|
|
"SELECT * FROM posts WHERE created_at > ? ORDER BY created_at ASC",
|
|
(since,),
|
|
).fetchall()
|
|
upvote_threads = db.execute(
|
|
"SELECT thread_id FROM upvotes u "
|
|
"WHERE NOT EXISTS (SELECT 1 FROM threads t WHERE t.id = u.thread_id AND t.updated_at > ?)",
|
|
(since,),
|
|
).fetchall()
|
|
return threads, posts, [r["thread_id"] for r in upvote_threads]
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def update_thread(self, thread_id, title, url, body, tags, now):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"UPDATE threads SET title=?, url=?, body=?, tags=?, updated_at=? WHERE id=?",
|
|
(title, url, body, tags, now, thread_id),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def merge_thread(self, thread):
|
|
db = self.get_db()
|
|
try:
|
|
existing = db.execute("SELECT updated_at FROM threads WHERE id = ?", (thread["id"],)).fetchone()
|
|
if existing and existing["updated_at"] >= thread["updated_at"]:
|
|
return
|
|
db.execute(
|
|
"INSERT INTO threads (id, title, url, body, tags, author_instance, author_name, created_at, updated_at, score) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
|
|
"ON CONFLICT(id) DO UPDATE SET "
|
|
"title=excluded.title, url=excluded.url, body=excluded.body, "
|
|
"tags=excluded.tags, updated_at=excluded.updated_at",
|
|
(thread["id"], thread["title"], thread["url"], thread["body"],
|
|
thread["tags"], thread["author_instance"], thread["author_name"],
|
|
thread["created_at"], thread["updated_at"], thread["score"]),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def merge_post(self, post):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT OR IGNORE INTO posts (id, thread_id, parent_id, body, author_instance, author_name, created_at) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(post["id"], post["thread_id"], post["parent_id"], post["body"],
|
|
post["author_instance"], post["author_name"], post["created_at"]),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def merge_upvote(self, thread_id, instance_hash):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT OR IGNORE INTO upvotes (thread_id, instance_hash) VALUES (?, ?)",
|
|
(thread_id, instance_hash),
|
|
)
|
|
if db.total_changes:
|
|
db.execute("UPDATE threads SET score = score + 1 WHERE id = ?", (thread_id,))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def record_peer_block(self, peer_hash, blocked_hash):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT OR IGNORE INTO peer_blocks (peer_hash, blocked_hash) VALUES (?, ?)",
|
|
(peer_hash, blocked_hash),
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_peer_block_counts(self):
|
|
db = self.get_db()
|
|
try:
|
|
return {
|
|
r["blocked_hash"]: r["count"]
|
|
for r in db.execute(
|
|
"SELECT blocked_hash, count(*) as count FROM peer_blocks GROUP BY blocked_hash"
|
|
).fetchall()
|
|
}
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_peer_block_list(self):
|
|
db = self.get_db()
|
|
try:
|
|
return [r["blocked_hash"] for r in db.execute(
|
|
"SELECT DISTINCT blocked_hash FROM peer_blocks"
|
|
).fetchall()]
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def clear_peer_block(self, blocked_hash):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute("DELETE FROM peer_blocks WHERE blocked_hash = ?", (blocked_hash,))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def retract_thread(self, thread_id, author_instance, now):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT OR REPLACE INTO retracted_content (content_id, content_type, author_instance, retracted_at) "
|
|
"VALUES (?, 'thread', ?, ?)",
|
|
(thread_id, author_instance, now),
|
|
)
|
|
db.execute("UPDATE threads SET title='[retracted]', url='', body='', tags='', score=0 WHERE id=?",
|
|
(thread_id,))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def retract_post(self, post_id, author_instance, now):
|
|
db = self.get_db()
|
|
try:
|
|
db.execute(
|
|
"INSERT OR REPLACE INTO retracted_content (content_id, content_type, author_instance, retracted_at) "
|
|
"VALUES (?, 'post', ?, ?)",
|
|
(post_id, author_instance, now),
|
|
)
|
|
db.execute("UPDATE posts SET body='[retracted]' WHERE id=?", (post_id,))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def merge_retraction(self, content_id, content_type, author_instance, now):
|
|
db = self.get_db()
|
|
try:
|
|
existing = db.execute(
|
|
"SELECT retracted_at FROM retracted_content WHERE content_id=? AND content_type=?",
|
|
(content_id, content_type),
|
|
).fetchone()
|
|
if existing and existing["retracted_at"] >= now:
|
|
return
|
|
if content_type == "thread":
|
|
t = db.execute("SELECT author_instance FROM threads WHERE id=?", (content_id,)).fetchone()
|
|
if t and t["author_instance"] == author_instance:
|
|
db.execute(
|
|
"INSERT OR REPLACE INTO retracted_content VALUES (?, ?, ?, ?)",
|
|
(content_id, content_type, author_instance, now),
|
|
)
|
|
db.execute("UPDATE threads SET title='[retracted]', url='', body='', tags='', score=0 WHERE id=?",
|
|
(content_id,))
|
|
elif content_type == "post":
|
|
p = db.execute("SELECT author_instance FROM posts WHERE id=?", (content_id,)).fetchone()
|
|
if p and p["author_instance"] == author_instance:
|
|
db.execute(
|
|
"INSERT OR REPLACE INTO retracted_content VALUES (?, ?, ?, ?)",
|
|
(content_id, content_type, author_instance, now),
|
|
)
|
|
db.execute("UPDATE posts SET body='[retracted]' WHERE id=?", (content_id,))
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_retracted_ids(self):
|
|
db = self.get_db()
|
|
try:
|
|
threads = set(r["content_id"] for r in db.execute(
|
|
"SELECT content_id FROM retracted_content WHERE content_type='thread'"
|
|
).fetchall())
|
|
posts = set(r["content_id"] for r in db.execute(
|
|
"SELECT content_id FROM retracted_content WHERE content_type='post'"
|
|
).fetchall())
|
|
return threads, posts
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def get_raw_retractions(self):
|
|
db = self.get_db()
|
|
try:
|
|
return db.execute(
|
|
"SELECT content_id, content_type, author_instance, retracted_at FROM retracted_content"
|
|
).fetchall()
|
|
finally:
|
|
self.return_db(db)
|
|
|
|
def prune_old_content(self, retention_days):
|
|
"""Delete threads and posts older than retention_days."""
|
|
db = self.get_db()
|
|
try:
|
|
cutoff = (datetime.utcnow() - timedelta(days=retention_days)).strftime("%Y-%m-%dT%H:%M:%S")
|
|
# Delete posts in old threads
|
|
db.execute(
|
|
"DELETE FROM posts WHERE thread_id IN "
|
|
"(SELECT id FROM threads WHERE updated_at < ?)",
|
|
(cutoff,),
|
|
)
|
|
# Delete orphaned posts (thread already deleted)
|
|
db.execute(
|
|
"DELETE FROM posts WHERE thread_id NOT IN (SELECT id FROM threads)"
|
|
)
|
|
# Delete old threads
|
|
db.execute("DELETE FROM threads WHERE updated_at < ?", (cutoff,))
|
|
# Clean up orphaned upvotes
|
|
db.execute(
|
|
"DELETE FROM upvotes WHERE thread_id NOT IN (SELECT id FROM threads)"
|
|
)
|
|
db.commit()
|
|
finally:
|
|
self.return_db(db)
|