介绍
我想说的是,后端应用程序的80%以上与数据库有关。这是我们关心的数据(我夸大了太多吗?)。如您所知,任何数据操作(创建,更新和删除)都必须是交易的。
但是,由于应用程序的许多部分应参与交易,因此要考虑:我们如何将数据库交互层与应用程序的其他部分分开?例如,我们不想在MVC结构的服务层中明确调用DB COMMIT方法。
Java Spring已经有一个明智的解决方案:@Transactional annotation(我从阅读this book中得知)。通过使用此装饰器包装功能,我们在遵守干燥原理的同时,我们具有更干净,更脱钩的代码层。
但是,据我所知,Fastapi社区中没有相应的注释功能,我发现只有几篇文章可以用作参考:
- https://medium.com/@konstantine.dvalishvil/transactional-methods-fastapi-sqlalchemy-6f33370b95dd
- https://www.hides.kr/1103
因此,在本文中,我想回顾一下这两篇文章,以及如何根据这些文章来处理此功能。
备注:这里的代码示例使用了SQLalchemy的异步API。但是它也可以以几乎相同的方式应用于同步API。
简单直观的方法:Kosntantine dvalishvili的方法
发布url :link
我在Google上搜索此主题时遇到了本文。如果您看到代码,它将尝试彻底遵循the flow of a possible transaction using the session API of SQLAlchemy。
以下代码是作者原始代码的重写:
from typing import Optional, Callable
import functools
from contextvars import ContextVar
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
db_session_context: ContextVar[Optional[AsyncSession]] = ContextVar("db_session", default=None)
engine = create_async_engine(url="FAKE_DB_URL")
session_factory = async_sessionmaker(bind=engine, autocommit=False, autoflush=False)
def transactional(func: Callable) -> Callable:
@functools.wraps(func)
def _wrapper(*args, **kwargs):
db_session = db_session_context.get()
if db_session:
return func(*args, **kwargs)
db_session = session_factory()
db_session_context.set(db_session)
try:
result = func(*args, **kwargs)
db_session.commit()
except Exception as e:
db_session.rollback()
raise
finally:
db_session.close()
db_session_context.set(None)
return result
return _wrapper
有关此代码的一件事是它在Python中使用contextvars
STL。作者说,它是为了像全局变量一样访问当前会话。
但是,与作者不再提及的contextvars
有一个非常重要的主题。由于任何后端申请都以同时的方式运行,因此我们应该manage our session in thread-safe way。根据SQLAlchemy documentation的说法,我们应该将当前会话与当前请求相关联,在这里我们在作者代码中没有看到太多考虑。
因此,在这里我们有以下问题要解决:如何将当前会话与传入请求联系起来?由于该文档强烈建议遵循后端框架提供的集成工具,而不是使用scoped_session
API,因此我们需要研究FastAPI首先管理数据库会话。
插曲:Fastapi如何管理数据库会话
以下代码是FastApi文档中原始代码的重写:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
engine = create_async_engine(url="FAKE_DB_URL")
session_factory = async_sessionmaker(bind=engine, autocommit=False, autoflush=False)
def get_db_session():
db_session = session_factory()
try:
yield db_session
finally:
db_session.close()
但是,这种方法与我们的装饰注释方法不兼容:我们希望将会话作为全局变量(全球但特定于本)。而且,由于我们也将在域逻辑中使用任何会话对象,因此数据库会话无法明确与请求进行交互。
同一文档页面提供了另一种方法:use middleware(一种旧方法,但适合我们的需求)。在中间软件中,我们可以直接触摸请求对象。在这里,我们应用了我们从Kosntantine dvalishvili的方法中学到的技术:将contextvars
用于全球对象。
但是,等等,我们如何将请求映射到单个数据库会话?作为SQLAlchemy documentation pointed out,如果FastApi提供了这样的机制,那将是最好的。但是,目前似乎尚无此类功能。也就是说,我们必须自己提供这样的功能。
我们是否需要一个单独的“全局” python字典对象映射这两个?
session_table: dict[int, AsyncSession] = {
<session_id>: <session_object>
}
current_db_session: AsyncSession = session_table[get_db_session_context()]
实际上,Sqlalchemy已经提供了这样的API,称为koude2(当然,我们将在这里使用async_scoped_session
)。我前面提到的第二个参考文献提供了一个很好的例子(尽管它没有明确地将请求映射到使用hash
的数据库会话中)。
范围范围的课程:hide的方法
发布url :link
备注:此参考是用韩语编写的,尽管您可以阅读代码并查看作者尝试实现的目标。
因此,这种方法基本上是结合了我之前提到的两个元素:使用contextvars
访问当前会话,并将其与当前传入请求会话匹配。如果我们使用scoped_session
,则使用用户in order to map the current context to one of the database sessions传递的scopefunc
。在引擎盖下,只是a simple Python dictionary。
因此,通过将函数get_session_context
传递到参数scopefunc
,我们可以将当前请求会话平滑地映射到单个数据库会话。
以下代码是作者原始代码的重写:
from typing import Callable
from contextvars import ContextVar, Token
import functools
from uuid import uuid4
from fastapi import Request
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
async_scoped_session
)
session_context: ContextVar[str] = ContextVar("session_context", default="")
def get_session_context() -> str:
return session_context.get()
def set_session_context(session_id: str) -> Token:
return session_context.set(session_id)
def reset_session_context(context: Token) -> None:
session_context.reset(context)
engine = create_async_engine(url="YOUR_DB_URL", pool_recycle=3600)
AsyncScopedSession = async_scoped_session(
async_sessionmaker(autocommit=True, autoflush=False, bind=engine),
scopefunc=get_session_context,
)
async def middleware_function(request: Request, call_next):
session_id = str(uuid4())
context = set_session_context(session_id=session_id)
session = AsyncScopedSession()
try:
response = await call_next(request)
except Exception as e:
session.rollback()
raise e
finally:
session.remove()
reset_session_context(context=context)
return response
async def transactional(func: Callable) -> Callable:
@functools.wraps(func)
async def _wrapper(*args, **kwargs):
session = AsyncScopedSession()
try:
result = await func(*args, **kwargs)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
return result
return _wrapper
但是,我们仍然可以在几点上改进此代码。
- 在这里,作者使用
uuid4
函数来设置会话ID,但是由于我们已经有hash
用于请求对象,因此我们只需使用hash(request)
即可。这也适合调试,因为我们可以在出现问题时确定某个请求。 - 正如第一种方法所做的那样,我们不想拥有嵌套的交易以进行更简单的设计。
- 我们可以简单地使用单个上下文管理器包装所有这些显式
commit
,rollback
或close
方法:koude18
摘要:我的方法
因此,考虑到到目前为止我们进行的所有讨论,我们最终可以联系到这些简单的代码:
-
_session.py
:
from typing import Optional
from contextvars import ContextVar
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_scoped_session,
async_sessionmaker,
AsyncSession,
)
from ..config import config
# some hints from: https://github.com/teamhide/fastapi-boilerplate/blob/master/core/db/session.py
db_session_context: ContextVar[Optional[int]] = ContextVar(
"db_session_context", default=None
)
engine = create_async_engine(url=config.DB_URL)
def get_db_session_context() -> int:
session_id = db_session_context.get()
if not session_id:
raise ValueError("Currently no session is available")
return session_id
def set_db_session_context(*, session_id: int) -> None:
db_session_context.set(session_id)
AsyncScopedSession = async_scoped_session(
session_factory=async_sessionmaker(bind=engine, autoflush=False, autocommit=False),
scopefunc=get_db_session_context,
)
def get_current_session() -> AsyncSession:
return AsyncScopedSession()
-
utils.py
from typing import Callable, Awaitable, Any
import functools
from ..utils.logger import get_logger
from ._session import get_current_session, get_db_session_context
AsyncCallable = Callable[..., Awaitable]
logger = get_logger(filename=__file__)
def transactional(func: AsyncCallable) -> AsyncCallable:
@functools.wraps(func)
async def _wrapper(*args, **kwargs) -> Awaitable[Any]:
try:
db_session = get_current_session()
if db_session.in_transaction():
return await func(*args, **kwargs)
async with db_session.begin():
# automatically committed / rolled back thanks to the context manager
return_value = await func(*args, **kwargs)
return return_value
except Exception as error:
logger.info(f"request hash: {get_db_session_context()}")
logger.exception(error)
raise
return _wrapper
-
middleware.py
from typing import Callable, Awaitable
from fastapi import Request, Response, status as HTTPStatus
from ._session import set_db_session_context, AsyncScopedSession
async def db_session_middleware_function(
request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
response = Response(
"Internal server error", status_code=HTTPStatus.HTTP_500_INTERNAL_SERVER_ERROR
)
try:
set_db_session_context(session_id=hash(request))
response = await call_next(request)
finally:
await AsyncScopedSession.remove() # this includes closing the session as well
set_db_session_context(session_id=None)
return response
要概括,我们在这里尝试获得以下功能列表
- 使用
contextvars
(STL)和scoped_session
(SQLalchemy)访问请求特定的数据库会话 - 实现FastAPI中间件功能,以直接访问传入请求对象
- 避免嵌套交易
- 使用
session.begin()
的上下文管理器更简单的交易代码
感谢您阅读这篇长文章。如果您对此帖子有任何想法,请发表评论。祝你有美好的一天!