Fastapi的自定义中间件
#网络开发人员 #python #fastapi #middleware

本文介绍了为FastAPI应用程序设计的自定义中间件。中间件有几个目的:

  1. 令牌身份验证:它使用Azure AD JWKS端点验证JWT令牌,确保用户得到身份验证。
  2. content-type处理:它处理JavaScript文件的内容类型。
  3. 服务UI文件:它提供UI文件,并提供了为其他请求提供index.html文件的后备。

成分

环境变量

中间件利用以下环境变量:

  • AZURE_CLIENT_ID:Azure Client ID。
  • AZURE_CLIENT_SECRET:Azure Client Secret。
  • AZURE_TENANT_ID:Azure房客ID。
  • EXPECTED_AUDIENCE:JWT令牌的预期听众。
  • EXPECTED_ISSUER:使用AZURE_TENANT_ID构建的JWT令牌的预期发行者。

功能

error_response(error_msg: str, status_code: int) -> JSONResponse

此功能记录了错误消息,并返回带有错误消息的JSONRESPONSE。它用于生成标准错误响应。

validate_token(token: str) -> bool

此功能使用Azure AD JWKS端点验证JWT令牌。如果令牌有效,则返回true,否则为false。

自定义中间件类

class CustomMiddleware(BaseHTTPMiddleware)

此类包含中间件的核心逻辑。

  • dispatch方法:此方法处理请求并设置JavaScript文件的内容类型标头或将index.html文件作为其他请求的后备。

工作流程

  1. 令牌验证:如果请求URL以“/api”开头,则中间件检查授权标头并使用validate_token函数验证令牌。
  2. 文件服务:如果请求URL不以“/api”开头,则中间件检查是否存在请求的文件并使用适当的内容类型进行服务。
  3. index.html的后备:如果找不到请求的文件,则中间件服务于index.html文件。

代码

"""
Custom Middleware for FastAPI Application

This module provides a custom middleware class for FastAPI applications, focused on handling authentication and serving UI files.

- Token Authentication: Validates JWT tokens using the Azure AD JWKS endpoint.
- Content-Type Handling: Manages the content type of JavaScript files.
- Serving UI Files: Serves UI files and provides a fallback to serve the index.html file for other requests.

Environment Variables:
    AZURE_CLIENT_ID: Azure client ID.
    AZURE_CLIENT_SECRET: Azure client secret.
    AZURE_TENANT_ID: Azure tenant ID.
    EXPECTED_AUDIENCE: Expected audience for the JWT token.
    EXPECTED_ISSUER: Expected issuer for the JWT token.

Functions:
    error_response(error_msg: str, status_code: int) -> JSONResponse
    validate_token(token: str) -> bool

Classes:
    CustomMiddleware(BaseHTTPMiddleware): Custom middleware class.

Usage:
    Add the CustomMiddleware class to your FastAPI application's middleware stack.
"""

import os
import jwt
import json
import logging
import requests
import mimetypes


from starlette.requests import Request
from typing import Callable, Coroutine, Any
from starlette.responses import Response, JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

UI_DIR = "ui/dist"
UI_PATH = "ui/dist/index.html"
AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID")
AZURE_CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET")
AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID")
EXPECTED_AUDIENCE = os.getenv("EXPECTED_AUDIENCE")
EXPECTED_ISSUER = f"https://sts.windows.net/{AZURE_TENANT_ID}/"
logger = logging.getLogger("CustomMiddleware")


def error_response(error_msg: str, status_code: int) -> JSONResponse:
    """
    Logs the error message and returns a JSONResponse with the error message.

    Args:
        error_msg (str): the error message to be logged and returned in the JSONResponse
        status_code (int): the HTTP status code

    Returns:
        JSONResponse: JSONResponse containing the error message
    """
    logger.error(error_msg)
    return JSONResponse(
        content={"error": {"message": error_msg}},
        status_code=status_code,
    )


def validate_token(token: str) -> bool:
    """
    Validate a JWT token using the Azure AD JWKS endpoint.

    Args:
        token (str): The JWT token to be validated.

    Returns:
        bool: True if the token is valid, False otherwise.
    """
    jwks_uri = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/discovery/v2.0/keys"

    jwks = json.loads(requests.get(jwks_uri).text)
    header = jwt.get_unverified_header(token)

    signing_key = None
    for key in jwks["keys"]:
        if key["kid"] == header["kid"]:
            signing_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key))
            break

    if not signing_key:
        return False

    try:
        jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience=EXPECTED_AUDIENCE,
            issuer=EXPECTED_ISSUER,
        )
        return True
    except Exception as e:
        logger.error(f"Token validation failed: {e}")
        return False


class CustomMiddleware(BaseHTTPMiddleware):
    """
    Custom middleware for handling content-type of JavaScript files and serving
    index.html as a fallback for other requests.
    """

    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Coroutine[Any, Any, Response]],
    ) -> Response:
        """
        Process the request and set the content-type header for JavaScript files or
        serve the index.html file as a fallback for other requests.

        Args:
            request (Request): The incoming request.
            call_next (Callable): The next middleware or handler in the stack.

        Returns:
            Response: The generated response.
        """
        logger.debug(f"Request URL path: {request.url.path}")
        if request.url.path.startswith("/api"):
            try:
                if "Authorization" not in request.headers:
                    return error_response("Unauthorized", 401)
                token_header = request.headers["Authorization"]
                if token_header.startswith("Bearer "):
                    token = token_header.split("Bearer ")[-1]
                    if not validate_token(token):
                        return error_response("Unauthorized", 401)
                else:
                    return error_response("Token should begin with Bearer", 400)
            except Exception as e:
                return error_response(f"{e}", 400)
            response = await call_next(request)
            return response

        file_path = os.path.join(UI_DIR, request.url.path.lstrip("/"))

        if os.path.isfile(file_path):
            with open(file_path, "rb") as file:
                content_type, _ = mimetypes.guess_type(file_path)
                return Response(content=file.read(), media_type=content_type)

        response = await call_next(request)
        if response.status_code == 404:
            with open(UI_PATH, "rb") as file:
                return Response(content=file.read(), media_type="text/html")
        return response

结论

此自定义中间件提供了一个统一的解决方案,用于在FastAPI应用程序中处理身份验证和服务静态文件。使用JWT进行身份验证提供了安全性,而集成的方法用于服务静态文件可以简化应用程序结构。