Kafka

In order to enable Kafka support you have to add testsuite.kafka.pytest_plugin to pytest_plugins list in your conftest.py.

By default testsuite starts Kafka service. In this case Kafka installation is required.

Currently Kafka plugin uses async aiokafka driver.

Kafka installation

Consult official docs at https://kafka.apache.org/quickstart

If you already have Kafka installed and its location differs from /etc/kafka please specify KAFKA_HOME environment variable accordingly.

Installed Kafka must support KRaft protocol.

Environment variables

KAFKA_HOME

Use to override Kafka binaries dir. Default is /etc/kafka

TESTSUITE_KAFKA_SERVER_HOST

Use to override Kafka server host. Default is localhost.

TESTSUITE_KAFKA_SERVER_PORT

Use to override Kafka server port. Default is 9099.

TESTSUITE_KAFKA_CONTROLLER_PORT

Use to override Kafka controller port. Default is 9100.

TESTSUITE_KAFKA_SERVER_START_TIMEOUT

By default testsuite will wait for up to 10s for Kafka to start, one may customize this timeout via environment variable TESTSUITE_KAFKA_SERVER_START_TIMEOUT.

TESTSUITE_KAFKA_CUSTOM_TOPICS

All topics in tests are created automatically by Kafka broker in runtime with only 1 partition. To create topics with several partitions, either specify TESTSUITE_KAFKA_CUSTOM_TOPICS environment variable with the , separated list of topic to partitions count mapping or override the kafka_custom_topics fixture. For example, TESTSUITE_KAFKA_CUSTOM_TOPICS=large-topic-1:7,large-topic-2:20 creates topic large-topic-1 with 7 partitions and large-topic-2 with 20 partitions.

Customize ports

Testsuite may start Kafka with custom ports if TESTSUITE_KAFKA_SERVER_PORT or TESTSUITE_KAFKA_CONTROLLER_PORT environment variables are specified.

Use external instance

If your instance is local you may try setting environment variable TESTSUITE_KAFKA_SERVER_PORT and pytest option --kafka=1 and see if it works.

P.S. Topics creation remains on the user’s side.

Usage example

async def test_kafka_producer_consumer_chain(kafka_producer, kafka_consumer):
    TOPIC = 'Test-topic-chain'
    KEY = 'test-key'
    MESSAGE = 'test-message'

    await kafka_producer.send(TOPIC, KEY, MESSAGE)

    consumed_message = await kafka_consumer.receive_one([TOPIC])

    assert consumed_message.topic == TOPIC
    assert consumed_message.key == KEY
    assert consumed_message.value == MESSAGE

Example integration

pytest_plugins = [
    'testsuite.pytest_plugin',
    'testsuite.databases.kafka.pytest_plugin',
]

KAFKA_CUSTOM_TOPICS = {
    'Large-topic-1': 7,
    'Large-topic-2': 3,
}

@pytest.fixture(scope='session')
def kafka_custom_topics():
    return KAFKA_CUSTOM_TOPICS

Fixtures

kafka_producer

async for ... in testsuite.databases.kafka.pytest_plugin.kafka_producer()[source]

Per test Kafka producer instance.

Returns:

testsuite.databases.kafka.classes.KafkaProducer

kafka_consumer

async for ... in testsuite.databases.kafka.pytest_plugin.kafka_consumer()[source]

Per test Kafka consumer instance.

Returns:

testsuite.databases.kafka.classes.KafkaConsumer

kafka_custom_topics

testsuite.databases.kafka.pytest_plugin.kafka_custom_topics()[source]

Redefine this fixture to pass your custom dictionary of topics’ settings.

kafka_local

testsuite.databases.kafka.pytest_plugin.kafka_local()[source]

Override to use custom local cluster bootstrap servers. If not empty, no service started.

Classes

class testsuite.databases.kafka.classes.KafkaProducer[source]

Kafka producer wrapper.

coroutine send(topic: str, key: str, value: str, partition: Optional[int] = None)[source]

Sends the message (value) to topic by key and, optionally, to a given partition and waits until it is delivered. If the call is successfully awaited, message is guaranteed to be delivered.

Parameters:
  • topic – topic name.

  • key – key. Needed to determine message’s partition.

  • value – message payload. Must be valid UTF-8.

  • partition – Optional message partition. If not passed, determined by internal partitioner depends on key’s hash.

coroutine send_async(topic: str, key: str, value: str, partition: Optional[int] = None)[source]

Sends the message (value) to topic by key and, optionally, to a given partition and returns the future for message delivery awaiting.

Parameters:
  • topic – topic name.

  • key – key. Needed to determine message’s partition.

  • value – message payload. Must be valid UTF-8.

  • partition – Optional message partition. If not passed, determined by internal partitioner depends on key’s hash.

class testsuite.databases.kafka.classes.KafkaConsumer[source]

Kafka balanced consumer wrapper. All consumers are created with the same group.id, after each test consumer commits offsets for all consumed messages. This is needed to make tests independent.

coroutine receive_batch(topics: List[str], max_batch_size: Optional[int], timeout: float = 3.0) List[ConsumedMessage][source]

Waits until either max_batch_size messages are consumed or timeout expired.

Parameters:
  • topics – list of topics to read messages from.

  • timeout – timeout to stop waiting. Default is 3 seconds.

Max_batch_size:

maximum number of consumed messages.

Returns:

List[ConsumedMessage]

coroutine receive_one(topics: List[str], timeout: float = 20.0) ConsumedMessage[source]

Waits until one message are consumed.

Parameters:
  • topics – list of topics to read messages from.

  • timeout – timeout to stop waiting. Default is 20 seconds.

Returns:

ConsumedMessage

class testsuite.databases.kafka.classes.ConsumedMessage[source]

Wrapper for consumed record.