Overview
Affected Versions
FastAPI: <0.65.2 (CVE-2021-32677); OPA middleware: <2.0.1 (CVE-2024-40627); fastapi-admin pro: v0.1.4 (CVE-2024-42816)
Code Fix Example
Vulnerable pattern and fix in Python FastAPI:
# Vulnerable pattern
from fastapi import FastAPI, Request, HTTPException
import json
from fastapi.responses import JSONResponse, HTMLResponse
app = FastAPI()
@app.post('/vuln/process')
async def vuln_process(req: Request):
data = await req.json() # vulnerable: parses JSON regardless of Content-Type
return {'received': data}
@app.get('/vuln/product')
async def vuln_product(name: str):
html = f'<html><body><h1>Product: {name}</h1></body></html>'
return HTMLResponse(html)
# Fixed pattern
@app.post('/fix/process')
async def fix_process(req: Request):
body = await req.body()
if len(body) > 2_000_000:
raise HTTPException(status_code=413, detail='Payload too large')
content_type = req.headers.get('content-type', '').split(';')[0].strip()
if content_type not in ('application/json', 'application/geo+json'):
raise HTTPException(status_code=415, detail='Unsupported Media Type')
data = json.loads(body.decode())
return {'received': data}
from markupsafe import escape
@app.get('/fix/product')
async def fix_product(name: str):
safe = escape(name)
html = f'<html><body><h1>Product: {safe}</h1></body></html>'
return HTMLResponse(html)
# OPA middleware vulnerable
class OpaMiddlewareVuln:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
from starlette.requests import Request
if scope['type'] != 'http':
await self.app(scope, receive, send)
return
request = Request(scope, receive)
if request.method == 'OPTIONS':
await self.app(scope, receive, send) # bypass policy: vulnerability
return
if not policy_check(request):
response = JSONResponse({'detail': 'Forbidden'}, status_code=403)
await response(scope, receive, send)
return
await self.app(scope, receive, send)
def policy_check(request: Request) -> bool:
# simple placeholder policy: allow GET/POST only
if request.method not in ('GET', 'POST'):
return False
return True
vuln_app = FastAPI()
vuln_app.add_middleware(OpaMiddlewareVuln)
@vuln_app.get('/items/{item_id}')
async def read_item(item_id: str):
return {'item_id': item_id}
# OPA secure: fix by enforcing policy on OPTIONS as well
class OpaMiddlewareSecure:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
from starlette.requests import Request
if scope['type'] != 'http':
await self.app(scope, receive, send)
return
request = Request(scope, receive)
if not policy_check_secure(request):
from starlette.responses import JSONResponse
response = JSONResponse({'detail': 'Forbidden'}, status_code=403)
await response(scope, receive, send)
return
await self.app(scope, receive, send)
def policy_check_secure(request: Request) -> bool:
# policy for all methods including OPTIONS
return request.method in ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS')
fixed_app = FastAPI()
fixed_app.add_middleware(OpaMiddlewareSecure)
@fixed_app.get('/items/{item_id}')
async def read_item_fixed(item_id: str):
return {'item_id': item_id}
app = FastAPI()
app.mount('/vuln', vuln_app)
app.mount('/fix', fixed_app)