Source code for testsuite.databases.kafka.pytest_plugin

import typing

import pytest

from . import service
from . import classes

from testsuite.utils import compat


def pytest_addoption(parser):
    group = parser.getgroup('kafka')
    group.addoption('--kafka')
    group.addoption(
        '--no-kafka',
        help='Disable use of Kafka',
        action='store_true',
    )


def pytest_configure(config):
    config.addinivalue_line(
        'markers',
        'kafka: per-test Kafka initialization',
    )


def pytest_service_register(register_service):
    register_service('kafka', service.create_kafka_service)


@pytest.fixture(scope='session')
async def _kafka_global_producer(
    _kafka_service,
    _bootstrap_servers,
) -> typing.AsyncGenerator[classes.KafkaConsumer, None]:
    producer = classes.KafkaProducer(
        enabled=_kafka_service,
        bootstrap_servers=_bootstrap_servers,
    )
    await producer.start()

    async with compat.aclosing(producer):
        yield producer


[docs]@pytest.fixture async def kafka_producer( _kafka_global_producer, ) -> typing.AsyncGenerator[classes.KafkaProducer, None]: """ Per test Kafka producer instance. :returns: :py:class:`testsuite.databases.kafka.classes.KafkaProducer` """ yield _kafka_global_producer await _kafka_global_producer._flush()
@pytest.fixture(scope='session') async def _kafka_global_consumer( _kafka_service, _bootstrap_servers, ) -> typing.AsyncGenerator[classes.KafkaConsumer, None]: consumer = classes.KafkaConsumer( enabled=_kafka_service, bootstrap_servers=_bootstrap_servers, ) await consumer.start() async with compat.aclosing(consumer): yield consumer
[docs]@pytest.fixture async def kafka_consumer(_kafka_global_consumer): """ Per test Kafka consumer instance. :returns: :py:class:`testsuite.databases.kafka.classes.KafkaConsumer` """ yield _kafka_global_consumer await _kafka_global_consumer._unsubscribe()
[docs]@pytest.fixture(scope='session') def kafka_custom_topics() -> typing.Dict[str, int]: """ Redefine this fixture to pass your custom dictionary of topics' settings. """ return service.try_get_custom_topics()
[docs]@pytest.fixture(scope='session') def kafka_local() -> classes.BootstrapServers: """ Override to use custom local cluster bootstrap servers. If not empty, no service started. """ return []
@pytest.fixture(scope='session') def kafka_disabled(pytestconfig) -> bool: return pytestconfig.option.no_kafka @pytest.fixture(scope='session') def _kafka_service_settings(kafka_custom_topics) -> classes.ServiceSettings: return service.get_service_settings(kafka_custom_topics) @pytest.fixture(scope='session') def _bootstrap_servers(kafka_local, _kafka_service_settings) -> str: if kafka_local: return ','.join(kafka_local) server_host = _kafka_service_settings.server_host server_port = _kafka_service_settings.server_port return f'{server_host}:{server_port}' @pytest.fixture(scope='session') def _kafka_service( ensure_service_started, kafka_local, kafka_disabled, pytestconfig, _kafka_service_settings, ) -> bool: if kafka_disabled: return False if not kafka_local and not pytestconfig.option.kafka: ensure_service_started('kafka', settings=_kafka_service_settings) return True