Source code for ssdaq.core.receiver_server

import asyncio
import inspect
import zmq
import zmq.asyncio
from distutils.version import LooseVersion
from ssdaq import sslogger

if LooseVersion("17") > LooseVersion(zmq.__version__):
    zmq.asyncio.install()


[docs]class ReceiverServer: """ Base class for receivers. Implements some of the boilerplate code to setup a server that listens to either a TCP or UDP socket, publishes processed data and can receive commands via ipc. Asynchrosity is implemented using asyncio. Executing the `run()` method will, thus, start the event loop. Args: ip (str): The ip address/interface to listen to port (int): The port to listen to publishers (list): A list of publisher instances that are cycled through when `self.publish(data)` is called name (str): A name for the instance (usefull for logging) Kwargs: loop (asyncio.loop):If not given an an event loop an asyncio loop will retreived. """
[docs] def __init__(self, ip: str, port: int, publishers: list, name: str, loop=None): """ The init of a ReceiverServer """ self.loop = loop or asyncio.get_event_loop() self.log = sslogger.getChild(name) self._name = name self.publishers = publishers self.listen_addr = (ip, port) self.corrs = [] for p in self.publishers: p.set_loop(self.loop) # setting up communications socket self._context = zmq.asyncio.Context() self._com_sock = self._context.socket(zmq.REP) self._com_sock.bind("ipc:///tmp/{}".format(self._name)) self._setup = False
[docs] def setup_stream(self, recv_protocol): """ Adds a TCP stream socket in the ReceiverServer eventloop. Args: recv_protocol: an asyncio.Protocol that conforms to asyncio TCP protocols """ self._setup = True self.log.info( "Settting up TCP receiver socket at %s:%d" % (tuple(self.listen_addr)) ) listen = self.loop.create_server( recv_protocol, host=self.listen_addr[0], port=self.listen_addr[1] ) return self.loop.run_until_complete(listen)
[docs] def setup_udp(self, recv_protocol): """ Adds a UDP socket in the ReceiverServer eventloop. Args: recv_protocol: an asyncio.Protocol that conforms to asyncio Datagram protocols """ self._setup = True self.log.info( "Settting up UDP receiver socket at %s:%d" % (tuple(self.listen_addr)) ) listen = self.loop.create_datagram_endpoint( recv_protocol, local_addr=self.listen_addr ) return self.loop.run_until_complete(listen)
[docs] def run(self): """ Starts the eventloop of the ReceiverServer (blocking) """ if not self._setup: raise RuntimeError("No receiver socket setup") self.log.info("Number of publishers registered %d" % len(self.publishers)) self._introspect() for c in self.corrs: self.loop.create_task(c) try: self.loop.run_forever() except Exception as e: self.log.error("Exception caught while running event loop: {}".format(e)) self.loop.close()
def _introspect(self): # Introspecting to find all methods that # handle commands method_list = inspect.getmembers(self, predicate=inspect.ismethod) self.cmds = {} for method in method_list: if method[0][:4] == "cmd_": self.cmds[method[0][4:]] = method[1] if method[0][:3] == "ct_": self.corrs.append(method[1]())
[docs] async def handle_commands(self): """ This is the server part of the receiver server that handles incomming control commands """ while True: cmd = await self._com_sock.recv() self.log.info("Handling incoming command %s" % cmd.decode("ascii")) cmd = cmd.decode("ascii").split(" ") if cmd[0] in self.cmds.keys(): reply = self.cmds[cmd[0]](cmd[1:]) else: reply = b"Error, No command `%s` found." % (cmd[0]) self.log.info("Incomming command `%s` not recognized") self._com_sock.send(reply)
[docs] async def publish(self, packet: bytes): """ Publishes the packed processed data on the publishers in the publisher list of the Receiver. Should be called by the inheriting classes to publish data Args: packet (bytes): The data packet that should be published """ tasks = [] for pub in self.publishers: tasks.append(self.loop.create_task(pub.apublish(packet))) for task in tasks: await task