Source code for openapi.ws.manager

import asyncio
from functools import cached_property
from typing import Any, Callable, Dict, Set

from .channels import CannotSubscribe, Channels
from .errors import CannotPublish

WsHandlerType = Callable[[str, Any], None]


[docs]class Websocket: """A websocket connection""" socket_id: str = "" """websocket ID""" def __str__(self) -> str: return self.socket_id
[docs]class SocketsManager: """A base class for websocket managers""" @cached_property def sockets(self) -> Set[Websocket]: """Set of connected :class:`.Websocket`""" return set() @cached_property def channels(self) -> Channels: """Pub/sub :class:`.Channels` currently active on the running pod""" return Channels(self)
[docs] def add(self, ws: Websocket) -> None: """Add a new websocket to the connected set""" self.sockets.add(ws)
[docs] def remove(self, ws: Websocket) -> None: """Remove a websocket from the connected set""" self.sockets.discard(ws)
[docs] def server_info(self) -> Dict: """Server information""" return dict(connections=len(self.sockets), channels=self.channels.info())
[docs] async def close_sockets(self) -> None: """Close and remove all websockets from the connected set""" await asyncio.gather(*[view.response.close() for view in self.sockets]) self.sockets.clear() self.channels.clear()
[docs] async def publish( self, channel: str, event: str, body: Dict ) -> None: # pragma: no cover """Publish an event to a channel :property channel: the channel to publish to :property event: the event in the channel :property body: the body of the event to broadcast in the channel This method should raise :class:`.CannotPublish` if not possible to publish """ raise CannotPublish
[docs] async def subscribe(self, channel: str) -> None: # pragma: no cover """Subscribe to a channel This method should raise :class:`.CannotSubscribe` if not possible to publish """ raise CannotSubscribe
[docs] async def subscribe_to_event(self, channel: str, event: str) -> None: """Callback when a subscription to an event is done :property channel: the channel to publish to :property event: the event in the channel You can use this callback to perform any backend subscriptions to third-party streaming services if required. By default it does nothing. """
[docs] async def unsubscribe(self, channel: str) -> None: """Unsubscribe from a channel"""