import contextlib
import inspect
import itertools
import signal
import subprocess
import uuid
import warnings
from typing import (
Any,
AsyncContextManager,
AsyncGenerator,
Callable,
Dict,
Optional,
Sequence,
Tuple,
)
import aiohttp
import pytest
from testsuite import annotations
from testsuite._internal import fixture_class, fixture_types
from testsuite.utils import compat
from . import service_client, service_daemon, spawn
from .spawn import __tracebackhide__
SHUTDOWN_SIGNALS = {
'SIGINT': signal.SIGINT,
'SIGKILL': signal.SIGKILL,
'SIGQUIT': signal.SIGQUIT,
'SIGTERM': signal.SIGTERM,
}
class _DaemonScope:
def __init__(self, name: str, spawn: Callable) -> None:
self.name = name
self._spawn = spawn
async def spawn(self) -> 'DaemonInstance':
manager = self._spawn()
# For backward compatibility with older spawners
if inspect.iscoroutine(manager):
warnings.warn(
f'Please rewrite your spawner into async context manager {self._spawn}',
PendingDeprecationWarning,
)
manager = await manager
process = await manager.__aenter__()
return DaemonInstance(manager, process)
class DaemonInstance:
process: Optional[subprocess.Popen]
def __init__(self, owner, process) -> None:
self.id = uuid.uuid4().hex
self._owner = owner
self.process = process
async def aclose(self) -> None:
await self._owner.__aexit__(None, None, None)
class _DaemonStore:
cells: Dict[str, DaemonInstance]
def __init__(self) -> None:
self.cells = {}
async def aclose(self) -> None:
for daemon in self.cells.values():
await self._close_daemon(daemon)
self.cells = {}
@contextlib.asynccontextmanager
async def scope(self, name, spawn) -> AsyncGenerator[_DaemonScope, None]:
scope = _DaemonScope(name, spawn)
try:
yield scope
finally:
daemon = self.cells.pop(name, None)
if daemon:
await self._close_daemon(daemon)
async def request(self, scope: _DaemonScope) -> DaemonInstance:
if scope.name in self.cells:
daemon = self.cells[scope.name]
if daemon.process is None:
return daemon
if daemon.process.poll() is None:
return daemon
await self.aclose()
daemon = await scope.spawn()
self.cells[scope.name] = daemon
return daemon
def has_running_daemons(self) -> bool:
for daemon in self.cells.values():
if daemon.process and daemon.process.poll() is None:
return True
return False
async def _close_daemon(self, daemon: DaemonInstance):
await daemon.aclose()
class EnsureDaemonStartedFixture(fixture_class.Fixture):
"""Fixture that starts requested service."""
_fixture__global_daemon_store: _DaemonStore
_fixture__testsuite_suspend_capture: Any
_fixture_pytestconfig: Any
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._requests = set()
async def __call__(self, scope: _DaemonScope) -> DaemonInstance:
self._requests.add(scope.name)
if len(self._requests) > 1:
pytest.fail('Test requested multiple daemons: %r' % self._requests)
if self._fixture_pytestconfig.option.service_wait:
with self._fixture__testsuite_suspend_capture():
return await self._fixture__global_daemon_store.request(scope)
return await self._fixture__global_daemon_store.request(scope)
[docs]class ServiceSpawnerFactory(fixture_class.Fixture):
_fixture_pytestconfig: Any
_fixture_service_client_session_factory: Any
_fixture_wait_service_started: Any
[docs] def __call__(
self,
args: Sequence[str],
*,
base_command: Optional[Sequence[str]] = None,
env: Optional[Dict[str, str]] = None,
poll_retries: int = service_daemon.POLL_RETRIES,
ping_url: Optional[str] = None,
ping_request_timeout: float = service_daemon.PING_REQUEST_TIMEOUT,
ping_response_codes: Tuple[int] = service_daemon.PING_RESPONSE_CODES,
health_check: Optional[service_daemon.HealthCheckType] = None,
subprocess_spawner: Optional[Callable[..., subprocess.Popen]] = None,
subprocess_options: Optional[Dict[str, Any]] = None,
setup_service: Optional[Callable[[subprocess.Popen], None]] = None,
shutdown_signal: Optional[int] = None,
stdout_handler=None,
stderr_handler=None,
):
"""Creates service spawner asynccontextmanager factory.
:param args: command arguments
:param base_command: Arguments to be prepended to ``args``.
:param env: Environment variables dictionary.
:param poll_retries: Number of tries for service health check
:param ping_url: service health check url, service is considered up
when 200 received.
:param ping_request_timeout: Timeout for ping_url request
:param ping_response_codes: HTTP resopnse codes tuple meaning that
service is up and running.
:param health_check: Async function to check service is running.
:param subprocess_spawner: callable with `subprocess.Popen` interface.
:param subprocess_options: Custom subprocess options.
:param setup_service: Function to be called right after service
is started.
:param shutdown_signal: Signal used to stop running services.
:returns: Return asynccontextmanager factory that might be used
within ``register_daemon_scope`` fixture.
"""
pytestconfig = self._fixture_pytestconfig
shutdown_timeout = pytestconfig.option.service_shutdown_timeout
if shutdown_signal is None:
shutdown_signal = SHUTDOWN_SIGNALS[
pytestconfig.option.service_shutdown_signal
]
health_check = service_daemon.make_health_check(
ping_url=ping_url,
ping_request_timeout=ping_request_timeout,
ping_response_codes=ping_response_codes,
health_check=health_check,
)
command_args = _build_command_args(args, base_command)
@contextlib.asynccontextmanager
async def spawn():
if pytestconfig.option.service_wait:
manager = self._fixture_wait_service_started(
args=command_args,
health_check=health_check,
)
elif pytestconfig.option.service_disable:
manager = service_daemon.start_dummy_process()
else:
manager = service_daemon.start(
args=command_args,
env=env,
shutdown_signal=shutdown_signal,
shutdown_timeout=shutdown_timeout,
poll_retries=poll_retries,
health_check=health_check,
session_factory=self._fixture_service_client_session_factory,
subprocess_options=subprocess_options,
setup_service=setup_service,
subprocess_spawner=subprocess_spawner,
stdout_handler=stdout_handler,
stderr_handler=stderr_handler,
)
async with manager as process:
yield process
return spawn
class ServiceSpawnerFixture(fixture_class.Fixture):
_fixture_service_spawner_factory: ServiceSpawnerFactory
def __call__(self, *args, **kwargs):
warnings.warn(
'service_spawner() fixture is deprecated, '
'use service_spawner_factory()',
PendingDeprecationWarning,
)
factory = self._fixture_service_spawner_factory(*args, **kwargs)
async def spawner():
return factory()
return spawner
[docs]class CreateDaemonScope(fixture_class.Fixture):
"""Create daemon scope for daemon with command to start."""
_fixture__global_daemon_store: _DaemonStore
_fixture_service_spawner_factory: ServiceSpawnerFactory
[docs] def __call__(
self,
*,
args: Sequence[str],
ping_url: Optional[str] = None,
name: Optional[str] = None,
base_command: Optional[Sequence] = None,
env: Optional[Dict[str, str]] = None,
poll_retries: int = service_daemon.POLL_RETRIES,
ping_request_timeout: float = service_daemon.PING_REQUEST_TIMEOUT,
ping_response_codes: Tuple[int] = service_daemon.PING_RESPONSE_CODES,
health_check: Optional[service_daemon.HealthCheckType] = None,
subprocess_options: Optional[Dict[str, Any]] = None,
setup_service: Optional[Callable[[subprocess.Popen], None]] = None,
shutdown_signal: Optional[int] = None,
stdout_handler=None,
stderr_handler=None,
) -> AsyncContextManager[_DaemonScope]:
"""
:param args: command arguments
:param base_command: Arguments to be prepended to ``args``.
:param env: Environment variables dictionary.
:param poll_retries: Number of tries for service health check
:param ping_url: service health check url, service is considered up
when 200 received.
:param ping_request_timeout: Timeout for ping_url request
:param ping_response_codes: HTTP resopnse codes tuple meaning that
service is up and running.
:param health_check: Async function to check service is running.
:param subprocess_options: Custom subprocess options.
:param setup_service: Function to be called right after service
is started.
:param shutdown_signal: Signal used to stop running services.
:returns: Returns internal daemon scope instance to be used with
``ensure_daemon_started`` fixture.
"""
if name is None:
name = ' '.join(args)
return self._fixture__global_daemon_store.scope(
name=name,
spawn=self._fixture_service_spawner_factory(
args=args,
base_command=base_command,
env=env,
poll_retries=poll_retries,
ping_url=ping_url,
ping_request_timeout=ping_request_timeout,
ping_response_codes=ping_response_codes,
health_check=health_check,
subprocess_options=subprocess_options,
setup_service=setup_service,
shutdown_signal=shutdown_signal,
stdout_handler=stdout_handler,
stderr_handler=stderr_handler,
),
)
[docs]class CreateServiceClientFixture(fixture_class.Fixture):
"""Creates service client instance.
Example:
.. code-block:: python
def my_client(create_service_client):
return create_service_client('http://localhost:9999/')
"""
_fixture_service_client_default_headers: Dict[str, str]
_fixture_service_client_options: Dict[str, Any]
[docs] def __call__(
self,
base_url: str,
*,
client_class=service_client.Client,
**kwargs,
):
"""
:param base_url: base url for http client
:param client_class: client class to use
:returns: ``client_class`` instance
"""
return client_class(
base_url,
headers=self._fixture_service_client_default_headers,
**self._fixture_service_client_options,
**kwargs,
)
ensure_daemon_started = fixture_class.create_fixture_factory(
EnsureDaemonStartedFixture,
)
service_spawner = fixture_class.create_fixture_factory(
ServiceSpawnerFixture,
scope='session',
)
service_spawner_factory = fixture_class.create_fixture_factory(
ServiceSpawnerFactory,
scope='session',
)
create_daemon_scope = fixture_class.create_fixture_factory(
CreateDaemonScope,
scope='session',
)
create_service_client = fixture_class.create_fixture_factory(
CreateServiceClientFixture,
)
@pytest.fixture(scope='session')
def wait_service_started(pytestconfig, service_client_session_factory):
reporter = pytestconfig.pluginmanager.getplugin('terminalreporter')
@contextlib.asynccontextmanager
async def waiter(*, args, health_check):
await service_daemon.service_wait(
args=args,
reporter=reporter,
health_check=health_check,
session_factory=service_client_session_factory,
)
yield None
return waiter
def pytest_addoption(parser):
group = parser.getgroup('services')
group.addoption(
'--service-timeout',
metavar='TIMEOUT',
help=(
'Service client timeout in seconds. 0 means no timeout. '
'Default is %(default)s'
),
default=120.0,
type=float,
)
group.addoption(
'--service-disable',
action='store_true',
help='Do not start service daemon from testsuite',
)
group.addoption(
'--service-wait',
action='store_true',
help='Wait for service to start outside of testsuite itself, e.g. gdb',
)
group.addoption(
'--service-shutdown-timeout',
help='Service shutdown timeout in seconds. Default is %(default)s',
default=120.0,
type=float,
)
group.addoption(
'--service-shutdown-signal',
help='Service shutdown signal. Default is %(default)s',
default='SIGINT',
choices=sorted(SHUTDOWN_SIGNALS.keys()),
)
[docs]@pytest.fixture(scope='session')
def register_daemon_scope(_global_daemon_store: _DaemonStore):
"""Context manager that registers service process session.
Yields daemon scope instance.
:param name: service name
:spawn spawn: asynccontextmanager service factory
"""
return _global_daemon_store.scope
@pytest.fixture(scope='session')
def service_client_session_factory(
event_loop,
) -> service_daemon.ClientSessionFactory:
def make_session(**kwargs):
kwargs.setdefault('loop', event_loop)
return aiohttp.ClientSession(**kwargs)
return make_session
@pytest.fixture
async def service_client_session(
service_client_session_factory,
) -> annotations.AsyncYieldFixture[aiohttp.ClientSession]:
async with service_client_session_factory() as session:
yield session
@pytest.fixture
def service_client_default_headers() -> Dict[str, str]:
"""Default service client headers.
Fill free to override in your conftest.py
"""
return {}
@pytest.fixture
def service_client_options(
pytestconfig,
service_client_session: aiohttp.ClientSession,
mockserver: fixture_types.MockserverFixture,
) -> annotations.YieldFixture[Dict[str, Any]]:
"""Returns service client options dictionary."""
yield {
'session': service_client_session,
'timeout': pytestconfig.option.service_timeout or None,
'span_id_header': mockserver.span_id_header,
}
@pytest.fixture(scope='session')
async def _global_daemon_store(loop):
store = _DaemonStore()
async with compat.aclosing(store):
yield store
@pytest.fixture(scope='session')
def _testsuite_suspend_capture(pytestconfig):
capmanager = pytestconfig.pluginmanager.getplugin('capturemanager')
@contextlib.contextmanager
def suspend():
try:
capmanager.suspend_global_capture()
yield
finally:
capmanager.resume_global_capture()
return suspend
def _build_command_args(
args: Sequence,
base_command: Optional[Sequence],
) -> Tuple[str, ...]:
return tuple(str(arg) for arg in itertools.chain(base_command or (), args))