#!/usr/bin/env python3
"""Minimal cookie-session auth gate for DuckDB shell.

Endpoints served on 127.0.0.1:8081:

* ``GET /auth``    -- 200 when the ``ddb_auth`` cookie is valid, otherwise a
  302 redirect to ``/login``.
* ``GET <other>``  -- the login page.
* ``POST /query``  -- runs the request body as SQL through the ``duckdb`` CLI
  when the ``X-Password`` header matches the configured password.
* ``POST <other>`` -- form login; sets the ``ddb_auth`` cookie on success.
"""

import hashlib
import hmac
import os
import secrets
import subprocess
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs

# Shared password, read once from the environment at start-up.
PASSWORD = os.environ.get('BASIC_AUTH_PASSWORD', '').encode()
# Random per-process signing key: all issued cookies die with the process.
_SECRET = secrets.token_bytes(32)

_SECONDS_PER_DAY = 86400
_COOKIE_PREFIX = 'ddb_auth='
# Upper bound on request bodies we are willing to buffer in memory.
MAX_BODY_BYTES = 1 << 20


def _day_token(day):
    """Return the hex HMAC-SHA256 of the day number *day* under ``_SECRET``."""
    return hmac.new(_SECRET, str(day).encode(), hashlib.sha256).hexdigest()


def _make_token():
    """Return the session token for the current day (epoch seconds // 86400)."""
    return _day_token(int(time.time()) // _SECONDS_PER_DAY)


def _valid(token):
    """Return True when *token* is the token of today or of yesterday.

    Accepting the previous day keeps a session alive across the day rollover.
    Empty tokens and tokens containing non-ASCII characters are rejected; the
    latter would make ``hmac.compare_digest`` raise ``TypeError`` on ``str``.
    """
    if not token:
        return False
    try:
        candidate = token.encode('ascii')
    except UnicodeEncodeError:
        # A hex digest is pure ASCII, so this can never be a genuine token.
        return False
    today = int(time.time()) // _SECONDS_PER_DAY
    for delta in (0, 1):
        expected = _day_token(today - delta).encode('ascii')
        if hmac.compare_digest(candidate, expected):
            return True
    return False


def _password_ok(candidate):
    """Return True when the bytes *candidate* equal the configured password.

    Fails closed: when no password is configured (``BASIC_AUTH_PASSWORD``
    unset or empty) every attempt is rejected, instead of letting an empty
    password match an empty secret.
    """
    if not PASSWORD:
        return False
    return hmac.compare_digest(candidate, PASSWORD)


# NOTE(review): only the text "DB Shell" (page title and heading) was visible
# for this page; the surrounding markup is a minimal reconstruction -- confirm
# against the deployed template. The form posts the ``password`` field that
# ``H.do_POST`` reads.
LOGIN_HTML = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>DB Shell</title>
</head>
<body>
<h2>DB Shell</h2>
<form method="post" action="/login">
<input type="password" name="password" autofocus>
<button type="submit">Login</button>
</form>
</body>
</html>
""".encode()


class H(BaseHTTPRequestHandler):
    """Request handler implementing the auth check, login and query routes."""

    def _cookie(self):
        """Return the value of the ``ddb_auth`` cookie, or ``''`` if absent."""
        for part in self.headers.get('Cookie', '').split(';'):
            part = part.strip()
            if part.startswith(_COOKIE_PREFIX):
                return part[len(_COOKIE_PREFIX):]
        return ''

    def _read_body(self):
        """Read and decode the request body announced by Content-Length.

        Returns:
            The body decoded as UTF-8 (undecodable bytes replaced), or
            ``None`` when Content-Length is malformed, negative, or larger
            than ``MAX_BODY_BYTES``.
        """
        try:
            length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            return None
        # A negative length would make rfile.read() block until EOF.
        if length < 0 or length > MAX_BODY_BYTES:
            return None
        return self.rfile.read(length).decode(errors='replace')

    def do_GET(self):
        """Answer ``/auth`` with 200 or a redirect; serve the login page otherwise."""
        if self.path == '/auth':
            if _valid(self._cookie()):
                self._resp(200)
            else:
                self.send_response(302)
                self.send_header('Location', '/login')
                self.end_headers()
        else:
            self._resp(200, LOGIN_HTML, 'text/html; charset=utf-8')

    def do_POST(self):
        """Dispatch ``/query`` to the SQL runner and every other path to login."""
        if self.path == '/query':
            self._handle_query()
        else:
            self._handle_login()

    def _handle_query(self):
        """Run the posted SQL through the read-only duckdb CLI.

        Responds 401 on a wrong ``X-Password`` header, 400 on an unusable
        body, 500 when the ``duckdb`` executable cannot be started, and 200
        with combined stdout/stderr (or ``timeout``) otherwise.
        """
        pwd = self.headers.get('X-Password', '').encode()
        if not _password_ok(pwd):
            self._resp(401, b'Unauthorized\n')
            return
        sql = self._read_body()
        if sql is None:
            self._resp(400, b'Bad Request\n')
            return
        try:
            r = subprocess.run(
                ['duckdb', '-readonly', '--init', '/app/ssh_init.sql',
                 '/app/basedosdados.duckdb'],
                input=sql, capture_output=True, text=True,
                # The response is declared as UTF-8, so do not depend on the
                # process locale for the pipe encoding.
                encoding='utf-8', errors='replace',
                timeout=120,
            )
        except subprocess.TimeoutExpired:
            out = b'timeout\n'
        except OSError:
            # duckdb binary missing or not executable.
            self._resp(500, b'duckdb unavailable\n')
            return
        else:
            out = (r.stdout + r.stderr).encode()
        self._resp(200, out, 'text/plain; charset=utf-8')

    def _handle_login(self):
        """Check the posted ``password`` form field and issue the cookie.

        On success redirects to ``/`` with a fresh ``ddb_auth`` cookie; on a
        wrong password re-serves the login page; on an unusable body
        responds 400.
        """
        body = self._read_body()
        if body is None:
            self._resp(400, b'Bad Request\n')
            return
        pwd = parse_qs(body).get('password', [''])[0].encode()
        if _password_ok(pwd):
            self.send_response(302)
            self.send_header(
                'Set-Cookie',
                f'ddb_auth={_make_token()}; Path=/; HttpOnly; SameSite=Strict',
            )
            self.send_header('Location', '/')
            self.end_headers()
        else:
            self._resp(200, LOGIN_HTML, 'text/html; charset=utf-8')

    def _resp(self, code, body=b'', ct='text/plain'):
        """Send a complete response.

        Args:
            code: HTTP status code.
            body: Response payload as bytes; when empty, no Content-Type or
                Content-Length header is sent and nothing is written.
            ct: Content-Type header value used when *body* is non-empty.
        """
        self.send_response(code)
        if body:
            self.send_header('Content-Type', ct)
            self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)

    def log_message(self, *_):
        """Suppress the default per-request logging to stderr."""
        pass


def main():
    """Serve the auth gate on 127.0.0.1:8081 until the process is stopped."""
    HTTPServer(('127.0.0.1', 8081), H).serve_forever()


if __name__ == '__main__':
    main()