MySQL chat storage backend example

Example Service

import argparse

from aiohttp import web
import aiomysql

HTTP_TIMEOUT = 10


def main():
    parser = argparse.ArgumentParser(
        description='Testsuite service integration example.',
    )
    parser.add_argument('--port', type=int, default=8080)
    parser.add_argument(
        '--mysql-host',
        help='MySQL hostname',
        default='localhost',
    )
    parser.add_argument(
        '--mysql-port',
        help='MySQL port',
        type=int,
        default=3306,
    )
    parser.add_argument('--mysql-user', help='MySQL user', default='root')
    parser.add_argument(
        '--mysql-dbname',
        help='MySQL database',
        default='chat_messages',
    )
    args = parser.parse_args()

    web.run_app(create_app(args), port=args.port)


async def create_app(args):
    routes = web.RouteTableDef()

    @routes.get('/ping')
    async def handle_ping(request):
        return web.Response(text='OK.')

    @routes.post('/messages/send')
    async def post(request):
        data = await request.json()
        async with app['pool'].acquire() as connection:
            async with connection.cursor() as cursor:
                await cursor.execute(
                    'INSERT INTO messages(username, text) VALUES (%s, %s) ',
                    (data['username'], data['text']),
                )
            await connection.commit()
        return web.json_response({'id': cursor.lastrowid})

    @routes.post('/messages/retrieve')
    async def get(request):
        async with app['pool'].acquire() as connection:
            async with connection.cursor() as cursor:
                await cursor.execute(
                    'SELECT created, username, text FROM messages '
                    'ORDER BY created DESC LIMIT 20',
                )
                messages = [
                    {
                        'created': record[0].isoformat(),
                        'username': record[1],
                        'text': record[2],
                    }
                    for record in await cursor.fetchall()
                ]
        return web.json_response({'messages': messages})

    app = web.Application()
    app['pool'] = await aiomysql.create_pool(
        host=args.mysql_host,
        port=args.mysql_port,
        user=args.mysql_user,
        db=args.mysql_dbname,
    )
    app.on_shutdown.append(on_shutdown)
    app.add_routes(routes)
    return app


async def on_shutdown(app):
    app['pool'].close()
    await app['pool'].wait_closed()


if __name__ == '__main__':
    main()

Conftest

import pathlib
import sys

import pytest

from testsuite.databases.mysql import discover


pytest_plugins = [
    'testsuite.pytest_plugin',
    'testsuite.databases.mysql.pytest_plugin',
]


def pytest_addoption(parser):
    group = parser.getgroup('Example service')
    group.addoption(
        '--example-service-port',
        help='Bind example services to this port (default is %(default)s)',
        default=8080,
        type=int,
    )


@pytest.fixture
async def example_service(
    ensure_daemon_started,
    # Service process holder
    example_service_scope,
    # Service dependencies
    mockserver,
    mysql,
):
    # Start service if not started yet
    await ensure_daemon_started(example_service_scope)


@pytest.fixture
async def example_client(
    create_service_client,
    example_service_baseurl,
    example_service,
):
    # Create service client instance
    return create_service_client(example_service_baseurl)


@pytest.fixture(scope='session')
def example_service_baseurl(pytestconfig):
    return f'http://localhost:{pytestconfig.option.example_service_port}/'


@pytest.fixture(scope='session')
def example_root():
    """Path to example service root."""
    return pathlib.Path(__file__).parent.parent


@pytest.fixture(scope='session')
async def example_service_scope(
    pytestconfig,
    create_daemon_scope,
    example_root,
    example_service_baseurl,
    mysql_local,
    mysql_conninfo,
):
    async with create_daemon_scope(
        args=[
            sys.executable,
            str(example_root.joinpath('server.py')),
            '--port',
            str(pytestconfig.option.example_service_port),
            '--mysql-host',
            mysql_conninfo.hostname,
            '--mysql-port',
            str(mysql_conninfo.port),
            '--mysql-user',
            mysql_conninfo.user,
            '--mysql-dbname',
            mysql_local['chat_messages'].dbname,
        ],
        ping_url=example_service_baseurl + 'ping',
    ) as scope:
        yield scope


@pytest.fixture(scope='session')
def mysql_local(example_root):
    return discover.find_schemas([example_root.joinpath('schemas/mysql')])

Test

async def test_messages_send(example_client, mysql):
    response = await example_client.post(
        '/messages/send',
        json={'username': 'foo', 'text': 'bar'},
    )
    assert response.status_code == 200
    data = response.json()
    assert 'id' in data

    cursor = mysql['chat_messages'].cursor()
    cursor.execute(
        'SELECT username, text FROM messages WHERE id = %s',
        (data['id'],),
    )
    record = cursor.fetchone()
    assert record == ('foo', 'bar')


async def test_messages_retrieve(example_client):
    response = await example_client.post('/messages/retrieve', json={})
    assert response.json() == {
        'messages': [
            {
                'created': '2019-12-31T21:00:01',
                'text': 'happy ny',
                'username': 'bar',
            },
            {
                'created': '2019-12-31T21:00:00',
                'text': 'hello, world!',
                'username': 'foo',
            },
        ],
    }