Kafka¶
To enable Kafka support, add
testsuite.databases.kafka.pytest_plugin to the pytest_plugins list in your
conftest.py.
By default, testsuite starts a Kafka service itself, which requires a local Kafka installation.
The Kafka plugin currently uses the async aiokafka driver.
Kafka installation¶
Consult the official docs at https://kafka.apache.org/quickstart
If Kafka is already installed and its location differs from
/etc/kafka, set the
KAFKA_HOME environment variable accordingly.
On macOS, install Kafka with brew: brew install kafka
The installed Kafka must support the KRaft protocol.
Environment variables¶
KAFKA_HOME¶
Use to override the Kafka binaries directory.
Default is /etc/kafka on Linux and /opt/homebrew/opt/kafka/libexec on macOS.
TESTSUITE_KAFKA_SERVER_HOST¶
Use to override Kafka server host. Default is localhost.
TESTSUITE_KAFKA_SERVER_PORT¶
Use to override Kafka server port. Default is 9099.
TESTSUITE_KAFKA_CONTROLLER_PORT¶
Use to override Kafka controller port. Default is 9100.
TESTSUITE_KAFKA_SERVER_START_TIMEOUT¶
By default, testsuite waits up to 10 seconds for Kafka to start.
Use this variable to customize the timeout.
TESTSUITE_KAFKA_CUSTOM_TOPICS¶
By default, all topics used in tests are created automatically by the Kafka broker at runtime, with only 1 partition.
To create topics with several partitions, either set the TESTSUITE_KAFKA_CUSTOM_TOPICS environment
variable to a comma-separated list of topic:partitions pairs, or override the kafka_custom_topics fixture.
For example, TESTSUITE_KAFKA_CUSTOM_TOPICS=large-topic-1:7,large-topic-2:20
creates topic large-topic-1 with 7 partitions and large-topic-2 with 20 partitions.
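To make the value format concrete, here is a minimal sketch of how such a string maps to a topic-to-partitions dictionary. The helper names parse_custom_topics and custom_topics_from_env are hypothetical and are not part of testsuite; the plugin's own parsing may differ in details such as whitespace handling or error reporting.

```python
import os


def parse_custom_topics(raw: str) -> dict[str, int]:
    """Parse 'topic-a:7,topic-b:20' into {'topic-a': 7, 'topic-b': 20}.

    Hypothetical helper for illustration; not the plugin's actual parser.
    """
    topics: dict[str, int] = {}
    for item in raw.split(','):
        item = item.strip()
        if not item:
            continue  # tolerate empty entries such as a trailing comma
        # Split on the last colon so the partitions count is always the tail.
        name, _, count = item.rpartition(':')
        if not name or not count.isdigit() or int(count) < 1:
            raise ValueError(f'expected "<topic>:<partitions>", got {item!r}')
        topics[name] = int(count)
    return topics


def custom_topics_from_env() -> dict[str, int]:
    """Read the mapping from TESTSUITE_KAFKA_CUSTOM_TOPICS (empty if unset)."""
    return parse_custom_topics(os.environ.get('TESTSUITE_KAFKA_CUSTOM_TOPICS', ''))
```

The resulting dictionary has the same shape as the one returned by an overridden kafka_custom_topics fixture.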
Customize ports¶
Testsuite may start Kafka with custom ports if
TESTSUITE_KAFKA_SERVER_PORT or TESTSUITE_KAFKA_CONTROLLER_PORT
environment variables are specified.
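As an illustration of how the documented defaults and overrides combine, the sketch below resolves the broker and controller addresses from the environment. The function kafka_endpoints and the returned key names are hypothetical, and placing the controller on the same host as the broker is an assumption; testsuite resolves these settings internally.

```python
import os

# Defaults as documented in the "Environment variables" section.
DEFAULT_HOST = 'localhost'
DEFAULT_SERVER_PORT = 9099
DEFAULT_CONTROLLER_PORT = 9100


def kafka_endpoints(env=None) -> dict[str, str]:
    """Resolve Kafka addresses from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    host = env.get('TESTSUITE_KAFKA_SERVER_HOST', DEFAULT_HOST)
    server_port = int(env.get('TESTSUITE_KAFKA_SERVER_PORT', DEFAULT_SERVER_PORT))
    controller_port = int(
        env.get('TESTSUITE_KAFKA_CONTROLLER_PORT', DEFAULT_CONTROLLER_PORT),
    )
    return {
        'bootstrap_servers': f'{host}:{server_port}',
        # Assumption: the controller listens on the same host as the broker.
        'controller': f'{host}:{controller_port}',
    }
```

With no variables set this yields localhost:9099 for the broker and localhost:9100 for the controller.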
Use external instance¶
If your instance is local, you may try setting the
TESTSUITE_KAFKA_SERVER_PORT environment variable and the pytest option --kafka=1
to see if it works.
Note: with an external instance, topic creation is left to the user.
Usage example¶
async def test_kafka_producer_consumer_chain(kafka_producer, kafka_consumer):
    TOPIC = 'Test-topic-chain'
    KEY = 'test-key'
    MESSAGE = 'test-message'

    await kafka_producer.send(TOPIC, KEY, MESSAGE)

    consumed_message = await kafka_consumer.receive_one([TOPIC])

    assert consumed_message.topic == TOPIC
    assert consumed_message.key == KEY
    assert consumed_message.value == MESSAGE
Example integration¶
import pytest

pytest_plugins = [
    'testsuite.pytest_plugin',
    'testsuite.databases.kafka.pytest_plugin',
]

# Topic name -> number of partitions.
KAFKA_CUSTOM_TOPICS = {
    'Large-topic-1': 7,
    'Large-topic-2': 3,
}


@pytest.fixture(scope='session')
def kafka_custom_topics():
    return KAFKA_CUSTOM_TOPICS
Fixtures¶
kafka_producer¶
- async for ... in testsuite.databases.kafka.pytest_plugin.kafka_producer()[source]
Per test Kafka producer instance.
kafka_consumer¶
- async for ... in testsuite.databases.kafka.pytest_plugin.kafka_consumer()[source]
Per test Kafka consumer instance.
kafka_custom_topics¶
- testsuite.databases.kafka.pytest_plugin.kafka_custom_topics()[source]
Redefine this fixture to pass your custom dictionary of topics’ settings.
kafka_local¶
- testsuite.databases.kafka.pytest_plugin.kafka_local()[source]
Override to use custom local cluster bootstrap servers. If the value is not empty, no service is started.
Classes¶
- class testsuite.databases.kafka.classes.KafkaProducer[source]¶
Kafka producer wrapper.
- await send(topic: str, key: str | bytes, value: str | bytes, partition: int | None = None, headers: Sequence[tuple[str, bytes]] | None = None)[source]¶
Sends the message (value) to topic by key and, optionally, to a given partition, and waits until it is delivered. If the call is successfully awaited, the message is guaranteed to be delivered.
- Parameters:
topic – topic name.
key – key. Needed to determine message’s partition.
value – message payload. Must be valid UTF-8.
partition – optional message partition. If not passed, it is determined by the internal partitioner based on the key’s hash.
- await send_async(topic: str, key: str | bytes, value: str | bytes, partition: int | None = None, headers: Sequence[tuple[str, bytes]] | None = None)[source]¶
Sends the message (value) to topic by key and, optionally, to a given partition, and returns a future that can be awaited for message delivery.
- Parameters:
topic – topic name.
key – key. Needed to determine message’s partition.
value – message payload. Must be valid UTF-8.
partition – optional message partition. If not passed, it is determined by the internal partitioner based on the key’s hash.
- class testsuite.databases.kafka.classes.KafkaConsumer[source]¶
Kafka balanced consumer wrapper. All consumers are created with the same group.id. After each test, the consumer commits offsets for all consumed messages, which keeps tests independent of each other.
- await receive_batch(topics: list[str], max_batch_size: int | None, timeout: float = 3.0) list[ConsumedMessage][source]¶
Waits until either max_batch_size messages are consumed or timeout expires.
- Parameters:
topics – list of topics to read messages from.
max_batch_size – maximum number of consumed messages.
timeout – timeout to stop waiting. Default is 3 seconds.
- Returns:
List[ConsumedMessage]