Skip to content

Conftest

anyio_backend()

Backend for anyio pytest plugin.

Returns:

Type Description
str

backend name.

Source code in hestia/conftest.py
30
31
32
33
34
35
36
37
@pytest.fixture(scope="session")
def anyio_backend() -> str:
    """
    Backend for anyio pytest plugin.

    :return: backend name.
    """
    return "asyncio"

client(fastapi_app, anyio_backend) async

Fixture that creates client for requesting server.

:yield: client for the app.

Parameters:

Name Type Description Default
fastapi_app FastAPI

the application.

required
Source code in hestia/conftest.py
209
210
211
212
213
214
215
216
217
218
219
220
221
@pytest.fixture
async def client(
    fastapi_app: FastAPI,
    anyio_backend: Any,
) -> AsyncGenerator[AsyncClient, None]:
    """
    Fixture that creates client for requesting server.

    :param fastapi_app: the application.
    :yield: client for the app.
    """
    async with AsyncClient(app=fastapi_app, base_url="http://test") as ac:
        yield ac

dbsession(_engine) async

Get session to database.

Fixture that returns a SQLAlchemy session with a SAVEPOINT, and the rollback to it after the test completes.

:yields: async session.

Parameters:

Name Type Description Default
_engine AsyncEngine

current engine.

required
Source code in hestia/conftest.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@pytest.fixture
async def dbsession(
    _engine: AsyncEngine,
) -> AsyncGenerator[AsyncSession, None]:
    """
    Get session to database.

    Fixture that returns a SQLAlchemy session with a SAVEPOINT, and the rollback to it
    after the test completes.

    :param _engine: current engine.
    :yields: async session.
    """
    connection = await _engine.connect()
    trans = await connection.begin()

    session_maker = async_sessionmaker(
        connection,
        expire_on_commit=False,
    )
    session = session_maker()

    try:
        yield session
    finally:
        await session.close()
        await trans.rollback()
        await connection.close()

fake_redis_pool() async

Get instance of a fake redis.

:yield: FakeRedis instance.

Source code in hestia/conftest.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
@pytest.fixture
async def fake_redis_pool() -> AsyncGenerator[ConnectionPool, None]:
    """
    Get instance of a fake redis.

    :yield: FakeRedis instance.
    """
    server = FakeServer()
    server.connected = True
    pool = ConnectionPool(connection_class=FakeConnection, server=server)

    yield pool

    await pool.disconnect()

fastapi_app(dbsession, fake_redis_pool, test_rmq_pool)

Fixture for creating FastAPI app.

Returns:

Type Description
FastAPI

fastapi app with mocked dependencies.

Source code in hestia/conftest.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
@pytest.fixture
def fastapi_app(
    dbsession: AsyncSession,
    fake_redis_pool: ConnectionPool,
    test_rmq_pool: Pool[Channel],
) -> FastAPI:
    """
    Fixture for creating FastAPI app.

    :return: fastapi app with mocked dependencies.
    """
    application = get_app()
    application.dependency_overrides[get_db_session] = lambda: dbsession
    application.dependency_overrides[get_redis_pool] = lambda: fake_redis_pool
    application.dependency_overrides[get_rmq_channel_pool] = lambda: test_rmq_pool
    return application  # noqa: WPS331

test_exchange(test_exchange_name, test_rmq_pool) async

Creates test exchange.

:yield: created exchange.

Parameters:

Name Type Description Default
test_exchange_name str

name of an exchange to create.

required
test_rmq_pool Pool[Channel]

channel pool for rabbitmq.

required
Source code in hestia/conftest.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@pytest.fixture
async def test_exchange(
    test_exchange_name: str,
    test_rmq_pool: Pool[Channel],
) -> AsyncGenerator[AbstractExchange, None]:
    """
    Creates test exchange.

    :param test_exchange_name: name of an exchange to create.
    :param test_rmq_pool: channel pool for rabbitmq.
    :yield: created exchange.
    """
    async with test_rmq_pool.acquire() as conn:
        exchange = await conn.declare_exchange(
            name=test_exchange_name,
            auto_delete=True,
        )
        yield exchange

        await exchange.delete(if_unused=False)

test_exchange_name() async

Name of an exchange to use in tests.

Returns:

Type Description
str

name of an exchange.

Source code in hestia/conftest.py
108
109
110
111
112
113
114
115
@pytest.fixture
async def test_exchange_name() -> str:
    """
    Name of an exchange to use in tests.

    :return: name of an exchange.
    """
    return uuid.uuid4().hex

test_queue(test_exchange, test_rmq_pool, test_routing_key) async

Creates queue connected to exchange.

:yield: queue binded to test exchange.

Parameters:

Name Type Description Default
test_exchange AbstractExchange

exchange to bind queue to.

required
test_rmq_pool Pool[Channel]

channel pool for rabbitmq.

required
test_routing_key str

routing key to use while binding.

required
Source code in hestia/conftest.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
@pytest.fixture
async def test_queue(
    test_exchange: AbstractExchange,
    test_rmq_pool: Pool[Channel],
    test_routing_key: str,
) -> AsyncGenerator[AbstractQueue, None]:
    """
    Creates queue connected to exchange.

    :param test_exchange: exchange to bind queue to.
    :param test_rmq_pool: channel pool for rabbitmq.
    :param test_routing_key: routing key to use while binding.
    :yield: queue binded to test exchange.
    """
    async with test_rmq_pool.acquire() as conn:
        queue = await conn.declare_queue(name=uuid.uuid4().hex)
        await queue.bind(
            exchange=test_exchange,
            routing_key=test_routing_key,
        )
        yield queue

        await queue.delete(if_unused=False, if_empty=False)

test_rmq_pool() async

Create rabbitMQ pool.

:yield: channel pool.

Source code in hestia/conftest.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
@pytest.fixture
async def test_rmq_pool() -> AsyncGenerator[Channel, None]:
    """
    Create rabbitMQ pool.

    :yield: channel pool.
    """
    app_mock = Mock()
    init_rabbit(app_mock)
    yield app_mock.state.rmq_channel_pool
    await shutdown_rabbit(app_mock)

test_routing_key() async

Name of routing key to use while binding test queue.

Returns:

Type Description
str

key string.

Source code in hestia/conftest.py
118
119
120
121
122
123
124
125
@pytest.fixture
async def test_routing_key() -> str:
    """
    Name of routing key to use while binding test queue.

    :return: key string.
    """
    return uuid.uuid4().hex