
Dependencies

get_rmq_channel_pool(request=TaskiqDepends())

Get the RabbitMQ channel pool from the application state.

Parameters:

Name     Type     Description       Default
request  Request  Current request.  TaskiqDepends()

Returns:

Type           Description
Pool[Channel]  Channel pool.

Source code in hestia/services/rabbit/dependencies.py
def get_rmq_channel_pool(
    request: Request = TaskiqDepends(),
) -> Pool[Channel]:  # pragma: no cover
    """
    Get channel pool from the state.

    :param request: current request.
    :return: channel pool.
    """
    return request.app.state.rmq_channel_pool
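
The function only reads an attribute off `request.app.state`, so the lookup can be exercised without RabbitMQ or a running application. The following is a minimal sketch under stated assumptions: it redefines the function without the `TaskiqDepends()` default so that it runs with the standard library alone, and `fake_pool` and `fake_request` are hypothetical stand-ins, not objects from the project.

```python
from types import SimpleNamespace


def get_rmq_channel_pool(request):
    """Same attribute lookup as the documented dependency.

    Assumption: the TaskiqDepends() default and the type hints are
    dropped here so the sketch needs no third-party packages.
    """
    return request.app.state.rmq_channel_pool


# Hypothetical stand-in for the real Pool[Channel] object.
fake_pool = object()

# Hypothetical stand-in for a request whose app state already holds the pool.
fake_request = SimpleNamespace(
    app=SimpleNamespace(state=SimpleNamespace(rmq_channel_pool=fake_pool))
)

# The dependency returns exactly the object stored on the state.
resolved = get_rmq_channel_pool(fake_request)
```

The sketch also makes the precondition visible: something must assign `rmq_channel_pool` on the application state before the dependency is resolved, presumably during application startup, otherwise the attribute lookup fails.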