174 tests covering URL normalization, FTS5 query sanitization, SSRF/CSRF guards, sharing-mode logic, DB schema and upsert paths, handler end-to-end flows, and gateway body-size / mesh-whitelist guards. Each recent bug-fix commit (6ffd38d,1bc695f,8dffd8c) has an explicit regression test in test_regressions.py. One xfail documents a minor latent bug in clean_url where port 80 is not stripped from upgraded https URLs.
164 lines
5.6 KiB
Python
164 lines
5.6 KiB
Python
"""Tests for gateway-level guards: body-size cap and Reticulum surface whitelist.
|
|
|
|
Regression targets from commit 1bc695f — a 16 MiB upload limit (DoS guard)
|
|
and a strict GET-/api/sites-only whitelist for requests arriving over the
|
|
Reticulum mesh (CSRF can't protect mesh callers, so gate by whitelist).
|
|
"""
|
|
import io
|
|
|
|
import pytest
|
|
|
|
import app as app_module
|
|
from gateway import GatewayHandler, MAX_BODY_SIZE
|
|
|
|
|
|
class FakeHeaders:
|
|
"""Minimal replacement for http.server request headers."""
|
|
def __init__(self, items=None):
|
|
self._items = dict(items or {})
|
|
|
|
def get(self, key, default=None):
|
|
return self._items.get(key, default)
|
|
|
|
|
|
class FakeGatewayHandler(GatewayHandler):
|
|
"""Bypass the socket-bound __init__ and capture response calls in memory."""
|
|
def __init__(self, path="/", method="POST", headers=None, rfile=None):
|
|
self.path = path
|
|
self.command = method
|
|
self.headers = FakeHeaders(headers or {})
|
|
self.rfile = rfile or io.BytesIO()
|
|
self.wfile = io.BytesIO()
|
|
self._captured = {
|
|
"error": None, "status": None, "headers": [], "body_written": None,
|
|
}
|
|
|
|
def send_error(self, code, msg=""):
|
|
self._captured["error"] = (code, msg)
|
|
|
|
def send_response(self, code):
|
|
self._captured["status"] = code
|
|
|
|
def send_header(self, k, v):
|
|
self._captured["headers"].append((k, v))
|
|
|
|
def end_headers(self):
|
|
pass
|
|
|
|
|
|
def test_post_over_size_cap_rejected_with_413():
|
|
"""Regression for 1bc695f: request bodies over MAX_BODY_SIZE must be rejected
|
|
without being read into memory."""
|
|
oversize = MAX_BODY_SIZE + 1
|
|
handler = FakeGatewayHandler(
|
|
path="/add",
|
|
method="POST",
|
|
headers={"Content-Length": str(oversize)},
|
|
)
|
|
handler._forward("POST")
|
|
assert handler._captured["error"] is not None
|
|
code, _msg = handler._captured["error"]
|
|
assert code == 413
|
|
|
|
|
|
def test_post_at_size_cap_accepted():
|
|
"""A body exactly at MAX_BODY_SIZE should not be rejected by the size check."""
|
|
handler = FakeGatewayHandler(
|
|
path="/_does_not_matter",
|
|
method="POST",
|
|
headers={"Content-Length": str(MAX_BODY_SIZE)},
|
|
# rfile has no data; handler will try to read; local_dispatch isn't set.
|
|
# We only care that the 413 check passes, not that the request succeeds.
|
|
rfile=io.BytesIO(b""),
|
|
)
|
|
# Stub out local_dispatch so _forward doesn't try the network path.
|
|
from gateway import GatewayState
|
|
original = GatewayState.local_dispatch
|
|
GatewayState.local_dispatch = lambda data: {
|
|
"status": 404, "content_type": "text/plain", "body": "nope",
|
|
}
|
|
try:
|
|
handler._forward("POST")
|
|
finally:
|
|
GatewayState.local_dispatch = original
|
|
# Not a 413, because the body is exactly at the cap (cap is inclusive).
|
|
if handler._captured["error"]:
|
|
assert handler._captured["error"][0] != 413
|
|
|
|
|
|
def test_negative_content_length_rejected():
|
|
handler = FakeGatewayHandler(
|
|
path="/add",
|
|
method="POST",
|
|
headers={"Content-Length": "-1"},
|
|
)
|
|
handler._forward("POST")
|
|
assert handler._captured["error"] is not None
|
|
code, _msg = handler._captured["error"]
|
|
assert code == 400
|
|
|
|
|
|
def test_invalid_content_length_rejected():
|
|
handler = FakeGatewayHandler(
|
|
path="/add",
|
|
method="POST",
|
|
headers={"Content-Length": "abc"},
|
|
)
|
|
handler._forward("POST")
|
|
assert handler._captured["error"] is not None
|
|
code, _msg = handler._captured["error"]
|
|
assert code == 400
|
|
|
|
|
|
# -------- Reticulum mesh surface whitelist --------
|
|
|
|
|
|
def test_mesh_rejects_non_api_sites_get():
|
|
"""Regression for 1bc695f: remote mesh callers can only GET /api/sites."""
|
|
resp = app_module.rns_request_handler(
|
|
path="/tinyweb",
|
|
data={"method": "GET", "path": "/pages", "query": {}, "body": {}, "gateway_host": ""},
|
|
request_id="x", link_id="y", remote_identity=None, requested_at=0,
|
|
)
|
|
assert resp["status"] == 403
|
|
|
|
|
|
def test_mesh_rejects_post_to_api_sites():
|
|
resp = app_module.rns_request_handler(
|
|
path="/tinyweb",
|
|
data={"method": "POST", "path": "/api/sites", "query": {}, "body": {}, "gateway_host": ""},
|
|
request_id="x", link_id="y", remote_identity=None, requested_at=0,
|
|
)
|
|
assert resp["status"] == 403
|
|
|
|
|
|
def test_mesh_rejects_sensitive_local_endpoints():
|
|
for path in ("/add", "/delete/1", "/style", "/import", "/export"):
|
|
resp = app_module.rns_request_handler(
|
|
path="/tinyweb",
|
|
data={"method": "GET", "path": path, "query": {}, "body": {}, "gateway_host": ""},
|
|
request_id="x", link_id="y", remote_identity=None, requested_at=0,
|
|
)
|
|
assert resp["status"] == 403, f"path {path!r} leaked through mesh whitelist"
|
|
|
|
|
|
def test_mesh_allows_api_sites_get(temp_db, csrf_session):
|
|
"""Sanity check: the one whitelisted combination is accepted."""
|
|
resp = app_module.rns_request_handler(
|
|
path="/tinyweb",
|
|
data={"method": "GET", "path": "/api/sites", "query": {}, "body": {}, "gateway_host": ""},
|
|
request_id="x", link_id="y", remote_identity=None, requested_at=0,
|
|
)
|
|
# Status depends on handler output; 200 is the happy path.
|
|
assert resp["status"] in (200, 403) # 403 if sharing is disabled by default
|
|
|
|
|
|
def test_mesh_handles_missing_data_payload():
|
|
"""Regression-minded check: a None or malformed data object shouldn't crash."""
|
|
resp = app_module.rns_request_handler(
|
|
path="/tinyweb",
|
|
data=None,
|
|
request_id="x", link_id="y", remote_identity=None, requested_at=0,
|
|
)
|
|
# Default data has method=GET, path=/ which is not in the whitelist.
|
|
assert resp["status"] == 403
|