#!/usr/bin/env python3
"""Minimal cookie-session auth gate for DuckDB shell."""
import hmac, hashlib, os, secrets, subprocess, time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs
PASSWORD = os.environ.get('BASIC_AUTH_PASSWORD', '').encode()
_SECRET = secrets.token_bytes(32)
def _make_token():
day = str(int(time.time()) // 86400)
return hmac.new(_SECRET, day.encode(), hashlib.sha256).hexdigest()
def _valid(token):
if not token:
return False
for delta in (0, 1):
day = str(int(time.time()) // 86400 - delta)
expected = hmac.new(_SECRET, day.encode(), hashlib.sha256).hexdigest()
if hmac.compare_digest(token, expected):
return True
return False
LOGIN_HTML = """
DB Shell
""".encode()
class H(BaseHTTPRequestHandler):
def _cookie(self):
for part in self.headers.get('Cookie', '').split(';'):
part = part.strip()
if part.startswith('ddb_auth='):
return part[9:]
return ''
def do_GET(self):
if self.path == '/auth':
if _valid(self._cookie()):
self._resp(200)
else:
self.send_response(302)
self.send_header('Location', '/login')
self.end_headers()
else:
self._resp(200, LOGIN_HTML, 'text/html; charset=utf-8')
def do_POST(self):
if self.path == '/query':
pwd = self.headers.get('X-Password', '').encode()
if not hmac.compare_digest(pwd, PASSWORD):
self._resp(401, b'Unauthorized\n')
return
sql = self.rfile.read(int(self.headers.get('Content-Length', 0))).decode(errors='replace')
try:
r = subprocess.run(
['duckdb', '-readonly', '--init', '/app/ssh_init.sql', '/app/basedosdados.duckdb'],
input=sql, capture_output=True, text=True, timeout=120
)
out = (r.stdout + r.stderr).encode()
except subprocess.TimeoutExpired:
out = b'timeout\n'
self._resp(200, out, 'text/plain; charset=utf-8')
return
body = self.rfile.read(int(self.headers.get('Content-Length', 0))).decode(errors='replace')
pwd = parse_qs(body).get('password', [''])[0].encode()
if hmac.compare_digest(pwd, PASSWORD):
self.send_response(302)
self.send_header('Set-Cookie', f'ddb_auth={_make_token()}; Path=/; HttpOnly; SameSite=Strict')
self.send_header('Location', '/')
self.end_headers()
else:
self._resp(200, LOGIN_HTML, 'text/html; charset=utf-8')
def _resp(self, code, body=b'', ct='text/plain'):
self.send_response(code)
if body:
self.send_header('Content-Type', ct)
self.send_header('Content-Length', str(len(body)))
self.end_headers()
if body:
self.wfile.write(body)
def log_message(self, *_):
pass
HTTPServer(('127.0.0.1', 8081), H).serve_forever()