PostgreSQL chat storage backend example

Example Service

import argparse

from aiohttp import web
import asyncpg

HTTP_TIMEOUT = 10


def main():
    parser = argparse.ArgumentParser(
        description='Testsuite service integration example.',
    )
    parser.add_argument('--postgresql', help='PostgreSQL connection string')
    parser.add_argument('--port', type=int, default=8080)
    args = parser.parse_args()

    web.run_app(create_app(args), port=args.port)


async def create_app(args):
    routes = web.RouteTableDef()

    @routes.get('/ping')
    async def handle_ping(request):
        return web.Response(text='OK.')

    @routes.post('/messages/send')
    async def post(request):
        data = await request.json()
        async with app['pool'].acquire() as connection:
            row_id = await connection.fetchval(
                'INSERT INTO messages(username, text) VALUES ($1, $2) '
                'RETURNING id',
                data['username'],
                data['text'],
            )
        return web.json_response({'id': row_id})

    @routes.post('/messages/retrieve')
    async def get(request):
        async with app['pool'].acquire() as connection:
            records = await connection.fetch(
                'SELECT created, username, "text" FROM messages '
                'ORDER BY created DESC LIMIT 20',
            )
        messages = [
            {
                'created': record[0].isoformat(),
                'username': record[1],
                'text': record[2],
            }
            for record in records
        ]
        return web.json_response({'messages': messages})

    app = web.Application()
    app['pool'] = await asyncpg.create_pool(dsn=args.postgresql)
    app.add_routes(routes)
    return app


if __name__ == '__main__':
    main()

Conftest

import pathlib
import sys

import pytest

from testsuite.databases.pgsql import discover


pytest_plugins = [
    'testsuite.pytest_plugin',
    'testsuite.databases.pgsql.pytest_plugin',
]


def pytest_addoption(parser):
    group = parser.getgroup('Example service')
    group.addoption(
        '--example-service-port',
        help='Bind example services to this port (default is %(default)s)',
        default=8080,
        type=int,
    )


@pytest.fixture
async def example_service(
    ensure_daemon_started,
    # Service process holder
    example_service_scope,
    # Service dependencies
    mockserver,
    pgsql,
):
    # Start service if not started yet
    await ensure_daemon_started(example_service_scope)


@pytest.fixture
async def example_client(
    create_service_client,
    example_service_baseurl,
    example_service,
):
    # Create service client instance
    return create_service_client(example_service_baseurl)


@pytest.fixture(scope='session')
def example_service_baseurl(pytestconfig):
    return f'http://localhost:{pytestconfig.option.example_service_port}/'


@pytest.fixture(scope='session')
def example_root():
    """Path to example service root."""
    return pathlib.Path(__file__).parent.parent


@pytest.fixture(scope='session')
async def example_service_scope(
    pytestconfig,
    create_daemon_scope,
    example_root,
    example_service_baseurl,
    pgsql_local,
):
    async with create_daemon_scope(
        args=[
            sys.executable,
            str(example_root.joinpath('server.py')),
            '--port',
            str(pytestconfig.option.example_service_port),
            '--postgresql',
            pgsql_local['chat_messages'].get_uri(),
        ],
        ping_url=example_service_baseurl + 'ping',
    ) as scope:
        yield scope


@pytest.fixture(scope='session')
def pgsql_local(example_root, pgsql_local_create):
    databases = discover.find_schemas(
        'chat_storage_postgres',
        [example_root.joinpath('schemas/postgresql')],
    )
    return pgsql_local_create(list(databases.values()))

Test

async def test_messages_send(example_client, pgsql):
    response = await example_client.post(
        '/messages/send',
        json={'username': 'foo', 'text': 'bar'},
    )
    assert response.status_code == 200
    data = response.json()
    assert 'id' in data

    cursor = pgsql['chat_messages'].cursor()
    cursor.execute(
        'SELECT username, text FROM messages WHERE id = %s',
        (data['id'],),
    )
    record = cursor.fetchone()
    assert record == ('foo', 'bar')


async def test_messages_retrieve(example_client, pgsql):
    response = await example_client.post('/messages/retrieve', json={})
    assert response.json() == {
        'messages': [
            {
                'created': '2019-12-31T21:00:01+00:00',
                'text': 'happy ny',
                'username': 'bar',
            },
            {
                'created': '2019-12-31T21:00:00+00:00',
                'text': 'hello, world!',
                'username': 'foo',
            },
        ],
    }