Skip to content

Dependencies

This file defines SQLAlchemy database dependencies for the application.

It contains functions and types related to database operations, such as obtaining a database session, managing transactions, and handling database-related dependencies.

This file contributes to the overall database operations and management in the application.

Note: The specific implementation details related to the database session factory and the configuration of the application state may vary based on your actual setup.

get_db_session(request=TaskiqDepends()) async

Create and get database session.

:yield: database session.

Parameters:

Name Type Description Default
request Request

current request.

TaskiqDepends()
Source code in hestia/db/dependencies.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
async def get_db_session(
    request: Request = TaskiqDepends(),
) -> AsyncGenerator[AsyncSession, None]:
    """
    Create and get database session.

    :param request: current request.
    :yield: database session.
    """
    session: AsyncSession = request.app.state.db_session_factory()

    try:  # noqa: WPS501
        yield session
    finally:
        await session.commit()
        await session.close()