| """ |
| SQL Analyzer — Pure FastAPI Backend |
| ==================================== |
| Serves: |
| • REST API → /api/health, /api/lint, /api/parse, /api/format, /api/inject |
| • Swagger UI → /docs (auto-generated by FastAPI) |
| • React SPA → everything else (static files from ./static/) |
| |
| Single-process deployment — no Node.js required. |
| Compatible with Hugging Face Spaces (Docker SDK) and any OCI-compatible host. |
| """ |
|
|
import json
import os
import re
import threading
import traceback
from pathlib import Path
from typing import Any, Optional

import sqlfluff
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from sqlfluff.core import Linter
from sqlfluff.core.parser import BaseSegment
|
|
| |
| |
| |
|
|
# Application instance. Swagger UI, ReDoc and the OpenAPI schema are exposed
# at the explicit paths given below.
app = FastAPI(
    title="SQL Analyzer API",
    version="1.0.0",
    description=(
        "A powerful SQL analysis backend providing linting, AST parsing, "
        "SQL formatting, and injection detection powered by SQLFluff."
    ),
    openapi_url="/openapi.json",
    docs_url="/docs",
    redoc_url="/redoc",
)
|
|
# CORS: the API is callable from any origin, with any method and any header.
# Credentials are NOT enabled: the FastAPI documentation states that
# allow_origins / allow_methods / allow_headers cannot be ["*"] when
# allow_credentials is True, and nothing in this file uses cookies or
# session-based authentication that would need credentialed requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
| |
| |
|
|
# Request body accepted by every POST analysis endpoint in this file.
# (Documented with comments rather than a class docstring so the generated
# OpenAPI schema stays unchanged.)
class SqlRequest(BaseModel):
    # SQL text to analyse.
    sql: str
    # Requested SQLFluff dialect name; "ansi" when the client omits it.
    dialect: str = "ansi"
|
|
# One lint finding as returned inside LintResponse.violations.
class LintViolation(BaseModel):
    # Position of the finding in the submitted SQL.
    line_no: int
    line_pos: int
    # Rule code and human-readable description of the violated rule.
    code: str
    description: str
    # Rule identifier supplied by the lint endpoint.
    name: str
    # True when the finding is a warning rather than an error.
    warning: bool
    # True when the finding is reported as auto-fixable.
    fixable: bool
|
|
# Response body of POST /api/lint.
class LintResponse(BaseModel):
    # Dialect that was actually used for linting.
    dialect: str
    # Every finding reported for the submitted SQL.
    violations: list[LintViolation]
    # True when no violations were reported.
    passed: bool
    # Aggregate counters computed by the endpoint (total/errors/warnings/fixable).
    stats: dict[str, Any]
|
|
# Serialisable node of the parse tree returned by POST /api/parse.
class AstNode(BaseModel):
    # Identifier of the form "node_<n>" assigned while the tree is built.
    id: str
    # Segment type and the Python class name of the underlying segment.
    type: str
    name: str
    # Raw token text; None for non-leaf nodes.
    raw: Optional[str]
    # Source position of the node; any of these may be None.
    start_line: Optional[int]
    start_pos: Optional[int]
    end_line: Optional[int]
    end_pos: Optional[int]
    # True for token-level nodes.
    is_leaf: bool
    # Child nodes in source order (self-referencing, hence the string annotation).
    children: list["AstNode"]


# Resolve the "AstNode" forward reference used by `children`.
AstNode.model_rebuild()
|
|
# Response body of POST /api/parse.
class ParseResponse(BaseModel):
    # Dialect that was actually used for parsing.
    dialect: str
    # Root of the serialised parse tree.
    tree: AstNode
    # Number of leaf nodes in the tree.
    token_count: int
    # Number of edges on the longest root-to-leaf path.
    depth: int
|
|
# Response body of POST /api/format.
class FormatResponse(BaseModel):
    # Dialect that was actually used for formatting.
    dialect: str
    # SQL exactly as submitted, and the SQL after auto-fixing.
    original: str
    formatted: str
    # Whether the endpoint considers the SQL to have been modified.
    changed: bool
    # Count of fixes as computed by the endpoint.
    fixes_applied: int
|
|
# One detected injection indicator, as returned by POST /api/inject.
class InjectionPattern(BaseModel):
    # Identifier of the detection rule and its severity label.
    pattern_id: str
    risk_level: str
    # Grouping, short description and longer explanation of the rule.
    category: str
    description: str
    detail: str
    # Matched text and its 1-based position in the submitted SQL.
    offending_token: Optional[str]
    line_no: Optional[int]
    line_pos: Optional[int]
    # Suggested mitigation.
    recommendation: str
|
|
# Response body of POST /api/inject.
class InjectionResponse(BaseModel):
    # Dialect echoed back from the request.
    dialect: str
    # True only when the risk score is zero.
    safe: bool
    # Aggregate risk score, capped at 100 by the endpoint.
    risk_score: int
    # Detected indicators, at most one entry per pattern id.
    patterns: list[InjectionPattern]
    # Human-readable verdict derived from the score.
    summary: str
|
|
# Response body of GET /api/health.
class HealthResponse(BaseModel):
    # Liveness indicator.
    status: str
    # Version string reported by the SQLFluff package.
    version: str
    # Dialect names the API advertises.
    dialects: list[str]
|
|
| |
| |
| |
|
|
# Dialect names accepted by the API. Endpoints that validate the requested
# dialect fall back to "ansi" for anything not listed here.
SQLFLUFF_DIALECTS = [
    "ansi",
    "athena",
    "bigquery",
    "clickhouse",
    "databricks",
    "db2",
    "duckdb",
    "exasol",
    "greenplum",
    "hive",
    "mysql",
    "oracle",
    "postgres",
    "redshift",
    "snowflake",
    "soql",
    "sparksql",
    "sqlite",
    "teradata",
    "trino",
    "tsql",
]
|
|
# Per-thread state for AST node numbering.
#
# FastAPI runs plain ``def`` endpoints in a worker thread pool, so a single
# module-level integer would be shared by concurrent /api/parse requests: a
# reset issued by one request could restart numbering in the middle of
# another request's tree and produce duplicate ids. Keeping the counter in
# thread-local storage gives every in-flight request its own sequence.
_counter_state = threading.local()


def _reset_counter() -> None:
    """Restart node numbering at zero for the calling thread."""
    _counter_state.value = 0


def _next_id() -> str:
    """Return the next ``node_<n>`` identifier for the calling thread.

    Numbering starts at ``node_1``, both after ``_reset_counter()`` and in a
    thread that has never called it.
    """
    current = getattr(_counter_state, "value", 0) + 1
    _counter_state.value = current
    return f"node_{current}"
|
|
def _segment_to_node(seg: BaseSegment, depth: int = 0) -> AstNode:
    """Recursively convert a SQLFluff segment into a serialisable AstNode.

    Args:
        seg: Segment to convert; its ``segments`` are converted recursively.
        depth: Nesting level of ``seg``. It is threaded through the recursion
            but does not influence the result.

    Returns:
        An ``AstNode`` mirroring ``seg``. ``raw`` is set for leaf nodes only.
        ``start_*`` come from the segment's position marker. ``end_*`` are
        derived from the start position and the segment's raw text, and point
        just past the last character (1-based, end-exclusive). All four
        position fields are ``None`` when no position is available.
    """
    # `is_raw` may be a method, a plain attribute, or absent; fall back to
    # "has no child segments" in the last case.
    is_raw_attr = getattr(seg, "is_raw", None)
    if callable(is_raw_attr):
        is_leaf = bool(is_raw_attr())
    elif is_raw_attr is not None:
        is_leaf = bool(is_raw_attr)
    else:
        is_leaf = not seg.segments

    raw = seg.raw if is_leaf else None

    start_line = start_pos = end_line = end_pos = None
    try:
        marker = getattr(seg, "pos_marker", None)
        if marker:
            start_line = marker.line_no
            start_pos = marker.line_pos
            # NOTE(review): the end position is computed from the segment's
            # raw text, which assumes that text matches the source the marker
            # refers to (true for untemplated SQL) - confirm for templated input.
            text = seg.raw or ""
            line_breaks = text.count("\n")
            end_line = start_line + line_breaks
            if line_breaks:
                end_pos = len(text) - text.rfind("\n")
            else:
                end_pos = start_pos + len(text)
    except Exception:
        # Position data is best-effort: a segment whose marker cannot be
        # read is still returned, just without coordinates.
        start_line = start_pos = end_line = end_pos = None

    children = []
    if not is_leaf and seg.segments:
        children = [_segment_to_node(child, depth + 1) for child in seg.segments]

    # The id is requested only after the children are built, so ids are
    # assigned in post-order (children receive lower numbers than their parent).
    return AstNode(
        id=_next_id(),
        type=seg.get_type(),
        name=type(seg).__name__,
        raw=raw,
        start_line=start_line,
        start_pos=start_pos,
        end_line=end_line,
        end_pos=end_pos,
        is_leaf=is_leaf,
        children=children,
    )
|
|
def _tree_depth(node: AstNode) -> int:
    """Return the number of edges on the longest path from ``node`` to a descendant.

    A node without children has depth 0.
    """
    deepest = 0
    pending = [(node, 0)]
    while pending:
        current, level = pending.pop()
        if level > deepest:
            deepest = level
        pending.extend((child, level + 1) for child in current.children)
    return deepest
|
|
def _count_tokens(node: AstNode) -> int:
    """Return how many leaf nodes the tree rooted at ``node`` contains.

    A node flagged ``is_leaf`` counts as one and is not descended into; a
    non-leaf node without children contributes nothing.
    """
    total = 0
    pending = [node]
    while pending:
        current = pending.pop()
        if current.is_leaf:
            total += 1
        else:
            pending.extend(current.children)
    return total
|
|
| |
| |
| |
|
|
# Regex-based injection heuristics. Each entry carries the rule id, severity,
# display texts, and the pattern plus `re` flags used to search the raw SQL
# text. The patterns are applied to the whole string, so they also fire on
# matches inside string literals or comments.
INJECTION_PATTERNS = [
    {
        "id": "tautology_or_1_1",
        "risk": "critical",
        "category": "Tautology",
        "description": "Always-true tautology detected (e.g. OR 1=1)",
        "detail": "Tautologies force WHERE clauses to always evaluate to TRUE, bypassing all filters.",
        "recommendation": "Use parameterised queries. Never interpolate user input into SQL strings.",
        "regex": r"\bOR\s+['\"]?\d+['\"]?\s*=\s*['\"]?\d+['\"]?",
        "flags": re.IGNORECASE,
    },
    {
        "id": "tautology_or_true",
        "risk": "critical",
        "category": "Tautology",
        "description": "Always-true boolean tautology (e.g. OR TRUE / OR 'a'='a')",
        "detail": "Boolean tautologies bypass WHERE conditions entirely.",
        "recommendation": "Use parameterised queries and validate all user-supplied data.",
        "regex": r"\bOR\s+(?:TRUE|'[^']*'\s*=\s*'[^']*')",
        "flags": re.IGNORECASE,
    },
    {
        "id": "stacked_queries",
        "risk": "critical",
        "category": "Stacked Queries",
        "description": "Stacked / batched query detected (semicolon followed by another statement)",
        "detail": "Stacked queries allow attackers to append arbitrary SQL statements.",
        "recommendation": "Disallow multiple statements in a single query. Use stored procedures or ORMs.",
        "regex": r";\s*(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|EXECUTE|UNION)\b",
        "flags": re.IGNORECASE,
    },
    {
        "id": "comment_bypass_inline",
        "risk": "high",
        "category": "Comment Bypass",
        "description": "Inline comment used to truncate or bypass query logic (--)",
        "detail": "Inline comments (--) can strip the remainder of a query, bypassing authentication checks.",
        "recommendation": "Strip or reject SQL comment sequences from user input.",
        "regex": r"--[^\n]*",
        "flags": 0,
    },
    {
        "id": "comment_bypass_block",
        "risk": "high",
        "category": "Comment Bypass",
        "description": "Block comment used to obfuscate or bypass query logic (/* ... */)",
        "detail": "Block comments can hide injected code and bypass naive input filters.",
        "recommendation": "Strip or reject SQL block comment sequences from user input.",
        "regex": r"/\*.*?\*/",
        "flags": re.DOTALL,
    },
    {
        "id": "union_select",
        "risk": "high",
        "category": "UNION-based Injection",
        "description": "UNION SELECT detected — potential data exfiltration vector",
        "detail": "UNION SELECT allows attackers to append result sets from other tables, leaking sensitive data.",
        "recommendation": "Use parameterised queries. Validate column counts and types.",
        "regex": r"\bUNION\s+(?:ALL\s+)?SELECT\b",
        "flags": re.IGNORECASE,
    },
    {
        "id": "sleep_benchmark",
        "risk": "high",
        "category": "Time-based Blind Injection",
        "description": "Time-delay function detected (SLEEP / BENCHMARK / WAITFOR)",
        "detail": "Time-based blind injection uses delays to infer data without visible output.",
        "recommendation": "Parameterise all queries and restrict execution of time-delay functions.",
        # The function forms need an opening parenthesis; WAITFOR DELAY is a
        # statement followed by a quoted time, so it is matched on its own.
        "regex": r"\b(?:SLEEP|BENCHMARK|PG_SLEEP)\s*\(|\bWAITFOR\s+DELAY\b",
        "flags": re.IGNORECASE,
    },
    {
        "id": "exec_xp_cmdshell",
        "risk": "critical",
        "category": "Command Execution",
        "description": "xp_cmdshell or EXEC detected — OS command execution risk",
        "detail": "xp_cmdshell allows execution of arbitrary OS commands from SQL Server.",
        "recommendation": "Disable xp_cmdshell. Never allow user input to reach EXEC statements.",
        # xp_cmdshell is normally invoked as `EXEC xp_cmdshell '...'`, with no
        # parenthesis, so the bare name is flagged; EXEC/EXECUTE still
        # requires the parenthesised (dynamic SQL) form.
        "regex": r"\bxp_cmdshell\b|\bEXEC(?:UTE)?\s*\(",
        "flags": re.IGNORECASE,
    },
    {
        "id": "drop_table",
        "risk": "critical",
        "category": "Destructive Statement",
        "description": "DROP TABLE / DROP DATABASE detected",
        "detail": "Injected DROP statements can destroy entire tables or databases.",
        "recommendation": "Restrict DDL permissions. Use parameterised queries and least-privilege accounts.",
        "regex": r"\bDROP\s+(?:TABLE|DATABASE|SCHEMA|INDEX)\b",
        "flags": re.IGNORECASE,
    },
    {
        "id": "hex_encoding",
        "risk": "medium",
        "category": "Obfuscation",
        "description": "Hex-encoded string literal detected (0x...)",
        "detail": "Hex encoding is commonly used to bypass string-based input filters.",
        "recommendation": "Validate and sanitise all input. Use parameterised queries.",
        "regex": r"\b0x[0-9a-fA-F]{4,}\b",
        "flags": 0,
    },
    {
        "id": "char_concat",
        "risk": "medium",
        "category": "Obfuscation",
        "description": "CHAR() concatenation detected — common obfuscation technique",
        "detail": "Attackers use CHAR() to build strings character-by-character to evade filters.",
        "recommendation": "Use parameterised queries. Restrict use of CHAR() in user-facing contexts.",
        "regex": r"\bCHAR\s*\(\s*\d+",
        "flags": re.IGNORECASE,
    },
    {
        "id": "information_schema",
        "risk": "medium",
        "category": "Schema Reconnaissance",
        "description": "INFORMATION_SCHEMA query detected — schema enumeration attempt",
        "detail": "Attackers query INFORMATION_SCHEMA to enumerate tables, columns, and credentials.",
        "recommendation": "Restrict access to INFORMATION_SCHEMA. Use least-privilege DB accounts.",
        "regex": r"\bINFORMATION_SCHEMA\b",
        "flags": re.IGNORECASE,
    },
    {
        "id": "load_file",
        "risk": "critical",
        "category": "File System Access",
        "description": "LOAD_FILE() or INTO OUTFILE detected — file system access risk",
        "detail": "These MySQL functions allow reading/writing arbitrary files on the server.",
        "recommendation": "Disable FILE privilege. Never allow user input near file I/O functions.",
        "regex": r"\b(?:LOAD_FILE|INTO\s+(?:OUT|DUMP)FILE)\b",
        "flags": re.IGNORECASE,
    },
]


# Points contributed to the overall risk score by one detected pattern of
# each severity.
RISK_SCORE = {"critical": 35, "high": 20, "medium": 10, "low": 5}
|
|
def _detect_injection(sql: str) -> list[InjectionPattern]:
    """Run every entry of ``INJECTION_PATTERNS`` against ``sql``.

    Returns one ``InjectionPattern`` per regex match, grouped by rule in the
    order the rules are declared and, within a rule, in match order.
    Positions are 1-based; the matched text is truncated to 120 characters.
    """
    findings: list[InjectionPattern] = []
    for rule in INJECTION_PATTERNS:
        matcher = re.compile(rule["regex"], rule["flags"])
        for hit in matcher.finditer(sql):
            offset = hit.start()
            preceding = sql[:offset]
            finding = InjectionPattern(
                pattern_id=rule["id"],
                risk_level=rule["risk"],
                category=rule["category"],
                description=rule["description"],
                detail=rule["detail"],
                offending_token=hit.group(0)[:120],
                line_no=preceding.count("\n") + 1,
                # rfind() gives -1 when the match is on the first line, which
                # turns the offset into a 1-based column.
                line_pos=offset - preceding.rfind("\n"),
                recommendation=rule["recommendation"],
            )
            findings.append(finding)
    return findings
|
|
| |
| |
| |
|
|
@app.get("/api/health", response_model=HealthResponse, tags=["System"])
def health():
    """Return API health status and SQLFluff version."""
    # The dialect list is the static module-level constant, not a live query
    # of the installed SQLFluff package.
    payload = {
        "status": "ok",
        "version": sqlfluff.__version__,
        "dialects": SQLFLUFF_DIALECTS,
    }
    return HealthResponse(**payload)
|
|
@app.post("/api/lint", response_model=LintResponse, tags=["Analysis"])
def lint_sql(req: SqlRequest):
    """
    Lint SQL using SQLFluff and return rule violations.

    Returns a list of violations with line/column info, rule codes,
    severity, and whether each violation is auto-fixable.
    """
    # Raises HTTPException(500) when SQLFluff fails; the stack trace is
    # written to the server's stderr only and never sent to the client.
    try:
        # Unknown dialects fall back to ANSI; the response reports the
        # dialect that was actually used.
        dialect = req.dialect if req.dialect in SQLFLUFF_DIALECTS else "ansi"
        linter = Linter(dialect=dialect)
        result = linter.lint_string(req.sql)

        violations = []
        for v in result.violations:
            code = v.rule_code()
            # Prefer the rule's descriptive name when the violation carries
            # a rule object with a non-empty `name`; violations without one
            # keep the rule code as before.
            rule_name = getattr(getattr(v, "rule", None), "name", "") or code
            violations.append(LintViolation(
                line_no=v.line_no,
                line_pos=v.line_pos,
                code=code,
                description=v.desc(),
                name=rule_name,
                warning=getattr(v, "warning", False),
                fixable=getattr(v, "fixable", False),
            ))

        warning_count = sum(1 for item in violations if item.warning)
        stats = {
            "total": len(violations),
            "errors": len(violations) - warning_count,
            "warnings": warning_count,
            "fixable": sum(1 for item in violations if item.fixable),
        }
        return LintResponse(
            dialect=dialect,
            violations=violations,
            passed=not violations,
            stats=stats,
        )
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Lint error: {e}") from e
|
|
@app.post("/api/parse", response_model=ParseResponse, tags=["Analysis"])
def parse_sql(req: SqlRequest):
    """
    Parse SQL into a full Abstract Syntax Tree (AST).

    Returns a recursive tree of nodes with type, name, raw token value,
    position info, and child nodes.
    """
    # Raises HTTPException(422) when SQLFluff yields no tree for the input
    # and HTTPException(500) for any other failure; stack traces are written
    # to the server's stderr only.
    try:
        # Unknown dialects fall back to ANSI; the response reports the
        # dialect that was actually used.
        dialect = req.dialect if req.dialect in SQLFLUFF_DIALECTS else "ansi"
        linter = Linter(dialect=dialect)
        parsed = linter.parse_string(req.sql)

        # NOTE(review): at least some SQLFluff releases type `tree` as
        # optional; without this guard a missing tree surfaced as an opaque
        # AttributeError inside the converter.
        if parsed.tree is None:
            raise HTTPException(
                status_code=422,
                detail="Parse error: SQLFluff produced no parse tree for this input.",
            )

        _reset_counter()
        tree = _segment_to_node(parsed.tree)
        return ParseResponse(
            dialect=dialect,
            tree=tree,
            token_count=_count_tokens(tree),
            depth=_tree_depth(tree),
        )
    except HTTPException:
        # Deliberate HTTP errors must not be rewrapped as a generic 500.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Parse error: {e}") from e
|
|
@app.post("/api/format", response_model=FormatResponse, tags=["Analysis"])
def format_sql(req: SqlRequest):
    """
    Format and auto-fix SQL using SQLFluff.

    Applies all auto-fixable rules and returns the cleaned SQL alongside
    the original, with a count of fixes applied.
    """
    # Raises HTTPException(422) when SQLFluff yields no tree for the input
    # and HTTPException(500) for any other failure; stack traces are written
    # to the server's stderr only.
    try:
        # Unknown dialects fall back to ANSI; the response reports the
        # dialect that was actually used.
        dialect = req.dialect if req.dialect in SQLFLUFF_DIALECTS else "ansi"

        # NOTE(review): rule CV10 is excluded from linting and fixing; the
        # reason is not recorded in this file - confirm before changing.
        linter = Linter(dialect=dialect, exclude_rules=["CV10"])
        before_count = len(linter.lint_string(req.sql).violations)

        parsed = linter.parse_string(req.sql)
        if parsed.tree is None:
            raise HTTPException(
                status_code=422,
                detail="Format error: SQLFluff produced no parse tree for this input.",
            )

        fixed_tree, _ = linter.fix(parsed.tree)
        fixed_sql = fixed_tree.raw if fixed_tree else req.sql
        formatted = fixed_tree.raw.strip() if fixed_tree else req.sql

        # Re-lint the fixer's real output. Linting the stripped text would
        # introduce a violation of its own (the trailing newline is gone)
        # and make the fix count come out one too low.
        after_count = len(linter.lint_string(fixed_sql).violations)
        fixes_applied = max(0, before_count - after_count)

        return FormatResponse(
            dialect=dialect,
            original=req.sql,
            formatted=formatted,
            # `formatted` has its outer whitespace stripped, so compare
            # against the original stripped the same way; otherwise any
            # input ending in a newline is always reported as changed.
            changed=formatted.strip() != req.sql.strip(),
            fixes_applied=fixes_applied,
        )
    except HTTPException:
        # Deliberate HTTP errors must not be rewrapped as a generic 500.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Format error: {e}") from e
|
|
@app.post("/api/inject", response_model=InjectionResponse, tags=["Security"])
def detect_injection(req: SqlRequest):
    """
    Detect SQL injection patterns in the provided SQL string.

    Checks for tautologies, stacked queries, comment-based bypasses,
    UNION-based injection, time-based blind injection, command execution,
    destructive statements, hex obfuscation, and schema reconnaissance.
    """
    try:
        # Keep only the first match of each rule so that a rule contributes
        # to the score once, however often it fires.
        first_hits: dict[str, InjectionPattern] = {}
        for finding in _detect_injection(req.sql):
            first_hits.setdefault(finding.pattern_id, finding)
        unique_patterns: list[InjectionPattern] = list(first_hits.values())

        raw_total = sum(RISK_SCORE.get(p.risk_level, 0) for p in unique_patterns)
        score = min(100, raw_total)

        # Bands: 0 -> safe, 1-24 low, 25-49 medium, 50-74 high, 75+ critical.
        if score >= 75:
            summary = f"Critical risk ({score}/100): Severe injection patterns detected. This SQL is dangerous."
        elif score >= 50:
            summary = f"High risk ({score}/100): Multiple injection indicators found. Do not execute."
        elif score >= 25:
            summary = f"Medium risk ({score}/100): Suspicious patterns detected. Review carefully."
        elif score > 0:
            summary = f"Low risk ({score}/100): Minor obfuscation or reconnaissance patterns found."
        else:
            summary = "No injection patterns detected. The SQL appears safe."

        return InjectionResponse(
            dialect=req.dialect,
            safe=score == 0,
            risk_score=score,
            patterns=unique_patterns,
            summary=summary,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Injection detection error: {str(e)}")
|
|
| |
| |
| |
| |
| |
| |
|
|
# Directory holding the built React frontend, next to this file.
STATIC_DIR = Path(__file__).parent / "static"


if STATIC_DIR.exists():
    assets_dir = STATIC_DIR / "assets"
    if assets_dir.exists():
        app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")

    # Resolved once so each requested path can be checked against the real
    # location of the static directory.
    _STATIC_ROOT = STATIC_DIR.resolve()

    @app.get("/{full_path:path}", include_in_schema=False)
    async def serve_spa(full_path: str, request: Request):
        """Serve the React SPA for all non-API routes."""
        # This catch-all also receives API-looking GETs that matched no real
        # route (typos, GET on a POST-only endpoint); answer those with 404
        # instead of a 200 HTML page.
        if full_path == "api" or full_path.startswith("api/"):
            raise HTTPException(status_code=404, detail="Not Found")

        # `full_path` is client-controlled: ".." segments, or a leading "/"
        # (which makes pathlib discard STATIC_DIR entirely), could otherwise
        # point at any file on the host. Only serve files that resolve to a
        # location inside the static directory.
        try:
            candidate = (STATIC_DIR / full_path).resolve()
        except (OSError, ValueError):
            candidate = None

        if candidate is not None:
            inside_root = candidate == _STATIC_ROOT or _STATIC_ROOT in candidate.parents
            if inside_root and candidate.is_file():
                return FileResponse(str(candidate))

        # Anything else is treated as a client-side route of the SPA.
        return FileResponse(str(STATIC_DIR / "index.html"))
else:
    @app.get("/", include_in_schema=False)
    async def no_static():
        # Fallback landing payload used when no frontend build is present.
        return JSONResponse({
            "message": "SQL Analyzer API is running. Build the React frontend and place it in api/static/.",
            "docs": "/docs",
            "endpoints": ["/api/health", "/api/lint", "/api/parse", "/api/format", "/api/inject"],
        })
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import uvicorn

    # PORT takes precedence over PYTHON_API_PORT; 7860 is used when neither
    # variable is set.
    env = os.environ
    port_text = env["PORT"] if "PORT" in env else env.get("PYTHON_API_PORT", "7860")
    uvicorn.run(app, host="0.0.0.0", port=int(port_text), log_level="info")
|
|