PostgreSQL chat storage backend example
Example Service
import argparse
from aiohttp import web
import asyncpg
# NOTE(review): not referenced anywhere in the visible service code;
# presumably an HTTP timeout in seconds — confirm it is still needed.
HTTP_TIMEOUT = 10
def main():
    """Parse command-line options and run the example web service.

    Recognised options are ``--postgresql`` (PostgreSQL connection string)
    and ``--port`` (integer, defaults to 8080). The application produced by
    ``create_app`` is served with ``web.run_app`` on the chosen port.
    """
    cli = argparse.ArgumentParser(
        description='Testsuite service integration example.',
    )
    cli.add_argument('--postgresql', help='PostgreSQL connection string')
    cli.add_argument('--port', type=int, default=8080)
    options = cli.parse_args()

    # create_app is a coroutine function; run_app receives the un-awaited
    # coroutine object and the port to bind to.
    application = create_app(options)
    web.run_app(application, port=options.port)
async def create_app(args):
    """Build the aiohttp application for the chat storage service.

    Args:
        args: Parsed command-line namespace. Only ``args.postgresql`` is
            read here; it is passed to ``asyncpg.create_pool`` as the DSN.

    Returns:
        A ``web.Application`` with the connection pool stored under
        ``app['pool']`` and three routes registered: ``GET /ping``,
        ``POST /messages/send`` and ``POST /messages/retrieve``.
    """
    routes = web.RouteTableDef()

    @routes.get('/ping')
    async def handle_ping(request):
        # Trivial health-check endpoint.
        return web.Response(text='OK.')

    @routes.post('/messages/send')
    async def handle_send(request):
        # Store one message and report the id assigned by the database.
        data = await request.json()
        async with request.app['pool'].acquire() as connection:
            row_id = await connection.fetchval(
                'INSERT INTO messages(username, text) VALUES ($1, $2) '
                'RETURNING id',
                data['username'],
                data['text'],
            )
        return web.json_response({'id': row_id})

    @routes.post('/messages/retrieve')
    async def handle_retrieve(request):
        # Return the 20 most recent messages, newest first.
        async with request.app['pool'].acquire() as connection:
            records = await connection.fetch(
                'SELECT created, username, "text" FROM messages '
                'ORDER BY created DESC LIMIT 20',
            )
        messages = [
            {
                'created': record['created'].isoformat(),
                'username': record['username'],
                'text': record['text'],
            }
            for record in records
        ]
        return web.json_response({'messages': messages})

    async def close_pool(application):
        # Release the database connections when the application shuts
        # down; without this hook the pool is never closed.
        await application['pool'].close()

    app = web.Application()
    app['pool'] = await asyncpg.create_pool(dsn=args.postgresql)
    app.on_cleanup.append(close_pool)
    app.add_routes(routes)
    return app
if __name__ == '__main__':
    # Start the service only when this file is executed as a script.
    main()
Conftest
import pathlib
import sys
import pytest
from testsuite.databases.pgsql import discover
# Plugin modules pytest loads for this test suite, listed by import path.
# NOTE(review): presumably these supply the testsuite fixtures requested
# below (mockserver, pgsql, create_daemon_scope, ...) — confirm against
# the testsuite package.
pytest_plugins = [
'testsuite.pytest_plugin',
'testsuite.databases.pgsql.pytest_plugin',
]
def pytest_addoption(parser):
    """Register the ``--example-service-port`` command-line option.

    The option is added to the 'Example service' group, is parsed as an
    integer and defaults to 8080.
    """
    example_group = parser.getgroup('Example service')
    port_option = {
        'help': 'Bind example services to this port (default is %(default)s)',
        'default': 8080,
        'type': int,
    }
    example_group.addoption('--example-service-port', **port_option)
@pytest.fixture
async def example_service(
    ensure_daemon_started,
    # Holder of the service process
    example_service_scope,
    # Fixtures the service depends on
    mockserver,
    pgsql,
):
    """Make sure the example service daemon is running for the test.

    ``mockserver`` and ``pgsql`` are requested only as dependencies; they
    are not used in the body. The fixture itself yields no value.
    """
    # Launches the daemon on first use; later calls reuse the same scope.
    await ensure_daemon_started(example_service_scope)
@pytest.fixture
async def example_client(
    create_service_client,
    example_service_baseurl,
    example_service,
):
    """Client bound to the example service base URL.

    ``example_service`` is requested so that the service is started before
    a client is handed to the test.
    """
    client = create_service_client(example_service_baseurl)
    return client
@pytest.fixture(scope='session')
def example_service_baseurl(pytestconfig):
    """Base URL of the example service on localhost, with trailing slash.

    The port is taken from the ``--example-service-port`` option.
    """
    port = pytestconfig.option.example_service_port
    return f'http://localhost:{port}/'
@pytest.fixture(scope='session')
def example_root():
    """Root directory of the example service.

    Resolved as the directory two levels above this file.
    """
    this_dir = pathlib.Path(__file__).parent
    return this_dir.parent
@pytest.fixture(scope='session')
async def example_service_scope(
    pytestconfig,
    create_daemon_scope,
    example_root,
    example_service_baseurl,
    pgsql_local,
):
    """Session-wide daemon scope for the example service process.

    The service is started as ``server.py`` from the example root with the
    current Python interpreter. It receives the configured port and the
    connection URI of the ``chat_messages`` database. The ``ping`` endpoint
    under the base URL is passed as the daemon's ping URL.
    """
    server_script = example_root.joinpath('server.py')
    command = [
        sys.executable,
        str(server_script),
        '--port',
        str(pytestconfig.option.example_service_port),
        '--postgresql',
        pgsql_local['chat_messages'].get_uri(),
    ]
    ping_url = example_service_baseurl + 'ping'

    async with create_daemon_scope(
        args=command,
        ping_url=ping_url,
    ) as daemon_scope:
        yield daemon_scope
@pytest.fixture(scope='session')
def pgsql_local(example_root, pgsql_local_create):
    """Local PostgreSQL databases built from the example's schema files.

    Schemas are discovered under ``schemas/postgresql`` in the example root
    for the ``chat_storage_postgres`` service name, and every discovered
    database is handed to ``pgsql_local_create``.
    """
    schema_dirs = [example_root.joinpath('schemas/postgresql')]
    discovered = discover.find_schemas('chat_storage_postgres', schema_dirs)
    return pgsql_local_create(list(discovered.values()))
Test
async def test_messages_send(example_client, pgsql):
    """Sending a message returns its id and stores the row in PostgreSQL."""
    payload = {'username': 'foo', 'text': 'bar'}
    response = await example_client.post('/messages/send', json=payload)
    assert response.status_code == 200

    body = response.json()
    assert 'id' in body

    # Read the row back directly from the database to verify it was stored.
    cursor = pgsql['chat_messages'].cursor()
    cursor.execute(
        'SELECT username, text FROM messages WHERE id = %s',
        (body['id'],),
    )
    stored = cursor.fetchone()
    assert stored == ('foo', 'bar')
async def test_messages_retrieve(example_client, pgsql):
    """Retrieving messages returns the stored rows, newest first.

    NOTE(review): the two expected messages are presumably pre-loaded into
    the ``chat_messages`` database by the ``pgsql`` fixture from static
    test data that is not visible here — confirm.
    """
    response = await example_client.post('/messages/retrieve', json={})
    # Check the status first, as test_messages_send does, so that a server
    # error is reported as such instead of as a JSON body mismatch.
    assert response.status_code == 200
    assert response.json() == {
        'messages': [
            {
                'created': '2019-12-31T21:00:01+00:00',
                'text': 'happy ny',
                'username': 'bar',
            },
            {
                'created': '2019-12-31T21:00:00+00:00',
                'text': 'hello, world!',
                'username': 'foo',
            },
        ],
    }