Receivers

Receivers are intended to be the core part of server applications that run as daemons in the background and receive data from devices (such as the CHECS-Camera). Optionally, the data can be lightly processed before being relayed onward, usually on a zmq TCP socket that clients can subscribe to.

ssdaq.core.ReceiverServer

Receivers should inherit from the ReceiverServer class in order to be started with the control-ssdaq application. The class contains functionality to open TCP or UDP listening sockets using asyncio. Methods prefixed with ct_ must be coroutines and are each added once to the event loop when the run() method is called. The ReceiverServer class also implements a command receiver server. Callback methods for the commands are prefixed with cmd_.
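The prefix convention described above can be illustrated with a minimal, self-contained sketch. It is not the ssdaq implementation: MiniServer, Demo and their helper methods are hypothetical names, and only the ct_ and cmd_ prefixes are taken from the text.

```python
import asyncio
import inspect


class MiniServer:
    """Hypothetical stand-in for a ReceiverServer-like base class."""

    def __init__(self):
        self.events = []

    def coroutine_tasks(self):
        # Collect every coroutine method whose name starts with "ct_".
        return {
            name: method
            for name, method in inspect.getmembers(self, inspect.iscoroutinefunction)
            if name.startswith("ct_")
        }

    def commands(self):
        # Map command names to their "cmd_" callbacks (prefix stripped).
        return {
            name[len("cmd_"):]: method
            for name, method in inspect.getmembers(self, inspect.ismethod)
            if name.startswith("cmd_")
        }

    async def _main(self):
        # Each ct_ coroutine is scheduled exactly once.
        await asyncio.gather(*(ct() for ct in self.coroutine_tasks().values()))

    def run(self):
        # Blocking call that drives the event loop until all ct_ coroutines finish.
        asyncio.run(self._main())


class Demo(MiniServer):
    async def ct_relay(self):
        self.events.append("relay started")

    def cmd_ping(self):
        return "pong"


demo = Demo()
demo.run()
print(sorted(demo.coroutine_tasks()))  # names of the scheduled coroutines
print(demo.commands()["ping"]())       # dispatch a command by its name
```

In this sketch a subclass only has to follow the naming convention; the base class finds the methods by introspection, which is one plausible way to obtain the behaviour described above.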

class ssdaq.core.ReceiverServer(ip: str, port: int, publishers: list, name: str, loop=None)[source]

Base class for receivers.

Implements some of the boilerplate code needed to set up a server that listens to either a TCP or UDP socket, publishes processed data and can receive commands via IPC. Asynchronous execution is implemented using asyncio, so executing the run() method starts the event loop.

Parameters:
  • ip (str) – The ip address/interface to listen to
  • port (int) – The port to listen to
  • publishers (list) – A list of publisher instances that are cycled through when self.publish(data) is called
  • name (str) – A name for the instance (useful for logging)
Kwargs:
  • loop (asyncio event loop) – If no event loop is given, an asyncio event loop will be retrieved.
__init__(ip: str, port: int, publishers: list, name: str, loop=None)[source]

The init of a ReceiverServer

handle_commands()[source]

This is the server part of the receiver server that handles incoming control commands.

publish(packet: bytes)[source]

Publishes the packed, processed data on the publishers in the receiver's publisher list.

Should be called by inheriting classes to publish data.

Parameters: packet (bytes) – The data packet that should be published
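A minimal sketch of what publishing a packet on a list of publishers could look like. ListPublisher and FanOut are hypothetical classes written for this illustration; the real publishers live in ssdaq.core.publishers, and a synchronous publish method on each publisher is an assumption. The awaitable publish mirrors the `await self.publish(...)` calls used in the receiver examples on this page.

```python
import asyncio


class ListPublisher:
    """Hypothetical publisher that stores packets instead of sending them."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def publish(self, packet: bytes):
        self.sent.append(packet)


class FanOut:
    """Hypothetical holder of a publisher list, mimicking the described behaviour."""

    def __init__(self, publishers):
        self.publishers = publishers

    async def publish(self, packet: bytes):
        # Hand the same packet to every publisher in the list, in order.
        for pub in self.publishers:
            pub.publish(packet)


pubs = [ListPublisher("a"), ListPublisher("b")]
asyncio.run(FanOut(pubs).publish(b"\x01\x02"))
print([p.sent for p in pubs])
```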
run()[source]

Starts the event loop of the ReceiverServer (blocking).

setup_stream(recv_protocol)[source]

Adds a TCP stream socket to the ReceiverServer event loop.

Parameters: recv_protocol – a factory returning an asyncio.Protocol that conforms to asyncio TCP protocols
setup_udp(recv_protocol)[source]

Adds a UDP socket to the ReceiverServer event loop.

Parameters: recv_protocol – a factory returning an asyncio protocol that conforms to asyncio Datagram protocols
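For the TCP case, the kind of protocol that setup_stream expects can be sketched with plain asyncio. This is only an illustration under stated assumptions: StreamProtocol and demo are hypothetical names, and loop.create_server is called directly here, whereas how ReceiverServer wires the protocol into its loop is not shown in this document.

```python
import asyncio


class StreamProtocol(asyncio.Protocol):
    """Hypothetical TCP protocol that queues every received chunk."""

    def __init__(self):
        self.buffer = asyncio.Queue()
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # TCP is a byte stream: chunks need not align with the sender's writes.
        self.buffer.put_nowait(data)


async def demo():
    loop = asyncio.get_running_loop()
    protocols = []

    def factory():
        # One protocol instance is created per incoming connection.
        proto = StreamProtocol()
        protocols.append(proto)
        return proto

    # Port 0 lets the OS pick a free port for this demonstration.
    server = await loop.create_server(factory, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    _reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"hello")
    await writer.drain()

    # Wait until the server side has created a protocol for the connection.
    while not protocols:
        await asyncio.sleep(0.01)

    # Accumulate chunks until the whole message has arrived.
    data = b""
    while len(data) < 5:
        data += await asyncio.wait_for(protocols[0].buffer.get(), timeout=5)

    writer.close()
    protocols[0].transport.close()
    server.close()
    return data


received = asyncio.run(demo())
print(received)
```

Buffering into an asyncio.Queue keeps the protocol callbacks short and leaves the processing to a coroutine, which is the same split used by the receiver examples on this page.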

Example of a receiver listening to a UDP socket

The following code example shows the typical way to set up a receiver in SSDAQ that listens to a UDP socket and later publishes the received packets:

import asyncio
from ssdaq.core.receiver_server import ReceiverServer
from ssdaq.receivers.mon_sender import ReceiverMonSender


class PacketProtocol(asyncio.DatagramProtocol):
    def __init__(self, log):
        self.buffer = asyncio.Queue()
        self.log = log.getChild("PacketProtocol")

    def connection_made(self, transport):
        self.log.info("Connected to port")
        self.transport = transport

    def datagram_received(self, data, addr):
        self.buffer.put_nowait((data, addr))


class PacketReceiver(ReceiverServer):
    def __init__(self, ip: str, port: int, publishers: list):
        super().__init__(ip, port, publishers, "PacketReceiver")
        self.trans, self.tpp = self.setup_udp(
            lambda: PacketProtocol(self.log)
        )
        self.running = True
        # To send basic monitoring data a ReceiverMonSender instance is needed
        self.mon = ReceiverMonSender("PacketReceiver", self.loop, self._context)

    # This method is automatically added as a task to the
    # asyncio event loop
    async def ct_relay(self):
        while self.running:
            packet = await self.tpp.buffer.get()
            # Need to register each time we get a packet
            # The ReceiverMonSender automatically calculates the
            # rate and pushes the monitoring data to a monitor receiver
            self.mon.register_data_packet()
            await self.publish(packet[0])

Furthermore, nothing prevents calling self.setup_udp several times in __init__() and defining a separate ct_ relay method for each connection, so that data from several ports are received and handled at the same time.
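The idea of one relay coroutine per listening socket can be sketched with plain asyncio, independent of ssdaq. QueueingDatagramProtocol, relay and demo are hypothetical names, and the sockets are opened with loop.create_datagram_endpoint directly rather than through setup_udp.

```python
import asyncio


class QueueingDatagramProtocol(asyncio.DatagramProtocol):
    """Hypothetical UDP protocol that queues (data, addr) tuples."""

    def __init__(self):
        self.buffer = asyncio.Queue()

    def datagram_received(self, data, addr):
        self.buffer.put_nowait((data, addr))


async def relay(label, protocol, out):
    # One relay coroutine per socket, analogous to one ct_ method per connection.
    data, _addr = await protocol.buffer.get()
    out.append((label, data))


async def demo():
    loop = asyncio.get_running_loop()
    out = []
    endpoints = []
    for label in ("a", "b"):
        # Port 0 asks the OS for a free port; a real receiver would use fixed ports.
        transport, protocol = await loop.create_datagram_endpoint(
            QueueingDatagramProtocol, local_addr=("127.0.0.1", 0)
        )
        endpoints.append((label, transport, protocol))

    relays = [
        loop.create_task(relay(label, protocol, out))
        for label, _transport, protocol in endpoints
    ]

    # Send one datagram to each listening socket from a separate sender socket.
    sender, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", 0)
    )
    for label, transport, _protocol in endpoints:
        sender.sendto(label.encode(), transport.get_extra_info("sockname"))

    await asyncio.wait_for(asyncio.gather(*relays), timeout=5)

    for _label, transport, _protocol in endpoints:
        transport.close()
    sender.close()
    return sorted(out)


result = asyncio.run(demo())
print(result)
```

Because each socket has its own queue and its own coroutine, a slow consumer on one port does not block packets arriving on the other.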

Using zmq sockets

It is also possible to use a zmq socket instead of the asyncio TCP and UDP sockets. The code actually gets even shorter:

import asyncio
import zmq
from ssdaq.core.receiver_server import ReceiverServer
from ssdaq.receivers.mon_sender import ReceiverMonSender


class ZMQReceiver(ReceiverServer):
    def __init__(self, ip: str, port: int, publishers: list):
        super().__init__(ip, port, publishers, "ZMQReceiver")
        self.running = True
        # The ReceiverServer already has a zmq context
        self.receiver = self._context.socket(zmq.PULL)
        connectionstr = "tcp://{}:{}".format(ip, port)
        self.log.info("Setting up monitor zmq pull server at {}".format(connectionstr))
        self.receiver.bind(connectionstr)
        # NOTE: this is needed when not setting socket via the ReceiverServer methods
        self._setup = True
        self.mon = ReceiverMonSender("ZMQReceiver", self.loop, self._context)

    async def ct_pull(self):
        while self.running:
            packet = await self.receiver.recv()
            self.mon.register_data_packet()
            await self.publish(packet)

Note the line self._setup = True in __init__, which is needed to override a check that at least one listening socket has been added.

ssdaq.core.publishers

class ssdaq.core.publishers.ZMQTCPPublisher(ip: str, port: int, name: str = 'ZMQTCPPublisher', logger=None, mode: str = 'local')[source]

A generic zmq tcp publisher

Publishes on a TCP/IP socket using the zmq PUB-SUB protocol.

__init__(ip: str, port: int, name: str = 'ZMQTCPPublisher', logger=None, mode: str = 'local')[source]

A generic zmq tcp publisher

Parameters:
  • ip (str) – the name (ip) interface which the readouts are published on
  • port (int) – the port number of the interface which the readouts are published on
  • name (str, optional) – The name of this instance (useful for logging)
  • logger (None, optional) – An instance of a logger to inherit from
  • mode (str, optional) – The mode of publishing. Possible modes: ‘local’, ‘outbound’, ‘remote’

The three modes define how the socket is set up for different use cases.

  • ‘local’ is the normal use case, where readouts are published on localhost and ip should be ‘127.x.x.x’.
  • ‘outbound’ is when the readouts are published on an outbound network interface of the machine so that remote clients can connect to the machine to receive the readouts. In this case ip is the ip address of the interface on which the readouts should be published.
  • ‘remote’ specifies that the given ip belongs to a remote machine to which the readouts should be sent.
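The three modes can be summarised in a small, hypothetical helper. endpoint_for is not part of ssdaq, and the mapping of modes to bind or connect is an assumption inferred from the descriptions above (publishing on a local interface suggests binding, sending to a remote machine suggests connecting); the actual socket setup in ZMQTCPPublisher may differ.

```python
import ipaddress


def endpoint_for(ip: str, port: int, mode: str = "local"):
    """Hypothetical helper: return (action, endpoint) for a publishing mode.

    Assumes ip is a numeric address; host names are not handled in this sketch.
    """
    if mode not in ("local", "outbound", "remote"):
        raise ValueError("mode must be 'local', 'outbound' or 'remote'")
    if mode == "local" and not ipaddress.ip_address(ip).is_loopback:
        raise ValueError("'local' mode expects a loopback address (127.x.x.x)")
    # Assumption: the publisher binds on its own interfaces and connects
    # only when the address belongs to a remote machine.
    action = "connect" if mode == "remote" else "bind"
    return action, "tcp://{}:{}".format(ip, port)


print(endpoint_for("127.0.0.1", 5555))
print(endpoint_for("192.168.1.10", 5555, mode="outbound"))
print(endpoint_for("10.0.0.7", 5555, mode="remote"))
```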

Listing derived Receivers

class ssdaq.receivers.log_receiver.LogReceiver(ip: str, port: int, publishers: list, name: str = 'LogReceiver')[source]
__init__(ip: str, port: int, publishers: list, name: str = 'LogReceiver')[source]

The init of a ReceiverServer

class ssdaq.receivers.log_receiver.LogReceiverProtocol(server, loop, log)[source]
__init__(server, loop, log)[source]

Initialize self. See help(type(self)) for accurate signature.

connection_lost(exc)[source]

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)[source]

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)[source]

Called when some data is received.

The argument is a bytes object.

class ssdaq.receivers.monitor_receiver.MonitorReceiver(ip: str, port: int, publishers: list)[source]
__init__(ip: str, port: int, publishers: list)[source]

The init of a ReceiverServer

class ssdaq.receivers.teldata_receiver.TelDataReceiver(ip: str, port: int, publishers: list, name: str = 'TelDataReceiver')[source]
__init__(ip: str, port: int, publishers: list, name: str = 'TelDataReceiver')[source]

The init of a ReceiverServer

class ssdaq.receivers.timestamp_receiver.TimestampReceiver(ip: str, port: int, publishers: list)[source]
__init__(ip: str, port: int, publishers: list)[source]

The init of a ReceiverServer

class ssdaq.receivers.trigger_receiver.TriggerPacketProtocol(loop, log)[source]
__init__(loop, log)[source]

Initialize self. See help(type(self)) for accurate signature.

connection_made(transport)[source]

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

class ssdaq.receivers.trigger_receiver.TriggerPacketReceiver(ip: str, port: int, publishers: list)[source]
__init__(ip: str, port: int, publishers: list)[source]

The init of a ReceiverServer

class ssdaq.receivers.readout_assembler.ReadoutAssembler(relaxed_ip_range: bool = False, readout_tw: float = 100000.0, listen_ip: str = '0.0.0.0', listen_port: int = 2009, buffer_length: int = 1000, buffer_time: float = 10000000000.0, publishers: list = None, packet_debug_stream_file: str = None)[source]

Slow signal readout assembler. Constructs slow signal readouts from data packets received from Target Modules.

__init__(relaxed_ip_range: bool = False, readout_tw: float = 100000.0, listen_ip: str = '0.0.0.0', listen_port: int = 2009, buffer_length: int = 1000, buffer_time: float = 10000000000.0, publishers: list = None, packet_debug_stream_file: str = None)[source]

Summary

Parameters:
  • relaxed_ip_range (bool, optional) – Description
  • readout_tw (float, optional) – Description
  • listen_ip (str, optional) – Description
  • listen_port (int, optional) – Description
  • buffer_length (int, optional) – Description
  • buffer_time (float, optional) – Description
  • publishers (list, optional) – Description
  • packet_debug_stream_file (str, optional) – Description
assemble_readouts(matched)[source]

Summary

Parameters:matched (TYPE) – Description
Returns:Description
Return type:TYPE
class ssdaq.receivers.readout_assembler.SlowSignalDataProtocol(loop, log, relaxed_ip_range, mon, packet_debug_stream_file=None)[source]
__init__(loop, log, relaxed_ip_range, mon, packet_debug_stream_file=None)[source]

Initialize self. See help(type(self)) for accurate signature.

connection_made(transport)[source]

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.