MySQL chat storage backend example
Example Service
import argparse
from aiohttp import web
import aiomysql
# NOTE(review): not referenced anywhere in the code shown here; presumably a
# timeout in seconds for HTTP calls -- confirm before relying on it.
HTTP_TIMEOUT = 10
def main():
    """Parse the command line and run the example service.

    Options: ``--port`` for the HTTP listener, plus ``--mysql-host``,
    ``--mysql-port``, ``--mysql-user`` and ``--mysql-dbname`` for the
    database connection. The parsed namespace is passed to ``create_app``
    and the result is handed to ``web.run_app``.
    """
    cli = argparse.ArgumentParser(
        description='Testsuite service integration example.',
    )
    cli.add_argument('--port', default=8080, type=int)
    # MySQL connection settings.
    cli.add_argument('--mysql-host', default='localhost', help='MySQL hostname')
    cli.add_argument('--mysql-port', default=3306, type=int, help='MySQL port')
    cli.add_argument('--mysql-user', default='root', help='MySQL user')
    cli.add_argument(
        '--mysql-dbname',
        default='chat_messages',
        help='MySQL database',
    )
    options = cli.parse_args()
    # create_app is a coroutine function; its coroutine is passed on as-is.
    application = create_app(options)
    web.run_app(application, port=options.port)
async def create_app(args):
    """Build the aiohttp application backed by a MySQL connection pool.

    Args:
        args: Parsed command-line namespace; ``mysql_host``, ``mysql_port``,
            ``mysql_user`` and ``mysql_dbname`` are read from it.

    Returns:
        A ``web.Application`` with the ``/ping``, ``/messages/send`` and
        ``/messages/retrieve`` routes registered, the connection pool stored
        under ``app['pool']`` and ``on_shutdown`` registered as a shutdown
        hook.
    """
    routes = web.RouteTableDef()

    @routes.get('/ping')
    async def handle_ping(request):
        """Answer every request with the plain text ``OK.``."""
        return web.Response(text='OK.')

    @routes.post('/messages/send')
    async def post(request):
        """Insert one message and reply with ``{'id': <inserted row id>}``.

        Raises:
            web.HTTPBadRequest: The body is not valid JSON, or it does not
                provide the ``username`` and ``text`` fields.
        """
        # Malformed client input is answered with 400 rather than being left
        # to surface as an unhandled exception (500).
        try:
            payload = await request.json()
        except ValueError as exc:
            raise web.HTTPBadRequest(
                text='Request body must be valid JSON.',
            ) from exc
        try:
            username = payload['username']
            text = payload['text']
        except (KeyError, TypeError) as exc:
            # TypeError covers a JSON body that is not an object.
            raise web.HTTPBadRequest(
                text="JSON object with 'username' and 'text' is required.",
            ) from exc

        async with app['pool'].acquire() as connection:
            async with connection.cursor() as cursor:
                await cursor.execute(
                    'INSERT INTO messages(username, text) VALUES (%s, %s) ',
                    (username, text),
                )
                # Capture the generated id while the cursor is still open.
                message_id = cursor.lastrowid
            await connection.commit()
        return web.json_response({'id': message_id})

    @routes.post('/messages/retrieve')
    async def get(request):
        """Reply with up to 20 messages, most recently created first."""
        async with app['pool'].acquire() as connection:
            async with connection.cursor() as cursor:
                await cursor.execute(
                    'SELECT created, username, text FROM messages '
                    'ORDER BY created DESC LIMIT 20',
                )
                records = await cursor.fetchall()
        messages = [
            {
                'created': created.isoformat(),
                'username': username,
                'text': text,
            }
            for created, username, text in records
        ]
        return web.json_response({'messages': messages})

    # The handlers above look up ``app`` when they are called, so binding it
    # after their definition is fine.
    app = web.Application()
    app['pool'] = await aiomysql.create_pool(
        host=args.mysql_host,
        port=args.mysql_port,
        user=args.mysql_user,
        db=args.mysql_dbname,
    )
    app.on_shutdown.append(on_shutdown)
    app.add_routes(routes)
    return app
async def on_shutdown(app):
    """Shutdown hook: close the connection pool kept in ``app['pool']``."""
    pool = app['pool']
    pool.close()
    # Wait for the pool to finish closing before shutdown continues.
    await pool.wait_closed()
# Start the service only when this file is executed as a script.
if __name__ == '__main__':
    main()
Conftest
import pathlib
import sys
import pytest
from testsuite.databases.mysql import discover
# Plugin modules pytest should load for these tests: the testsuite plugin
# and the testsuite MySQL plugin.
pytest_plugins = [
'testsuite.pytest_plugin',
'testsuite.databases.mysql.pytest_plugin',
]
def pytest_addoption(parser):
    """Register the ``--example-service-port`` command-line option.

    The option is an integer defaulting to 8080 and is added to the
    ``Example service`` option group.
    """
    service_options = parser.getgroup('Example service')
    service_options.addoption(
        '--example-service-port',
        type=int,
        default=8080,
        help='Bind example services to this port (default is %(default)s)',
    )
@pytest.fixture
async def example_service(
    ensure_daemon_started,
    # Holder of the service process
    example_service_scope,
    # Dependencies of the service; requested but not used in the body
    mockserver,
    mysql,
):
    """Make sure the example service is started for the requesting test."""
    # No-op when the service has already been started
    await ensure_daemon_started(example_service_scope)
@pytest.fixture
async def example_client(
    create_service_client,
    example_service_baseurl,
    example_service,
):
    """Service client bound to the example service base URL.

    ``example_service`` is requested only so that the service is started
    before a client is handed out.
    """
    client = create_service_client(example_service_baseurl)
    return client
@pytest.fixture(scope='session')
def example_service_baseurl(pytestconfig):
    """Base URL of the example service, built from ``--example-service-port``."""
    port = pytestconfig.option.example_service_port
    return f'http://localhost:{port}/'
@pytest.fixture(scope='session')
def example_root():
    """Path to example service root."""
    # Two levels up from this file: its directory's parent.
    this_file = pathlib.Path(__file__)
    return this_file.parents[1]
@pytest.fixture(scope='session')
async def example_service_scope(
    pytestconfig,
    create_daemon_scope,
    example_root,
    example_service_baseurl,
    mysql_local,
    mysql_conninfo,
):
    """Session-wide daemon scope for the example service process.

    Builds the command line that runs ``server.py`` with the current Python
    interpreter, passing the configured port and the MySQL connection
    settings, and yields the scope created by ``create_daemon_scope``.
    """
    command = [
        sys.executable,
        str(example_root.joinpath('server.py')),
        '--port',
        str(pytestconfig.option.example_service_port),
        # MySQL connection settings forwarded to the service
        '--mysql-host',
        mysql_conninfo.hostname,
        '--mysql-port',
        str(mysql_conninfo.port),
        '--mysql-user',
        mysql_conninfo.user,
        '--mysql-dbname',
        mysql_local['chat_messages'].dbname,
    ]
    health_url = example_service_baseurl + 'ping'
    async with create_daemon_scope(args=command, ping_url=health_url) as scope:
        yield scope
@pytest.fixture(scope='session')
def mysql_local(example_root):
    """Schemas discovered under ``schemas/mysql`` in the example root."""
    schemas_dir = example_root.joinpath('schemas/mysql')
    return discover.find_schemas([schemas_dir])
Test
async def test_messages_send(example_client, mysql):
    """Posting a message stores it in the ``messages`` table."""
    response = await example_client.post(
        '/messages/send',
        json={'username': 'foo', 'text': 'bar'},
    )
    assert response.status_code == 200
    body = response.json()
    assert 'id' in body

    # Look the row up directly in the database by the returned id.
    message_id = body['id']
    db_cursor = mysql['chat_messages'].cursor()
    db_cursor.execute(
        'SELECT username, text FROM messages WHERE id = %s',
        (message_id,),
    )
    assert db_cursor.fetchone() == ('foo', 'bar')
async def test_messages_retrieve(example_client):
    """``/messages/retrieve`` returns the stored messages, newest first.

    NOTE(review): the two expected messages are presumably preloaded from
    static test data that is not part of this file -- confirm.
    """
    response = await example_client.post('/messages/retrieve', json={})
    # Check the status first so an error response fails with a clear message
    # instead of a confusing body mismatch.
    assert response.status_code == 200
    assert response.json() == {
        'messages': [
            {
                'created': '2019-12-31T21:00:01',
                'text': 'happy ny',
                'username': 'bar',
            },
            {
                'created': '2019-12-31T21:00:00',
                'text': 'hello, world!',
                'username': 'foo',
            },
        ],
    }