import json
import ssl
import typing
import uuid
import yarl
import aiohttp
from testsuite import annotations
from testsuite.utils import http
from testsuite.utils import url_util
# Default host name constant; not referenced by the definitions visible in
# this part of the module.
DEFAULT_HOST = 'localhost'
# Default value of the ``timeout`` argument of ``BaseAiohttpClient.__init__``.
# NOTE(review): presumably expressed in seconds -- confirm against aiohttp.
DEFAULT_TIMEOUT = 120.0
# Type variable constrained to the two response types a client may return:
# the raw ``aiohttp.ClientResponse`` or the testsuite ``http.ClientResponse``.
# It parametrizes ``GenericClient`` below.
TResponse = typing.TypeVar(
'TResponse',
aiohttp.ClientResponse,
http.ClientResponse,
)
class BaseAiohttpClient:
    """Shared plumbing for HTTP clients built on an ``aiohttp`` session.

    Holds the base URL, default headers, default timeout and SSL context,
    and knows how to turn a request description into an actual
    ``aiohttp.ClientSession.request`` call.
    """

    def __init__(
        self,
        base_url: str,
        *,
        session: aiohttp.ClientSession,
        ssl_context: typing.Optional[ssl.SSLContext] = None,
        span_id_header: typing.Optional[str] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        timeout: float = DEFAULT_TIMEOUT,
    ):
        """
        :param base_url: Base client url
        :param session: ``aiohttp.ClientSession`` instance
        :param ssl_context: value passed as ``ssl`` to every session request
        :param span_id_header: name of a header that is filled with a random
            ``uuid4`` hex string when the request does not already set it
        :param headers: default request headers dictionary
        :param timeout: http client default timeout
        """
        self._session = session
        self._ssl_context = ssl_context
        self._span_id_header = span_id_header
        self._timeout = timeout
        self._headers = headers if headers else {}
        self._base_url = url_util.ensure_trailing_separator(base_url)

    def url(self, path: typing.Union[str, yarl.URL]):
        """Resolve ``path`` against the client base url.

        String paths are joined with the base url; any other value
        (e.g. a ready ``yarl.URL``) is returned unchanged.
        """
        if not isinstance(path, str):
            return path
        return url_util.join(self._base_url, path)

    async def _aiohttp_request(
        self,
        http_method: str,
        path: typing.Union[str, yarl.URL],
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> aiohttp.ClientResponse:
        """Send a request through the underlying session.

        :param http_method: HTTP method name
        :param path: path relative to the base url, or a ``yarl.URL``
        :param headers: per-request headers merged over the defaults
        :param bearer: token for the ``Authorization: Bearer`` header
        :param x_real_ip: value for the ``X-Real-IP`` header
        :param kwargs: forwarded to ``session.request``; ``timeout``
            defaults to the client timeout and ``params`` is flattened
        :returns: the ``aiohttp.ClientResponse`` produced by the session
        """
        target = self.url(path)
        request_headers = self._build_headers(
            headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
        )
        # An explicit per-call timeout wins over the client default.
        kwargs.setdefault('timeout', self._timeout)
        if kwargs.get('params') is not None:
            kwargs['params'] = _flatten(kwargs['params'])
        return await self._session.request(
            http_method,
            target,
            headers=request_headers,
            ssl=self._ssl_context,
            **kwargs,
        )

    def _build_headers(
        self,
        user_headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
    ) -> typing.Dict[str, str]:
        """Compose the final header dictionary for a single request.

        Starts from a copy of the default headers, overlays
        ``user_headers``, then sets ``Authorization`` / ``X-Real-IP`` when
        the corresponding argument is truthy. A span id header is generated
        if one is configured and still missing. ``None`` values are
        replaced with empty strings.
        """
        merged = self._headers.copy()
        if user_headers:
            merged.update(user_headers)
        if bearer:
            merged['Authorization'] = 'Bearer %s' % bearer
        if x_real_ip:
            merged['X-Real-IP'] = x_real_ip
        span_header = self._span_id_header
        if span_header and span_header not in merged:
            merged[span_header] = uuid.uuid4().hex
        normalized = {}
        for name, value in merged.items():
            normalized[name] = '' if value is None else value
        return normalized
class GenericClient(BaseAiohttpClient, typing.Generic[TResponse]):
    """Basic asyncio HTTP service client.

    Provides one coroutine per HTTP verb; each of them delegates to
    ``_request``, which subclasses must implement to produce a response
    of type ``TResponse``.
    """

    async def post(
        self,
        path: str,
        json: annotations.JsonAnyOptional = None,
        data: typing.Any = None,
        params: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP POST request."""
        return await self._request(
            'POST',
            path,
            json=json,
            data=data,
            params=params,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )

    async def put(
        self,
        path: str,
        json: annotations.JsonAnyOptional = None,
        data: typing.Any = None,
        params: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP PUT request."""
        return await self._request(
            'PUT',
            path,
            json=json,
            data=data,
            params=params,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )

    async def patch(
        self,
        path: str,
        json: annotations.JsonAnyOptional = None,
        data: typing.Any = None,
        params: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP PATCH request."""
        return await self._request(
            'PATCH',
            path,
            json=json,
            data=data,
            params=params,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )

    async def get(
        self,
        path: str,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP GET request."""
        return await self._request(
            'GET',
            path,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )

    async def delete(
        self,
        path: str,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP DELETE request."""
        return await self._request(
            'DELETE',
            path,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )

    async def options(
        self,
        path: str,
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP OPTIONS request."""
        return await self._request(
            'OPTIONS',
            path,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )

    async def request(
        self,
        http_method: str,
        path: str,
        **kwargs,
    ) -> TResponse:
        """Perform HTTP ``http_method`` request."""
        return await self._request(http_method, path, **kwargs)

    async def _request(
        self,
        http_method: str,
        path: typing.Union[str, yarl.URL],
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> TResponse:
        """Execute the request and return a ``TResponse``.

        Hook for subclasses: every public verb method funnels into this
        coroutine. The base implementation only raises.

        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError
class AiohttpClient(GenericClient[aiohttp.ClientResponse]):
    """HTTP client whose methods return the raw ``aiohttp.ClientResponse``."""

    async def _request(
        self,
        http_method: str,
        path: typing.Union[str, yarl.URL],
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> aiohttp.ClientResponse:
        """Send the request and hand back the session response unchanged.

        :param http_method: HTTP method name
        :param path: path relative to the base url, or a ``yarl.URL``
        :param headers: per-request headers
        :param bearer: token for the ``Authorization`` header
        :param x_real_ip: value for the ``X-Real-IP`` header
        :param kwargs: forwarded to ``_aiohttp_request``
        """
        return await self._aiohttp_request(
            http_method,
            path,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )
class Client(GenericClient[http.ClientResponse]):
    """HTTP client whose methods return the testsuite ``http.ClientResponse``."""

    async def _request(
        self,
        http_method: str,
        path: typing.Union[str, yarl.URL],
        headers: typing.Optional[typing.Dict[str, str]] = None,
        bearer: typing.Optional[str] = None,
        x_real_ip: typing.Optional[str] = None,
        **kwargs,
    ) -> http.ClientResponse:
        """Send the request and wrap the resulting aiohttp response.

        :param http_method: HTTP method name
        :param path: path relative to the base url, or a ``yarl.URL``
        :param headers: per-request headers
        :param bearer: token for the ``Authorization`` header
        :param x_real_ip: value for the ``X-Real-IP`` header
        :param kwargs: forwarded to ``_aiohttp_request``
        :returns: result of awaiting ``_wrap_client_response``
        """
        response = await self._aiohttp_request(
            http_method,
            path,
            headers=headers,
            bearer=bearer,
            x_real_ip=x_real_ip,
            **kwargs,
        )
        return await self._wrap_client_response(response)

    def _wrap_client_response(
        self,
        response,
    ) -> typing.Awaitable[http.ClientResponse]:
        """Return an awaitable converting ``response`` to the wrapped type.

        Delegates to ``http.wrap_client_response`` using the standard
        ``json.loads`` as the JSON decoder.
        """
        return http.wrap_client_response(response, json_loads=json.loads)
def _flatten(query_params):
    """Expand query parameters into a flat list of ``(key, str)`` pairs.

    :param query_params: a mapping, or an iterable of ``(key, value)``
        pairs. Any mapping type is accepted (not only ``dict``), so
        read-only or multi-value mappings are iterated through ``items()``.
    :returns: list of ``(key, value)`` tuples where every value has been
        converted with ``str``; a ``list``/``tuple`` value yields one pair
        per element, all sharing the same key.
    """
    # Iterating a mapping directly would yield bare keys, so any Mapping
    # (dict or otherwise) must go through ``items()``.
    if isinstance(query_params, typing.Mapping):
        pairs = query_params.items()
    else:
        pairs = query_params
    flat = []
    for key, value in pairs:
        if isinstance(value, (tuple, list)):
            flat.extend((key, str(element)) for element in value)
        else:
            flat.append((key, str(value)))
    return flat