import asyncio
import dataclasses
import typing
import aiokafka
import logging

logger = logging.getLogger(__name__)

class ServiceSettings:
    """Kafka service start settings"""

    server_host: str
    server_port: int
    controller_port: int
    custom_start_topics: typing.Dict[str, int]

"""Kafka bootstrap servers URLs list"""
BootstrapServers = typing.List[str]

class KafkaDisabledError(Exception):

[docs]class KafkaProducer: """ Kafka producer wrapper. """ def __init__(self, enabled: bool, bootstrap_servers: str): self._enabled = enabled self._bootstrap_servers = bootstrap_servers async def start(self): if self._enabled: self.producer = aiokafka.AIOKafkaProducer( bootstrap_servers=self._bootstrap_servers, linger_ms=0, # turn off message buffering ) await self.producer.start()
[docs] async def send( self, topic: str, key: str, value: str, partition: typing.Optional[int] = None, ): """ Sends the message (``value``) to ``topic`` by ``key`` and, optionally, to a given ``partition`` and waits until it is delivered. If the call is successfully awaited, message is guaranteed to be delivered. :param topic: topic name. :param key: key. Needed to determine message's partition. :param value: message payload. Must be valid UTF-8. :param partition: Optional message partition. If not passed, determined by internal partitioner depends on key's hash. """ resp_future = await self.send_async(topic, key, value, partition) await resp_future
[docs] async def send_async( self, topic: str, key: str, value: str, partition: typing.Optional[int] = None, ): """ Sends the message (``value``) to ``topic`` by ``key`` and, optionally, to a given ``partition`` and returns the future for message delivery awaiting. :param topic: topic name. :param key: key. Needed to determine message's partition. :param value: message payload. Must be valid UTF-8. :param partition: Optional message partition. If not passed, determined by internal partitioner depends on key's hash. """ if not self._enabled: raise KafkaDisabledError return await self.producer.send( topic=topic, value=value.encode(), key=key.encode(), partition=partition, )
async def _flush(self):'Flusing produced messages') await self.producer.flush() async def aclose(self): if self._enabled: await self.producer.stop()
[docs]class ConsumedMessage: """Wrapper for consumed record.""" topic: str key: str value: str partition: int offset: int def __init__(self, record: aiokafka.ConsumerRecord): self.topic = record.topic self.key = record.key.decode() self.value = record.value.decode() self.partition = record.partition self.offset = record.offset
[docs]class KafkaConsumer: """ Kafka balanced consumer wrapper. All consumers are created with the same, after each test consumer commits offsets for all consumed messages. This is needed to make tests independent. """ def __init__(self, enabled: bool, bootstrap_servers): self._enabled = enabled self._bootstrap_servers = bootstrap_servers self._subscribed_topics: typing.List[str] = [] async def start(self): if self._enabled: self.consumer = aiokafka.AIOKafkaConsumer( group_id='Test-group', bootstrap_servers=self._bootstrap_servers, auto_offset_reset='earliest', enable_auto_commit=False, ) await self.consumer.start() def _subscribe(self, topics: typing.List[str]): if not self._enabled: raise KafkaDisabledError to_subscribe: typing.List[str] = [] for topic in topics: if topic not in self._subscribed_topics: to_subscribe.append(topic) if to_subscribe:'Subscribing to [%s]', ','.join(to_subscribe)) self.consumer.subscribe(to_subscribe) self._subscribed_topics.extend(to_subscribe) async def _commit(self): if not self._enabled: raise KafkaDisabledError if self._subscribed_topics: await self.consumer.commit() async def _unsubscribe(self): await self._commit() if self._subscribed_topics:'Unsubscribing from all topics') self.consumer.unsubscribe() self._subscribed_topics = []
[docs] async def receive_one( self, topics: typing.List[str], timeout: float = 20.0 ) -> ConsumedMessage: """ Waits until one message are consumed. :param topics: list of topics to read messages from. :param timeout: timeout to stop waiting. Default is 20 seconds. :returns: :py:class:`ConsumedMessage` """ if not self._enabled: raise KafkaDisabledError self._subscribe(topics) async def _do_receive(): record: aiokafka.ConsumerRecord = await self.consumer.getone() return ConsumedMessage(record) return await asyncio.wait_for(_do_receive(), timeout=timeout)
[docs] async def receive_batch( self, topics: typing.List[str], max_batch_size: typing.Optional[int], timeout: float = 3.0, ) -> typing.List[ConsumedMessage]: """ Waits until either ``max_batch_size`` messages are consumed or ``timeout`` expired. :param topics: list of topics to read messages from. :max_batch_size: maximum number of consumed messages. :param timeout: timeout to stop waiting. Default is 3 seconds. :returns: :py:class:`List[ConsumedMessage]` """ if not self._enabled: raise KafkaDisabledError self._subscribe(topics) records: typing.Dict[ aiokafka.TopicPartition, typing.List[aiokafka.ConsumerRecord] ] = await self.consumer.getmany( timeout_ms=int(timeout * 1000), max_records=max_batch_size ) return list(map(ConsumedMessage, sum(records.values(), [])))
async def aclose(self): if self._enabled: await self._unsubscribe() await self.consumer.stop()