[Python]如何在Fastapi + Sqlalchemy中实现交易装饰员 - 审查其他方法
#网络开发人员 #python #database #fastapi

介绍

我想说的是,后端应用程序的80%以上与数据库有关。这是我们关心的数据(我夸大了太多吗?)。如您所知,任何数据操作(创建,更新和删除)都必须是交易的。

但是,由于应用程序的许多部分应参与交易,因此要考虑:我们如何将数据库交互层与应用程序的其他部分分开?例如,我们不想在MVC结构的服务层中明确调用DB COMMIT方法。

Java Spring已经有一个明智的解决方案:@Transactional annotation(我从阅读this book中得知)。通过使用此装饰器包装功能,我们在遵守干燥原理的同时,我们具有更干净,更脱钩的代码层。

但是,据我所知,Fastapi社区中没有相应的注释功能,我发现只有几篇文章可以用作参考:

因此,在本文中,我想回顾一下这两篇文章,以及如何根据这些文章来处理此功能。

备注:这里的代码示例使用了SQLalchemy的异步API。但是它也可以以几乎相同的方式应用于同步API。

简单直观的方法:Kosntantine dvalishvili的方法

发布url link

我在Google上搜索此主题时遇到了本文。如果您看到代码,它将尝试彻底遵循the flow of a possible transaction using the session API of SQLAlchemy

以下代码是作者原始代码的重写:

from typing import Optional, Callable
import functools
from contextvars import ContextVar

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

db_session_context: ContextVar[Optional[AsyncSession]]  = ContextVar("db_session", default=None)

engine = create_async_engine(url="FAKE_DB_URL")
session_factory = async_sessionmaker(bind=engine, autocommit=False, autoflush=False)


def transactional(func: Callable) -> Callable:
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        db_session = db_session_context.get()
        if db_session:
            return func(*args, **kwargs)

        db_session = session_factory()
        db_session_context.set(db_session)

        try:
            result = func(*args, **kwargs)
            db_session.commit()

        except Exception as e:
            db_session.rollback()
            raise

        finally:
            db_session.close()
            db_session_context.set(None)
        return result

    return _wrapper

有关此代码的一件事是它在Python中使用contextvars STL。作者说,它是为了像全局变量一样访问当前会话。

但是,与作者不再提及的contextvars有一个非常重要的主题。由于任何后端申请都以同时的方式运行,因此我们应该manage our session in thread-safe way。根据SQLAlchemy documentation的说法,我们应该将当前会话与当前请求相关联,在这里我们在作者代码中没有看到太多考虑。

因此,在这里我们有以下问题要解决:如何将当前会话与传入请求联系起来?由于该文档强烈建议遵循后端框架提供的集成工具,而不是使用scoped_session API,因此我们需要研究FastAPI首先管理数据库会话。

插曲:Fastapi如何管理数据库会话

您可能已经知道,basic way FastAPI recommends is simply creating a new session for each request and close it when the request finishes its duty

以下代码是FastApi文档中原始代码的重写:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine(url="FAKE_DB_URL")
session_factory = async_sessionmaker(bind=engine, autocommit=False, autoflush=False)


def get_db_session():
    db_session = session_factory()
    try:
        yield db_session
    finally:
        db_session.close()

但是,这种方法与我们的装饰注释方法不兼容:我们希望将会话作为全局变量(全球但特定于本)。而且,由于我们也将在域逻辑中使用任何会话对象,因此数据库会话无法明确与请求进行交互。

同一文档页面提供了另一种方法:use middleware(一种旧方法,但适合我们的需求)。在中间软件中,我们可以直接触摸请求对象。在这里,我们应用了我们从Kosntantine dvalishvili的方法中学到的技术:将contextvars用于全球对象。

但是,等等,我们如何将请求映射到单个数据库会话?作为SQLAlchemy documentation pointed out,如果FastApi提供了这样的机制,那将是最好的。但是,目前似乎尚无此类功能。也就是说,我们必须自己提供这样的功能。

我们是否需要一个单独的“全局” python字典对象映射这两个?

session_table: dict[int, AsyncSession] = {
    <session_id>: <session_object>
}

current_db_session: AsyncSession = session_table[get_db_session_context()]

实际上,Sqlalchemy已经提供了这样的API,称为koude2(当然,我们将在这里使用async_scoped_session)。我前面提到的第二个参考文献提供了一个很好的例子(尽管它没有明确地将请求映射到使用hash的数据库会话中)。

范围范围的课程:hide的方法

发布url link

备注此参考是用韩语编写的,尽管您可以阅读代码并查看作者尝试实现的目标。

因此,这种方法基本上是结合了我之前提到的两个元素:使用contextvars访问当前会话,并将其与当前传入请求会话匹配。如果我们使用scoped_session,则使用用户in order to map the current context to one of the database sessions传递的scopefunc。在引擎盖下,只是a simple Python dictionary

因此,通过将函数get_session_context传递到参数scopefunc,我们可以将当前请求会话平滑地映射到单个数据库会话。

以下代码是作者原始代码的重写:

from typing import Callable
from contextvars import ContextVar, Token
import functools
from uuid import uuid4

from fastapi import Request
from sqlalchemy.ext.asyncio import (
  create_async_engine, 
  async_sessionmaker, 
  async_scoped_session
)

session_context: ContextVar[str] = ContextVar("session_context", default="")


def get_session_context() -> str:
    return session_context.get()


def set_session_context(session_id: str) -> Token:
    return session_context.set(session_id)


def reset_session_context(context: Token) -> None:
    session_context.reset(context)


engine = create_async_engine(url="YOUR_DB_URL", pool_recycle=3600)

AsyncScopedSession = async_scoped_session(
    async_sessionmaker(autocommit=True, autoflush=False, bind=engine),
    scopefunc=get_session_context,
)


async def middleware_function(request: Request, call_next):
    session_id = str(uuid4())
    context = set_session_context(session_id=session_id)
    session = AsyncScopedSession()

    try:
        response = await call_next(request)
    except Exception as e:
        session.rollback()
        raise e
    finally:
        session.remove()
        reset_session_context(context=context)

    return response


async def transactional(func: Callable) -> Callable:
    @functools.wraps(func) 
    async def _wrapper(*args, **kwargs):
        session = AsyncScopedSession()
        try:
            result = await func(*args, **kwargs)
            await session.commit()
        except Exception as e:
            await session.rollback() 
            raise e 
        finally: 
            await session.close()
        return result
    return _wrapper

但是,我们仍然可以在几点上改进此代码。

  • 在这里,作者使用uuid4函数来设置会话ID,但是由于我们已经有hash用于请求对象,因此我们只需使用hash(request)即可。这也适合调试,因为我们可以在出现问题时确定某个请求。
  • 正如第一种方法所做的那样,我们不想拥有嵌套的交易以进行更简单的设计。
  • 我们可以简单地使用单个上下文管理器包装所有这些显式commitrollbackclose方法:koude18

摘要:我的方法

因此,考虑到到目前为止我们进行的所有讨论,我们最终可以联系到这些简单的代码:

  • _session.py
from typing import Optional
from contextvars import ContextVar

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_scoped_session,
    async_sessionmaker,
    AsyncSession,
)

from ..config import config

# some hints from: https://github.com/teamhide/fastapi-boilerplate/blob/master/core/db/session.py
db_session_context: ContextVar[Optional[int]] = ContextVar(
    "db_session_context", default=None
)
engine = create_async_engine(url=config.DB_URL)


def get_db_session_context() -> int:
    session_id = db_session_context.get()

    if not session_id:
        raise ValueError("Currently no session is available")

    return session_id


def set_db_session_context(*, session_id: int) -> None:
    db_session_context.set(session_id)


AsyncScopedSession = async_scoped_session(
    session_factory=async_sessionmaker(bind=engine, autoflush=False, autocommit=False),
    scopefunc=get_db_session_context,
)


def get_current_session() -> AsyncSession:
    return AsyncScopedSession()
  • utils.py
from typing import Callable, Awaitable, Any
import functools

from ..utils.logger import get_logger
from ._session import get_current_session, get_db_session_context


AsyncCallable = Callable[..., Awaitable]
logger = get_logger(filename=__file__)


def transactional(func: AsyncCallable) -> AsyncCallable:
    @functools.wraps(func)
    async def _wrapper(*args, **kwargs) -> Awaitable[Any]:
        try:
            db_session = get_current_session()

            if db_session.in_transaction():
                return await func(*args, **kwargs)

            async with db_session.begin():
                # automatically committed / rolled back thanks to the context manager
                return_value = await func(*args, **kwargs)

            return return_value
        except Exception as error:
            logger.info(f"request hash: {get_db_session_context()}")
            logger.exception(error)
            raise

    return _wrapper
  • middleware.py
from typing import Callable, Awaitable

from fastapi import Request, Response, status as HTTPStatus

from ._session import set_db_session_context, AsyncScopedSession


async def db_session_middleware_function(
    request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
    response = Response(
        "Internal server error", status_code=HTTPStatus.HTTP_500_INTERNAL_SERVER_ERROR
    )

    try:
        set_db_session_context(session_id=hash(request))
        response = await call_next(request)

    finally:
        await AsyncScopedSession.remove()  # this includes closing the session as well
        set_db_session_context(session_id=None)

    return response

要概括,我们在这里尝试获得以下功能列表

  • 使用contextvars(STL)和scoped_session(SQLalchemy)访问请求特定的数据库会话
  • 实现FastAPI中间件功能,以直接访问传入请求对象
  • 避免嵌套交易
  • 使用session.begin()的上下文管理器更简单的交易代码

感谢您阅读这篇长文章。如果您对此帖子有任何想法,请发表评论。祝你有美好的一天!