Skip to content

Test rabbit

test_message_publishing(fastapi_app, client, test_queue, test_exchange_name, test_routing_key) async

Tests that a message is published correctly.

It sends a message to RabbitMQ and reads it back from the bound queue.

Source code in hestia/tests/test_rabbit.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@pytest.mark.anyio
async def test_message_publishing(
    fastapi_app: FastAPI,
    client: AsyncClient,
    test_queue: AbstractQueue,
    test_exchange_name: str,
    test_routing_key: str,
) -> None:
    """
    Check that a message posted through the API arrives in the test queue.

    A random hex payload is posted to the ``send_rabbit_message`` route
    using the test exchange and routing key. One message is then read
    from ``test_queue``, acknowledged, and its decoded body is compared
    with the payload that was sent.
    """
    payload_text = uuid.uuid4().hex
    endpoint = fastapi_app.url_path_for("send_rabbit_message")
    request_body = {
        "exchange_name": test_exchange_name,
        "routing_key": test_routing_key,
        "message": payload_text,
    }
    await client.post(endpoint, json=request_body)

    received = await test_queue.get(timeout=1)
    assert received is not None
    # Acknowledge before comparing so the message is acked even when the
    # body assertion below fails.
    await received.ack()
    assert received.body.decode("utf-8") == payload_text

test_message_wrong_exchange(fastapi_app, client, test_queue, test_exchange_name, test_routing_key, test_rmq_pool) async

Tests that a message can be published to an undeclared exchange.

It sends a message to a random exchange, tries to get the message from the bound queue, and checks that the new exchange was created.

Source code in hestia/tests/test_rabbit.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@pytest.mark.anyio
async def test_message_wrong_exchange(
    fastapi_app: FastAPI,
    client: AsyncClient,
    test_queue: AbstractQueue,
    test_exchange_name: str,
    test_routing_key: str,
    test_rmq_pool: Pool[Channel],
) -> None:
    """
    Check publishing through an exchange that the test did not declare.

    A random hex payload is posted to the ``send_rabbit_message`` route
    with a randomly named exchange. Reading from ``test_queue`` is then
    expected to raise ``QueueEmpty``. Finally the random exchange is
    fetched with ``ensure=True`` and deleted.
    """
    undeclared_exchange = uuid.uuid4().hex
    assert undeclared_exchange != test_exchange_name
    payload_text = uuid.uuid4().hex

    endpoint = fastapi_app.url_path_for("send_rabbit_message")
    request_body = {
        "exchange_name": undeclared_exchange,
        "routing_key": test_routing_key,
        "message": payload_text,
    }
    await client.post(endpoint, json=request_body)

    # Nothing should show up in the test queue for this exchange.
    with pytest.raises(QueueEmpty):
        await test_queue.get(timeout=1)

    # Look the exchange up by name, then remove it.
    async with test_rmq_pool.acquire() as channel:
        created_exchange = await channel.get_exchange(
            undeclared_exchange,
            ensure=True,
        )
        await created_exchange.delete(if_unused=False)