Reference

Data

DataView

class openapi.data.view.DataView[source]

Utility class for data with a valid Supported Schema

cleaned(schema, data, *, multiple=False, strict=True, Error=None)[source]

Clean data using a given schema

Parameters
  • schema (Any) – a valid Supported Schema or an the name of an attribute in Operation

  • data (Union[Dict[str, Any], MultiDict]) – data to validate and clean

  • multiple (bool) – multiple values for a given key are acceptable

  • strict (bool) – all required attributes in schema must be available

  • Error (Optional[type]) – optional Exception class

Return type

Any

dump(schema, data)[source]

Dump data using a given a valid Supported Schema, if the schema is None it returns the same data as the input.

Parameters
  • schema (Any) – a schema or an the name of an attribute in Operation

  • data (Any) – data to clean and dump

Return type

Any

get_schema(schema=None)[source]

Get the Supported Schema. If not found it raises an exception

Parameters

schema (Optional[Any]) – a schema or an the name of an attribute in Operation

Return type

TypingInfo

get_special_params(params)[source]

A dictionary of special parameters extracted from params. This function has side effects on params.

Parameters

params (Dict[str, Any]) – dictionary from where special parameters are extracted

Return type

Dict[str, Any]

raise_validation_error(message='', errors=None)[source]

Raise an aiohttp.web.HTTPUnprocessableEntity

Return type

NoReturn

validation_error(message='', errors=None)[source]

Create the validation exception used by raise_validation_error()

Return type

Exception

TypeInfo

class openapi.utils.TypingInfo(element: Any, container: Optional[type] = None)[source]

Information about a type annotation

container: Optional[type]

Alias for field number 1

element: Any

Alias for field number 0

classmethod get(value)[source]

Create a TypingInfo from a typing annotation or another typing info

Parameters

value (Any) – typing annotation

Return type

Optional[TypingInfo]

property is_complex

True if element is either a dataclass or a union

Return type

bool

property is_dataclass

True if element is a dataclass

Return type

bool

property is_union

True if element is a union of typing info

Return type

bool

Data Fields

openapi.data.fields.data_field(required=False, validator=None, dump=None, format=None, description=None, items=None, post_process=None, ops=(), meta=None, **kwargs)[source]

Extend a dataclass field with the following metadata

Parameters
  • validator (Optional[Callable[[Field, Any], Any]]) – optional callable which accept field and raw value as inputs and return the validated value

  • required (bool) – boolean specifying if field is required

  • dump (Optional[Callable[[Any], Any]]) – optional callable which receive the field value and convert to the desired value to serve in requests

  • format (Optional[str]) – optional string which represents the JSON schema format

  • description (Optional[str]) – optional field description

  • items (Optional[Field]) – field for items of the current field (only used for List and Dict fields)

  • post_process (Optional[Callable[[Any], Any]]) – post processor function executed after validation

  • ops (Tuple) – optional tuple of strings specifying available operations

  • meta (Optional[Dict[str, Any]]) – optional dictionary with additional metadata

Return type

Field

String field

openapi.data.fields.str_field(min_length=0, max_length=0, **kw)[source]

A specialized data_field() for strings

Parameters
  • min_length (int) – minim length of string

  • max_length (int) – maximum length of string

Return type

Field

Bool field

openapi.data.fields.bool_field(**kw)[source]

Specialized data_field() for bool types

Return type

Field

UUID field

openapi.data.fields.uuid_field(format='uuid', **kw)[source]

A UUID field with validation

Return type

Field

Numeric field

openapi.data.fields.number_field(min_value=None, max_value=None, precision=None, **kw)[source]

A specialized data_field() for numeric values

Parameters
Return type

Field

Integer field

openapi.data.fields.integer_field(min_value=None, max_value=None, **kw)[source]

A specialized data_field() for integer values

Parameters
Return type

Field

Email field

openapi.data.fields.email_field(min_length=0, max_length=0, **kw)[source]

A specialized data_field() for emails, validation via the email_validator third party library

Parameters
  • min_length (int) – minimum length of email

  • max_length (int) – maximum length of email

Return type

Field

Enum field

openapi.data.fields.enum_field(EnumClass, **kw)[source]

A specialized data_field() for enums

Parameters

EnumClass – enum for validation

Return type

Field

Date field

openapi.data.fields.date_field(**kw)[source]

A specialized data_field() for dates

Return type

Field

Datetime field

openapi.data.fields.date_time_field(timezone=False, **kw)[source]

A specialized data_field() for datetimes

Parameters

timezone – timezone for validation

Return type

Field

JSON field

openapi.data.fields.json_field(**kw)[source]

A specialized data_field() for JSON data

Return type

Field

Data Validation

Validate

The entry function to validate input data and return a python representation. The function accept as input a valid type annotation or a TypingInfo object.

openapi.data.validate.validate(schema, data, *, strict=True, multiple=False, raise_on_errors=False, items=None, as_schema=False)[source]

Validate data with a given schema

Parameters
  • schema (Any) – a typing annotation or a TypingInfo object

  • data (Any) – a data object to validate against the schema

  • strict (bool) – if True validation is strict, i.e. missing required parameters will cause validation to fails

  • multiple (bool) – allow parameters to have multiple values

  • raise_on_errors (bool) – when True failure of validation will result in a ValidationErrors error, otherwise a ValidatedData object is returned.

  • items (Optional[Field]) – an optional Field for items in a composite type (List or Dict)

  • as_schema (bool) – return the schema object rather than simple data type (dataclass rather than dict for example)

Return type

Any

Validate Schema

Same as the validate() but returns the validation schema object rather than simple data types (this is mainly different for dataclasses)

openapi.data.validate.validated_schema(schema, data, *, strict=True)[source]

Validate data with a given schema and return a valid representation of the data as a schema instance

Parameters
  • schema (Any) – a valid Supported Schema or a TypingInfo object

  • data (Any) – a data object to validate against the schema

  • strict (bool) – if True validation is strict, i.e. missing required parameters will cause validation to fails

Return type

Any

Dataclass from db table

openapi.data.db.dataclass_from_table(name, table, *, exclude=None, include=None, required=False, ops=None)[source]

Create a dataclass from an sqlalchemy.schema.Table

Parameters
  • name (str) – dataclass name

  • table (Table) – sqlalchemy table

  • exclude (Optional[Sequence[str]]) – fields to exclude from the dataclass

  • include (Optional[Sequence[str]]) – fields to include in the dataclass

  • required (bool) – set all non nullable columns as required fields in the dataclass

  • ops (Optional[Dict[str, Sequence[str]]]) – additional operation for fields

Return type

type

Dump data

openapi.data.dump.dump(schema, data)[source]

Dump data with a given schema.

Parameters
  • schema (Any) – a valid Supported Schema

  • data (Any) – data to dump, if dataclasses are part of the schema, the dump metadata function will be used if available (see data_field())

Return type

Any

Openapi Spec

OpenApiSpec

class openapi.spec.OpenApiSpec(info=<factory>, default_content_type='application/json', default_responses=<factory>, security=<factory>, servers=<factory>, validate_docs=False, allowed_tags=<factory>, spec_url='/spec', redoc=None)[source]

Open API Specification

op decorator

Decorator for specifying schemas at route/method level. It is used by both the business logic as well the auto-documentation.

class openapi.spec.op(body_schema=None, query_schema=None, response_schema=None, response=200)[source]

Decorator for a ApiPath view which specifies an operation object in an OpenAPI Path. Parameters are dataclasses used for validation and OpenAPI auto documentation.

Redoc

Allow to add redoc redering to your api.

class openapi.spec.Redoc(path='/docs', favicon_url='https://raw.githubusercontent.com/Redocly/redoc/master/demo/favicon.png', redoc_js_url='https://cdn.jsdelivr.net/npm/redoc@next/bundles/redoc.standalone.js', font='family=Montserrat:300,400,700|Roboto:300,400,700')[source]

A dataclass for redoc rendering

DB

This module provides integration with SqlAlchemy asynchronous engine for postgresql. The connection string supported is of this type only:

postgresql+asyncpg://<db_user>:<db_password>@<db_host>:<db_port>/<db_name>

Database

class openapi.db.container.Database(dsn='', metadata=None)[source]

A container for tables in a database and a manager of asynchronous connections to a postgresql database

Parameters
property dsn

Data source name used for database connections

Return type

str

property metadata

The sqlalchemy.schema.MetaData containing tables

Return type

MetaData

property engine

The sqlalchemy.ext.asyncio.AsyncEngine creating connection and transactions

Return type

AsyncEngine

property sync_engine

The sqlalchemy.engine.Engine for synchrouns operations

Return type

Engine

__getattr__(name)[source]

Retrive a sqlalchemy.schema.Table from metadata tables

Parameters

name (str) – if this is a valid table name in the tables of metadata it returns the table, otherwise it defaults to superclass method

Return type

Any

connection()[source]

Context manager for obtaining an asynchronous connection

Return type

AsyncConnection

transaction()[source]

Context manager for initializing an asynchronous database transaction

Return type

AsyncConnection

ensure_connection(conn=None)[source]

Context manager for ensuring we a connection has initialized a database transaction

Return type

AsyncConnection

async close()[source]

Close the asynchronous db engine if opened

Return type

None

create_all()[source]

Create all tables defined in metadata

Return type

None

drop_all()[source]

Drop all tables from metadata in database

Return type

None

drop_all_schemas()[source]

Drop all schema in database

Return type

None

CrudDB

Database container with CRUD operations. Used extensively by the SqlApiPath routing class.

class openapi.db.dbmodel.CrudDB(dsn='', metadata=None)[source]

A Database with additional methods for CRUD operations

async db_count(table, filters, *, conn=None, consumer=None)[source]

Count rows in a table

Parameters
  • table (Table) – sqlalchemy Table

  • filters (Dict) – key-value pairs for filtering rows

  • conn (Optional[AsyncConnection]) – optional db connection

  • consumer (Optional[Any]) – optional consumer (see get_query())

Return type

int

async db_delete(table, filters, *, conn=None, consumer=None)[source]

Delete rows from a given table

Parameters
  • table (Table) – sqlalchemy Table

  • filters (Dict) – key-value pairs for filtering rows

  • conn (Optional[AsyncConnection]) – optional db connection

  • consumer (Optional[Any]) – optional consumer (see get_query())

Return type

CursorResult

async db_insert(table, data, *, conn=None)[source]

Perform an insert into a table

Parameters
  • table (Table) – sqlalchemy Table

  • data (Union[List[Dict], Dict]) – key-value pairs for columns values

  • conn (Optional[AsyncConnection]) – optional db connection

Return type

CursorResult

async db_select(table, filters, *, conn=None, consumer=None)[source]

Select rows from a given table

Parameters
  • table (Table) – sqlalchemy Table

  • filters (Dict) – key-value pairs for filtering rows

  • conn (Optional[AsyncConnection]) – optional db connection

  • consumer (Optional[Any]) – optional consumer (see get_query())

Return type

CursorResult

async db_update(table, filters, data, *, conn=None, consumer=None)[source]

Perform an update of rows

Parameters
  • table (Table) – sqlalchemy Table

  • filters (Dict) – key-value pairs for filtering rows to update

  • data (Dict) – key-value pairs for updating columns values of selected rows

  • conn (Optional[AsyncConnection]) – optional db connection

  • consumer (Optional[Any]) – optional consumer (see get_query())

Return type

CursorResult

async db_upsert(table, filters, data=None, *, conn=None, consumer=None)[source]

Perform an upsert for a single record

Parameters
  • table (Table) – sqlalchemy Table

  • filters (Dict) – key-value pairs for filtering rows to update

  • data (Optional[Dict]) – key-value pairs for updating columns values of selected rows

  • conn (Optional[AsyncConnection]) – optional db connection

  • consumer (Optional[Any]) – optional consumer (see get_query())

Return type

Row

default_filter_field(field, op, value)[source]

Applies a filter on a field.

Notes on ‘ne’ op:

Example data: [None, ‘john’, ‘roger’] ne:john would return only roger (i.e. nulls excluded) ne: would return john and roger

Notes on ‘search’ op:

For some reason, SQLAlchemy uses to_tsquery rather than plainto_tsquery for the match operator

to_tsquery uses operators (&, |, ! etc.) while plainto_tsquery tokenises the input string and uses AND between tokens, hence plainto_tsquery is what we want here

For other database back ends, the behaviour of the match operator is completely different - see: http://docs.sqlalchemy.org/en/rel_1_0/core/sqlelement.html

Parameters
  • field (Column) – field name

  • op (str) – ‘eq’, ‘ne’, ‘gt’, ‘lt’, ‘ge’, ‘le’ or ‘search’

  • value (Any) – comparison value, string or list/tuple

Returns

get_query(table, query, *, params=None, consumer=None)[source]

Build an SqlAlchemy query

Parameters
  • table (Table) – sqlalchemy Table

  • query (Union[Delete, Update, Select]) – sqlalchemy query type

  • params (Optional[Dict]) – key-value pairs for the query

  • consumer (Optional[Any]) – optional consumer for manipulating parameters

Return type

Union[Delete, Update, Select]

get_db

openapi.db.get_db(app, store_url=None)[source]

Create an Open API db handler and set it for use in an aiohttp application

Parameters
  • app (Application) – aiohttp Application

  • store_url (Optional[str]) – datastore connection string, if not provided the env variable DATASTORE is used instead. If the env variable is not available either the method logs a warning and return None

This function 1) adds the database to the aiohttp application at key “db”, 2) add the db command to the command line client (if command is True) and 3) add the close handler on application shutdown

Return type

Optional[CrudDB]

SingleConnDatabase

A CrudDB container for testing database driven Rest APIs.

class openapi.testing.SingleConnDatabase(*args, **kwargs)[source]

Useful for speedup testing

connection()[source]

Context manager for obtaining an asynchronous connection

Return type

AsyncConnection

transaction()[source]

Context manager for initializing an asynchronous database transaction

Return type

AsyncConnection

Routes

ApiPath

class openapi.spec.path.ApiPath(request)[source]

A DataView class for OpenAPI path

path_schema: Optional[type] = None

Optional dataclass for validating path variables

insert_data(data, *, multiple=False, strict=True, body_schema='body_schema')[source]

Validate data for insertion

if a path_schema is given, it validate the request match_info against it and add it to the validated data.

Parameters
  • data (Any) – object to be validated against the body_schema, usually obtained from the request body (JSON)

  • multiple (bool) – multiple values for a given key are acceptable

  • strict (bool) – all required attributes in schema must be available

  • body_schema (Union[str, List[type], type]) – the schema to validate against

Return type

Dict[str, Any]

get_filters(*, query=None, query_schema='query_schema')[source]

Collect a dictionary of filters from the request query string. If path_schema is defined, it collects filter data from path variables as well.

Parameters
  • query (Union[Dict[str, Any], MultiDict, None]) – optional query dictionary (will be overwritten by the request.query)

  • query_schema (Union[str, List[type], type]) – a dataclass or an the name of an attribute in Operation for collecting query filters

Return type

Dict[str, Any]

async json_data()[source]

Load JSON data from the request.

Raises

HTTPBadRequest – when body data is not valid JSON

Return type

Any

validation_error(message='', errors=None)[source]

Create an aiohttp.web.HTTPUnprocessableEntity

Return type

Exception

SqlApiPath

class openapi.db.path.SqlApiPath(request)[source]

An ApiPath backed by an SQL model

table: str = ''

sql table name

property db

Database connection pool

Return type

CrudDB

property db_table

Default database table for this route.

Obtained from the table attribute.

Return type

Table

async get_list(*, filters=None, query=None, table=None, query_schema='query_schema', dump_schema='response_schema', conn=None)[source]

Get a list of models

Parameters
  • filters (Optional[Dict[str, Any]]) – dictionary of filters, if not provided it will be created from the query_schema

  • query (Optional[Dict[str, Any]]) – additional query parameters, only used when filters is not provided

  • table (Optional[Table]) – sqlalchemy table, if not provided the default db_table is used instead

  • conn (Optional[AsyncConnection]) – optional db connection

Return type

PaginatedData

async create_one(*, data=None, table=None, body_schema='body_schema', dump_schema='response_schema', conn=None)[source]

Create a new database model

Parameters
Return type

Dict[str, Any]

async create_list(*, data=None, table=None, body_schema='body_schema', dump_schema='response_schema', conn=None)[source]

Create multiple models

async get_one(*, filters=None, query=None, table=None, query_schema='query_schema', dump_schema='response_schema', conn=None)[source]

Get a single model

Parameters
  • filters (Optional[Dict[str, Any]]) – dictionary of filters, if not provided it will be created from the query_schema

  • query (Optional[Dict[str, Any]]) – additional query parameters, only used when filters is not provided

  • table (Optional[Table]) – sqlalchemy table, if not provided the default db_table is used instead

  • conn (Optional[AsyncConnection]) – optional db connection

async update_one(*, data=None, filters=None, query=None, table=None, body_schema='body_schema', query_schema='query_schema', dump_schema='response_schema', conn=None)[source]

Update a single model

async delete_one(*, filters=None, query=None, table=None, query_schema='query_schema', conn=None)[source]

Delete a single model

Return type

Row

async delete_list(*, filters=None, query=None, table=None, conn=None)[source]

delete multiple models

Return type

CursorResult

Websocket

Websocket RPC

class openapi.ws.manager.Websocket[source]

A websocket connection

socket_id: str = ''

websocket ID

SocketsManager

class openapi.ws.manager.SocketsManager[source]

A base class for websocket managers

property sockets

Set of connected Websocket

property channels

Pub/sub Channels currently active on the running pod

add(ws)[source]

Add a new websocket to the connected set

Return type

None

remove(ws)[source]

Remove a websocket from the connected set

Return type

None

server_info()[source]

Server information

Return type

Dict

async close_sockets()[source]

Close and remove all websockets from the connected set

Return type

None

async publish(channel, event, body)[source]

Publish an event to a channel

Property channel

the channel to publish to

Property event

the event in the channel

Property body

the body of the event to broadcast in the channel

This method should raise CannotPublish if not possible to publish

Return type

None

async subscribe(channel)[source]

Subscribe to a channel

This method should raise CannotSubscribe if not possible to publish

Return type

None

async subscribe_to_event(channel, event)[source]

Callback when a subscription to an event is done

Property channel

the channel to publish to

Property event

the event in the channel

You can use this callback to perform any backend subscriptions to third-party streaming services if required.

By default it does nothing.

Return type

None

async unsubscribe(channel)[source]

Unsubscribe from a channel

Return type

None

Channels

class openapi.ws.channels.Channels(sockets)[source]

Manage channels for publish/subscribe

property registered

Registered channels

Return type

Tuple[str, …]

async register(channel_name, event_name, callback)[source]

Register a callback

Parameters
  • channel_name (str) – name of the channel

  • event_name (str) – name of the event in the channel or a pattern

  • callback (Callable[[], None]) – the callback to invoke when the event on channel occurs

Return type

Channel

async unregister(channel_name, event, callback)[source]

Safely unregister a callback from the list of event callbacks for channel_name

Return type

Optional[Channel]

Channel

class openapi.ws.channel.Channel(name, _events=<factory>)[source]

A websocket channel

property events

List of event names this channel is registered with

register(event_name, callback)[source]

Register a callback for event_name

event_pattern(event)[source]

Channel pattern for an event name

WsPathMixin

class openapi.ws.path.WsPathMixin[source]

Api Path mixin for Websocket RPC protocol

SOCKETS_KEY = 'web_sockets'

Key in the app where the Web Sockets manager is located

property sockets

Connected websockets

Return type

SocketsManager

property channels

Channels for pub/sub

Return type

Channels

decode_message(msg)[source]

Decode JSON string message, override for different protocol

Return type

Any

encode_message(msg)[source]

Encode as JSON string message, override for different protocol

Return type

str

Subscribe

class openapi.ws.pubsub.Subscribe[source]

Mixin which implements the subscribe and unsubscribe RPC methods

Must be used as mixin of WsPathMixin

property channel_callback

The callback for Channels

async ws_rpc_subscribe(payload)[source]

Subscribe to an event on a channel

async ws_rpc_unsubscribe(payload)[source]

Unsubscribe to an event on a channel

Publish

class openapi.ws.pubsub.Publish[source]

Mixin which implements the publish RPC method

Must be used as mixin of WsPathMixin

get_publish_message(data)[source]

Create the publish message from the data payload

Return type

Any

async ws_rpc_publish(payload)[source]

Publish an event on a channel