Receivers
Receivers are the core of server applications that run as daemons in the background and receive data from devices (such as the CHECS-Camera). The data can optionally be lightly processed before being relayed, usually on a subscribable zmq TCP socket.
ssdaq.core.ReceiverServer
Receivers should inherit from the ReceiverServer class so that they can be started with
the control-ssdaq application. The class contains functionality to open TCP or UDP
listening sockets using asyncio. Methods prefixed with ct_ must be coroutines and are
added once to the event loop when the run() method is called. The ReceiverServer
class also implements a command receiver server. Callback methods for the commands are prefixed
with cmd_.
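The prefix convention described above can be illustrated with a small standalone sketch. It does not use ssdaq and is not how ReceiverServer is implemented: the MiniServer and Demo classes and the collect() helper are hypothetical names invented for this example. It only shows one plausible way to discover ct_ coroutines by name, schedule each once, and look up a cmd_ callback by name.

```python
import asyncio
import inspect


class MiniServer:
    """Hypothetical stand-in for a receiver base class (not ssdaq code)."""

    def __init__(self):
        self.events = []

    def collect(self, prefix):
        # Find bound methods whose names start with the given prefix
        return {
            name: method
            for name, method in inspect.getmembers(self, inspect.ismethod)
            if name.startswith(prefix)
        }

    async def _main(self):
        # Schedule every ct_ coroutine exactly once, then wait for all of them
        tasks = [asyncio.create_task(ct()) for ct in self.collect("ct_").values()]
        await asyncio.gather(*tasks)

    def run(self):
        # Starts the event loop, mirroring the role of run() in the text
        asyncio.run(self._main())


class Demo(MiniServer):
    async def ct_heartbeat(self):
        self.events.append("heartbeat")

    async def ct_commands(self):
        # A real command would arrive over a command socket; here the
        # command name "ping" is simply mapped to the cmd_ping callback
        callback = self.collect("cmd_")["cmd_ping"]
        self.events.append(callback())

    def cmd_ping(self):
        return "pong"


demo = Demo()
demo.run()
print(sorted(demo.events))  # ['heartbeat', 'pong']
```

In a long-running receiver the ct_ coroutines would typically loop until the server is stopped rather than return immediately as they do here.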
class ssdaq.core.ReceiverServer(ip: str, port: int, publishers: list, name: str, loop=None)
Base class for receivers.
Implements some of the boilerplate code needed to set up a server that listens on either a TCP or UDP socket, publishes processed data and can receive commands via ipc. Asynchronicity is implemented using asyncio, so executing the run() method starts the event loop.
Parameters:
- ip (str) – The ip address/interface to listen on
- port (int) – The port to listen on
- publishers (list) – A list of publisher instances that are cycled through when self.publish(data) is called
- name (str) – A name for the instance (useful for logging)
Kwargs:
- loop (asyncio.loop) – If an event loop is not given, an asyncio event loop will be retrieved.
__init__(ip: str, port: int, publishers: list, name: str, loop=None)
The init of a ReceiverServer.
handle_commands()
This is the server part of the receiver server that handles incoming control commands.
publish(packet: bytes)
Publishes the processed data packet on the publishers in the publisher list of the receiver.
Should be called by the inheriting classes to publish data.
Parameters: packet (bytes) – The data packet that should be published
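As a rough illustration of what cycling through the publisher list could look like, the sketch below hands one packet to every publisher in turn. It assumes that each publisher receives every packet, which the text above does not state explicitly. ListPublisher and MiniReceiver are hypothetical classes written for this example and are not part of ssdaq.

```python
import asyncio


class ListPublisher:
    """Hypothetical in-memory publisher used only for illustration."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def publish(self, packet: bytes):
        # A real publisher would send the packet on a socket instead
        self.sent.append(packet)


class MiniReceiver:
    """Hypothetical receiver holding a list of publishers."""

    def __init__(self, publishers):
        self.publishers = publishers

    async def publish(self, packet: bytes):
        # Assumption: the same packet is handed to every publisher in turn
        for pub in self.publishers:
            pub.publish(packet)


pubs = [ListPublisher("a"), ListPublisher("b")]
asyncio.run(MiniReceiver(pubs).publish(b"\x01\x02"))
print([p.sent for p in pubs])  # [[b'\x01\x02'], [b'\x01\x02']]
```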
Example of a receiver listening to a UDP socket
The following example shows the typical way to set up a receiver in SSDAQ that listens to a UDP socket and publishes the received packets:
import asyncio

from ssdaq.core.receiver_server import ReceiverServer
from ssdaq.receivers.mon_sender import ReceiverMonSender


class PacketProtocol(asyncio.Protocol):
    def __init__(self, log):
        self.buffer = asyncio.Queue()
        self.log = log.getChild("PacketProtocol")

    def connection_made(self, transport):
        self.log.info("Connected to port")
        self.transport = transport

    def datagram_received(self, data, addr):
        self.buffer.put_nowait((data, addr))


class PacketReceiver(ReceiverServer):
    def __init__(self, ip: str, port: int, publishers: list):
        super().__init__(ip, port, publishers, "PacketReceiver")
        self.trans, self.tpp = self.setup_udp(
            lambda: PacketProtocol(self.log)
        )
        self.running = True
        # To send basic monitoring data a ReceiverMonSender instance is needed
        self.mon = ReceiverMonSender("PacketReceiver", self.loop, self._context)

    # This method will be automatically added as a task to the
    # asyncio event loop
    async def ct_relay(self):
        while self.running:
            packet = await self.tpp.buffer.get()
            # Need to register each time we get a packet.
            # The ReceiverMonSender automatically calculates the
            # rate and pushes the monitoring data to a monitor receiver
            self.mon.register_data_packet()
            await self.publish(packet[0])
Furthermore, nothing prevents you from calling self.setup_udp several times in __init__()
and defining one ct_ relay coroutine per connection (each under its own method name) to listen to
and handle data from several ports at the same time.
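The idea of one relay coroutine per listening socket can be sketched with plain asyncio, independent of ssdaq. The example below does not use setup_udp; it calls the standard loop.create_datagram_endpoint directly. The names QueueProtocol, relay and main are invented for illustration, and the loopback address with OS-assigned ports is only a demo choice.

```python
import asyncio


class QueueProtocol(asyncio.DatagramProtocol):
    """Buffers every received datagram in an asyncio.Queue."""

    def __init__(self):
        self.buffer = asyncio.Queue()

    def datagram_received(self, data, addr):
        self.buffer.put_nowait((data, addr))


async def relay(label, protocol, out):
    # One relay coroutine per listening socket; this demo handles a
    # single datagram and then returns
    data, _addr = await protocol.buffer.get()
    out.append((label, data))


async def main():
    loop = asyncio.get_running_loop()
    out = []
    listeners = []
    for label in ("first", "second"):
        # Port 0 lets the OS pick a free port for this demo
        transport, protocol = await loop.create_datagram_endpoint(
            QueueProtocol, local_addr=("127.0.0.1", 0)
        )
        listeners.append((label, transport, protocol))

    relays = [
        asyncio.create_task(relay(label, protocol, out))
        for label, _transport, protocol in listeners
    ]

    # Send one datagram to each listening port
    sender, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", 0)
    )
    for label, transport, _protocol in listeners:
        sender.sendto(label.encode(), transport.get_extra_info("sockname"))

    await asyncio.wait_for(asyncio.gather(*relays), timeout=5)
    for _label, transport, _protocol in listeners:
        transport.close()
    sender.close()
    return sorted(out)


received = asyncio.run(main())
print(received)  # [('first', b'first'), ('second', b'second')]
```

In a ReceiverServer subclass the same pattern would presumably map to one setup_udp call and one ct_ method per port, with the loop running until the receiver is stopped.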
Using zmq sockets
It is also possible to use a zmq socket instead of the asyncio TCP and UDP sockets.
The code is even shorter:
import asyncio

import zmq

from ssdaq.core.receiver_server import ReceiverServer
from ssdaq.receivers.mon_sender import ReceiverMonSender


class ZMQReceiver(ReceiverServer):
    def __init__(self, ip: str, port: int, publishers: list):
        super().__init__(ip, port, publishers, "ZMQReceiver")
        self.running = True
        # The ReceiverServer already has a zmq context
        self.receiver = self._context.socket(zmq.PULL)
        connectionstr = "tcp://{}:{}".format(ip, port)
        self.log.info("Setting up monitor zmq pull server at {}".format(connectionstr))
        self.receiver.bind(connectionstr)
        # NOTE: this is needed when not setting up a socket via the ReceiverServer methods
        self._setup = True
        self.mon = ReceiverMonSender("ZMQReceiver", self.loop, self._context)

    async def ct_pull(self):
        while self.running:
            packet = await self.receiver.recv()
            self.mon.register_data_packet()
            await self.publish(packet)
Note the line self._setup = True in __init__(), which is needed to override a check that at least one listening socket has been added.
ssdaq.core.publishers
class ssdaq.core.publishers.ZMQTCPPublisher(ip: str, port: int, name: str = 'ZMQTCPPublisher', logger=None, mode: str = 'local')
A generic zmq tcp publisher.
Publishes on a TCP/IP socket using the zmq PUB-SUB protocol.
__init__(ip: str, port: int, name: str = 'ZMQTCPPublisher', logger=None, mode: str = 'local')
A generic zmq tcp publisher.
Parameters:
- ip (str) – The name (ip) of the interface on which the readouts are published
- port (int) – The port number on which the readouts are published
- name (str, optional) – The name of this instance (useful for logging)
- logger (None, optional) – An instance of a logger to inherit from
- mode (str, optional) – The mode of publishing. Possible modes: 'local', 'outbound', 'remote'
The three modes define how the socket is set up for different use cases:
- 'local' is the normal use case, where readouts are published on localhost and ip should be '127.x.x.x'.
- 'outbound' is used when the readouts are published on an outbound network interface of the machine so that remote clients can connect to the machine to receive the readouts. In this case ip is the ip address of the interface on which the readouts should be published.
- 'remote' specifies that the given ip is that of a remote machine to which the readouts should be sent.
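One way to read the three modes is as a choice between binding and connecting the publishing socket. The helper below is a sketch of that reading only. It assumes that 'local' and 'outbound' bind to the given interface while 'remote' connects to the given address; this is inferred from the descriptions above and not confirmed by the ZMQTCPPublisher source. The function name endpoint_for and the port number in the usage lines are hypothetical.

```python
def endpoint_for(ip: str, port: int, mode: str = "local"):
    """Return (action, endpoint) for a publishing socket (illustrative only)."""
    endpoint = "tcp://{}:{}".format(ip, port)
    if mode == "local":
        # Assumption: local publishing is restricted to loopback addresses
        if not ip.startswith("127."):
            raise ValueError("local mode expects a 127.x.x.x address")
        return "bind", endpoint
    if mode == "outbound":
        # Assumption: bind on the outbound interface so remote clients can connect
        return "bind", endpoint
    if mode == "remote":
        # Assumption: connect to the remote machine that receives the readouts
        return "connect", endpoint
    raise ValueError("unknown mode: {!r}".format(mode))


print(endpoint_for("127.0.0.1", 9004))           # ('bind', 'tcp://127.0.0.1:9004')
print(endpoint_for("10.0.0.5", 9004, "remote"))  # ('connect', 'tcp://10.0.0.5:9004')
```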
Listing derived Receivers
class ssdaq.receivers.log_receiver.LogReceiver(ip: str, port: int, publishers: list, name: str = 'LogReceiver')
class ssdaq.receivers.log_receiver.LogReceiverProtocol(server, loop, log)
connection_lost(exc)
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
class ssdaq.receivers.monitor_receiver.MonitorReceiver(ip: str, port: int, publishers: list)
class ssdaq.receivers.teldata_receiver.TelDataReceiver(ip: str, port: int, publishers: list, name: str = 'TelDataReceiver')
class ssdaq.receivers.timestamp_receiver.TimestampReceiver(ip: str, port: int, publishers: list)
class ssdaq.receivers.trigger_receiver.TriggerPacketReceiver(ip: str, port: int, publishers: list)
class ssdaq.receivers.readout_assembler.ReadoutAssembler(relaxed_ip_range: bool = False, readout_tw: float = 100000.0, listen_ip: str = '0.0.0.0', listen_port: int = 2009, buffer_length: int = 1000, buffer_time: float = 10000000000.0, publishers: list = None, packet_debug_stream_file: str = None)
Slow signal readout assembler. Constructs slow signal readouts from data packets received from Target Modules.
__init__(relaxed_ip_range: bool = False, readout_tw: float = 100000.0, listen_ip: str = '0.0.0.0', listen_port: int = 2009, buffer_length: int = 1000, buffer_time: float = 10000000000.0, publishers: list = None, packet_debug_stream_file: str = None)
Summary
Parameters:
- relaxed_ip_range (bool, optional) – Description
- readout_tw (float, optional) – Description
- listen_ip (str, optional) – Description
- listen_port (int, optional) – Description
- buffer_length (int, optional) – Description
- buffer_time (float, optional) – Description
- publishers (list, optional) – Description
- packet_debug_stream_file (str, optional) – Description