Dependency

get_redis_pool(request=TaskiqDepends()) async

Returns the Redis connection pool.

You can use it like this:

    from redis.asyncio import ConnectionPool, Redis

    async def handler(redis_pool: ConnectionPool = Depends(get_redis_pool)):
        async with Redis(connection_pool=redis_pool) as redis:
            await redis.get('key')

A pool is returned instead of a single connection, so the handler does not hold a connection for its whole lifetime.
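The idea of borrowing a connection only for a short block can be illustrated with a minimal sketch. `ToyPool`, its `connection()` method, and the `conn-N` strings are hypothetical stand-ins invented for this example; they are not the redis-py `ConnectionPool` API and only model the borrow-and-return pattern.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """Hypothetical stand-in for a connection pool (not the redis-py class)."""

    def __init__(self, size: int) -> None:
        self._free: asyncio.Queue = asyncio.Queue()
        for number in range(size):
            self._free.put_nowait(f"conn-{number}")
        self.in_use = 0  # how many connections are currently borrowed

    @asynccontextmanager
    async def connection(self):
        conn = await self._free.get()  # borrow a connection
        self.in_use += 1
        try:
            yield conn
        finally:
            self.in_use -= 1
            self._free.put_nowait(conn)  # hand it back to the pool


async def handler(pool: ToyPool) -> list:
    observed = [pool.in_use]          # before the block: nothing borrowed
    async with pool.connection():
        observed.append(pool.in_use)  # inside the block: one borrowed
    observed.append(pool.in_use)      # after the block: returned
    return observed


async def main() -> list:
    # Build the pool inside the running event loop.
    return await handler(ToyPool(size=2))


observed = asyncio.run(main())
```

In this sketch the handler holds a connection only while it is inside the `async with` block, which is the behaviour the note above describes.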

Parameters:

Name     Type     Description       Default
request  Request  Current request.  TaskiqDepends()

Returns:

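Type                         Description
AsyncGenerator[Redis, None]  Redis connection pool.

The returned value is used as a ConnectionPool in the usage example; the AsyncGenerator[Redis, None] annotation is taken from the source, where it is marked with # type: ignore.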

Source code in hestia/services/redis/dependency.py
async def get_redis_pool(
    request: Request = TaskiqDepends(),
) -> AsyncGenerator[Redis, None]:  # type: ignore
    """
    Returns the Redis connection pool.

    You can use it like this:

    >>> from redis.asyncio import ConnectionPool, Redis
    >>>
    >>> async def handler(redis_pool: ConnectionPool = Depends(get_redis_pool)):
    ...     async with Redis(connection_pool=redis_pool) as redis:
    ...         await redis.get('key')

    A pool is returned instead of a single connection, so the handler
    does not hold a connection for its whole lifetime.

    :param request: current request.
    :returns: Redis connection pool.
    """
    return request.app.state.redis_pool
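
The lookup performed by the function body can be exercised in isolation with a small sketch. `FakePool` and `make_request` are hypothetical helpers invented for this example, and the copy of `get_redis_pool` below drops the `TaskiqDepends()` default so that it runs with the standard library only; how the real application creates the pool and stores it on `app.state` is not shown in the source and is not assumed here.

```python
import asyncio
from types import SimpleNamespace


class FakePool:
    """Hypothetical stand-in for redis.asyncio.ConnectionPool."""


async def get_redis_pool(request):
    # Same body as the documented dependency: read the pool from app state.
    return request.app.state.redis_pool


def make_request(pool):
    # Hypothetical stand-in for a request object. Only the attribute path
    # request.app.state.redis_pool is modelled.
    state = SimpleNamespace(redis_pool=pool)
    return SimpleNamespace(app=SimpleNamespace(state=state))


pool = FakePool()
first = asyncio.run(get_redis_pool(make_request(pool)))
second = asyncio.run(get_redis_pool(make_request(pool)))
```

Both calls return the same object, which suggests that every handler depending on `get_redis_pool` shares one pool rather than receiving a fresh one per request.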