Websocket RPC

The library includes a minimal API for Websocket JSON-RPC (remote procedure calls).

To add websockets RPC you need to create a websocket route:

from aiohttp import web
from openapi.spec import ApiPath
from openapi.ws import WsPathMixin
from openapi.ws.pubsub import Publish, Subscribe

ws_routes = web.RouteTableDef()

class Websocket(ApiPath, WsPathMixin, Subscribe, Publish):

    async def ws_rpc_info(self, payload):
        """Server information"""
        return self.sockets.server_info()

the WsPathMixin adds the get method for accepting websocket requests with the RPC protocol. Subscribe and Publish are optional mixins for adding Pub/Sub RPC methods to the endpoint.

The endpoint can be added to an application in the setup function:

from aiohttp.web import Application

from openapi.ws import SocketsManager

def setup_app(app: Application) -> None:
    app['web_sockets'] = SocketsManager()

RPC protocol

The RPC protocol has the following structure for incoming messages

    "id": "abc",
    "method": "rpc_method_name",
    "payload": {

The id is used by clients to link the request with the corresponding response. The response for an RPC call is either a success

    "id": "abc",
    "method": "rpc_method_name",
    "response": {

or an error

    "id": "abc",
    "method": "rpc_method_name":
    "error": {


To subscribe to messages, one need to use the Subscribe mixin with the websocket route (like we have done in this example). Messages take the form:

    "channel": "channel_name",
    "event": "event_name",
    "data": {


The websocket backend is implemented by subclassing the SocketsManager and implement the methods required by your application. This example implements a very simple backend for testing the websocket module in unittests.

import asyncio

from aiohttp import web
from openapi.ws.manager import SocketsManager

class LocalBroker(SocketsManager):
    """A local broker for testing"""

    def __init__(self):
        self.binds = set()
        self.messages: asyncio.Queue = asyncio.Queue()
        self.worker = None
        self._stop = False

    def for_app(cls, app: web.Application) -> "LocalBroker":
        broker = cls()
        return broker

    async def start(self, *arg):
        if not self.worker:
            self.worker = asyncio.ensure_future(self._work())

    async def publish(self, channel: str, event: str, body: Any):
        """simulate network latency"""
        if channel.lower() != channel:
            raise CannotPublish
        payload = dict(event=event, data=self.get_data(body))
            0.01, self.messages.put_nowait, (channel, payload)

    async def subscribe(self, channel: str) -> None:
        """ force channel names to be lowercase"""
        if channel.lower() != channel:
            raise CannotSubscribe

    async def close(self, *arg):
        self._stop = True
        await self.close_sockets()
        if self.worker:
            self.messages.put_nowait((None, None))
            await self.worker
            self.worker = None

    async def _work(self):
        while True:
            channel, body = await self.messages.get()
            if self._stop:
            await self.channels(channel, body)

    def get_data(self, data: Any) -> Any:
        if data == "error":
            return self.raise_error
        elif data == "runtime_error":
            return self.raise_runtime
        return data

    def raise_error(self):
        raise ValueError

    def raise_runtime(self):
        raise RuntimeError