Source code for ssdaq.core.publishers

class Publisher:
    def __init__(self, name: str):
        self.name = name
        self.loop = None

    def publish(self, packet):
        raise NotImplementedError

    async def apublish(self, packet):
        raise NotImplementedError

    def set_loop(self, loop):
        self.loop = loop


class RawWriter(Publisher):
    def __init__(self, file_name, name="RawWriter"):
        super().__init__(name)
        self.file_name = file_name
        self.file = open(self.file_name, "wb")

    async def apublish(self, packet: bytes):
        self.file.write(packet)

    def publish(self, packet: bytes):
        self.file.write(packet)


[docs]class ZMQTCPPublisher(Publisher): """ A generic zmq tcp publisher Publishes on a TCP/IP socket using the zmq PUB-SUB protocol. """
[docs] def __init__( self, ip: str, port: int, name: str = "ZMQTCPPublisher", logger=None, mode: str = "local", ): """A generic zmq tcp publisher Args: ip (str): the name (ip) interface which the readouts are published on port (int): the port number of the interface which the readouts are published on name (str, optional): The name of this instance (usefull for logging) logger (None, optional): An instance of a logger to inherit from mode (str, optional): The mode of publishing. Possible modes ('local','outbound', 'remote') The three different modes defines how the socket is setup for different use cases. * 'local' this is the normal use case where readouts are published on localhost and ip should be '127.x.x.x' * 'outbound' is when the readouts are published on an outbound network interface of the machine so that remote clients can connect to the machine to receive the readouts. In this case ip is the ip address of the interface on which the readouts should be published * 'remote' specifies that the given ip is of a remote machine to which the readouts should be sent to. """ super().__init__(name) import zmq import logging self.context = zmq.Context() self.sock = self.context.socket(zmq.PUB) con_str = "tcp://%s:" % ip + str(port) if mode == "local" or mode == "outbound": self.sock.bind(con_str) if mode == "outbound" or mode == "remote": self.sock.connect(con_str) if logger is None: self.log = logging.getLogger("ssdaq.%s" % name) else: self.log = logger.getChild(name) self.log.info( "Initialized a(n) %s TCP publisher socket on port: %s" % (mode, con_str) )
async def apublish(self, packet: bytes): self.sock.send(packet) def publish(self, packet: bytes): self.sock.send(packet)