RabbitMQ¶
In order to enable RabbitMQ support you have to add
testsuite.rabbitmq.pytest_plugin
to pytest_plugins
list in your
conftest.py
.
By default testsuite starts RabbitMQ service. In this case RabbitMQ installation is required.
Currently RabbitMQ plugin uses async aio-pika driver.
RabbitMQ installation¶
Consult official docs at https://www.rabbitmq.com/download.html
If you already have RabbitMQ installed and its location differs from
/usr/lib/rabbitmq please specify
TESTSUITE_RABBITMQ_BINDIR
environment variable accordingly.
On MacOS just install Rabbitmq with brew
: brew install rabbitmq
Environment variables¶
TESTSUITE_RABBITMQ_BINDIR¶
Use to override rabbitmq binary dir.
Default is /usr/lib/rabbitmq/bin/
for Linux
and /opt/homebrew/sbin/
for MacOS
TESTSUITE_RABBITMQ_TCP_PORT¶
Use to override rabbitmq server port. Default is 8672
.
TESTSUITE_RABBITMQ_EPMD_PORT¶
Use to override rabbitmq epmd port. Default is 8673
.
TESTSUITE_RABBITMQ_SERVER_START_TIMEOUT¶
By default testsuite will wait for up to 10s for RabbitMQ to start,
one may customize this timeout via environment variable TESTSUITE_RABBITMQ_SERVER_START_TIMEOUT
.
Customize ports¶
Testsuite may start RabbitMQ with custom ports if
TESTSUITE_RABBITMQ_TCP_PORT
or TESTSUITE_RABBITMQ_EPMD_PORT
environment variables are specified.
Use external instance¶
Usage of external instance is not officially supported for now,
but if your instance is local you may try setting environment variable
TESTSUITE_RABBITMQ_TCP_PORT
and pytest option --rabbitmq=1
and see if it works.
Usage example¶
async def test_rabbitmq_basic(rabbitmq):
exchange = 'testsuite_exchange'
queue = 'testsuite_queue'
routing_key = 'testsuite_routing_key'
channel = await rabbitmq.get_channel()
async with channel:
await channel.declare_exchange(
exchange=exchange, exchange_type='fanout'
)
await channel.declare_queue(queue=queue)
await channel.bind_queue(
queue=queue, exchange=exchange, routing_key=routing_key
)
await channel.publish(
exchange=exchange,
routing_key=routing_key,
body=b'hi from testsuite!',
)
messages = await channel.consume(queue=queue, count=1)
assert messages == [b'hi from testsuite!']
Fixtures¶
rabbitmq¶
- testsuite.databases.rabbitmq.pytest_plugin.rabbitmq()[source]
Classes¶
- class testsuite.databases.rabbitmq.classes.ConnectionInfo[source]
RabbitMQ connection parameters