Kafka¶
In order to enable Kafka support you have to add testsuite.databases.kafka.pytest_plugin to the pytest_plugins list in your conftest.py.
By default testsuite starts a Kafka service; in this case a Kafka installation is required.
Currently the Kafka plugin uses the async aiokafka driver.
Kafka installation¶
Consult official docs at https://kafka.apache.org/quickstart
If you already have Kafka installed and its location differs from /etc/kafka, please set the KAFKA_HOME environment variable accordingly.
The installed Kafka must support the KRaft protocol.
Environment variables¶
KAFKA_HOME¶
Use to override the Kafka binaries directory. Default is /etc/kafka.
TESTSUITE_KAFKA_SERVER_HOST¶
Use to override Kafka server host. Default is localhost.
TESTSUITE_KAFKA_SERVER_PORT¶
Use to override Kafka server port. Default is 9099.
TESTSUITE_KAFKA_CONTROLLER_PORT¶
Use to override Kafka controller port. Default is 9100.
TESTSUITE_KAFKA_SERVER_START_TIMEOUT¶
By default testsuite waits up to 10 seconds for Kafka to start. This timeout can be customized via the TESTSUITE_KAFKA_SERVER_START_TIMEOUT environment variable.
TESTSUITE_KAFKA_CUSTOM_TOPICS¶
All topics in tests are created automatically by the Kafka broker at runtime with only 1 partition.
To create topics with several partitions, either set the TESTSUITE_KAFKA_CUSTOM_TOPICS environment variable to a comma-separated list of topic-to-partition-count mappings, or override the kafka_custom_topics fixture.
For example, TESTSUITE_KAFKA_CUSTOM_TOPICS=large-topic-1:7,large-topic-2:20 creates the topic large-topic-1 with 7 partitions and large-topic-2 with 20 partitions.
Customize ports¶
Testsuite may start Kafka with custom ports if the TESTSUITE_KAFKA_SERVER_PORT or TESTSUITE_KAFKA_CONTROLLER_PORT environment variables are specified.
Use external instance¶
If your instance is local you may try setting the TESTSUITE_KAFKA_SERVER_PORT environment variable and the --kafka=1 pytest option and see if it works.
Note that topic creation remains on the user's side.
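Since topics are not created automatically on an external instance, they have to be created beforehand. Below is a minimal sketch using aiokafka's admin client; the localhost:9092 address and the topic name are placeholder assumptions, not something provided by the plugin:

import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewTopic


async def create_topics():
    # Placeholder address of the external Kafka instance.
    admin = AIOKafkaAdminClient(bootstrap_servers='localhost:9092')
    await admin.start()
    try:
        # Create the topics your tests rely on up front.
        await admin.create_topics([
            NewTopic(
                name='Test-topic-chain',
                num_partitions=1,
                replication_factor=1,
            ),
        ])
    finally:
        await admin.close()


asyncio.run(create_topics())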
Usage example¶
async def test_kafka_producer_consumer_chain(kafka_producer, kafka_consumer):
    TOPIC = 'Test-topic-chain'
    KEY = 'test-key'
    MESSAGE = 'test-message'

    await kafka_producer.send(TOPIC, KEY, MESSAGE)

    consumed_message = await kafka_consumer.receive_one([TOPIC])

    assert consumed_message.topic == TOPIC
    assert consumed_message.key == KEY
    assert consumed_message.value == MESSAGE
Example integration¶
import pytest

pytest_plugins = [
    'testsuite.pytest_plugin',
    'testsuite.databases.kafka.pytest_plugin',
]

KAFKA_CUSTOM_TOPICS = {
    'Large-topic-1': 7,
    'Large-topic-2': 3,
}


@pytest.fixture(scope='session')
def kafka_custom_topics():
    return KAFKA_CUSTOM_TOPICS
Fixtures¶
kafka_producer¶
- async for ... in testsuite.databases.kafka.pytest_plugin.kafka_producer()
Per test Kafka producer instance.
kafka_consumer¶
- async for ... in testsuite.databases.kafka.pytest_plugin.kafka_consumer()
Per test Kafka consumer instance.
kafka_custom_topics¶
- testsuite.databases.kafka.pytest_plugin.kafka_custom_topics()
Redefine this fixture to pass your custom dictionary of topics’ settings.
kafka_local¶
- testsuite.databases.kafka.pytest_plugin.kafka_local()
Override to use custom local cluster bootstrap servers. If not empty, no service is started.
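A minimal sketch of overriding this fixture to point tests at an already running local cluster; this assumes the fixture may return a list of bootstrap server addresses (check the plugin source for the exact expected type), and localhost:9092 is a placeholder:

@pytest.fixture(scope='session')
def kafka_local():
    # Assumed return type: a list of bootstrap server addresses.
    # localhost:9092 is a placeholder for your running cluster.
    return ['localhost:9092']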
Classes¶
- class testsuite.databases.kafka.classes.KafkaProducer¶
Kafka producer wrapper.
- coroutine send(topic: str, key: str, value: str, partition: Optional[int] = None)¶
Sends the message (value) to topic by key and, optionally, to a given partition, and waits until it is delivered. If the call is successfully awaited, the message is guaranteed to be delivered.
- Parameters:
topic – topic name.
key – key. Needed to determine the message's partition.
value – message payload. Must be valid UTF-8.
partition – optional message partition. If not passed, it is determined by the internal partitioner based on the key's hash.
- coroutine send_async(topic: str, key: str, value: str, partition: Optional[int] = None)¶
Sends the message (value) to topic by key and, optionally, to a given partition, and returns a future that can be awaited to wait for delivery.
- Parameters:
topic – topic name.
key – key. Needed to determine the message's partition.
value – message payload. Must be valid UTF-8.
partition – optional message partition. If not passed, it is determined by the internal partitioner based on the key's hash.
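A short sketch of the difference between the two calls, based on the signatures above; the topic and payload names are placeholders:

async def test_delivery_modes(kafka_producer):
    # send() returns only after the message is delivered.
    await kafka_producer.send('Test-topic-chain', 'test-key', 'test-message')

    # send_async() returns a delivery future immediately;
    # await the future later to wait for delivery.
    delivery = await kafka_producer.send_async(
        'Test-topic-chain', 'test-key', 'another-message',
    )
    await delivery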
- class testsuite.databases.kafka.classes.KafkaConsumer¶
Kafka balanced consumer wrapper. All consumers are created with the same group.id; after each test the consumer commits offsets for all consumed messages. This is needed to make tests independent.
- coroutine receive_batch(topics: List[str], max_batch_size: Optional[int], timeout: float = 3.0) → List[ConsumedMessage]¶
Waits until either max_batch_size messages are consumed or the timeout expires.
- Parameters:
topics – list of topics to read messages from.
max_batch_size – maximum number of messages to consume.
timeout – timeout to stop waiting. Default is 3 seconds.
- Returns:
List[ConsumedMessage]
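A short sketch of consuming several messages with receive_batch; the topic name and message count are illustrative placeholders:

async def test_kafka_batch_consumption(kafka_producer, kafka_consumer):
    TOPIC = 'Test-topic-chain'

    for i in range(3):
        await kafka_producer.send(TOPIC, f'key-{i}', f'message-{i}')

    # Returns as soon as 3 messages are consumed, or after the default
    # 3 second timeout, whichever comes first.
    messages = await kafka_consumer.receive_batch(
        topics=[TOPIC], max_batch_size=3,
    )

    assert len(messages) == 3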