Source code for ssdaq.core.basesubscribers

""" This module defines the BaseSubscribers for receiving data from Receivers
"""
import asyncio
import logging
from queue import Empty, Queue
from threading import Thread

import zmq
import zmq.asyncio

from ssdaq import sslogger

from .io import BaseFileWriter


class BasicSubscriber(Thread):
    """The base class to subscribe to a published data stream from a receiver.

    Data are retrieved with the `get_data()` method once the listener thread
    has been started with the `start()` method. When the listener thread
    stops, it puts a ``None`` item in the buffer as an end-of-stream marker.

    Attributes:
        log (logging.Logger): logging instance
    """

    # Incremented for every instance created; gives each subscriber a unique
    # default logger name and a unique inproc endpoint for its close socket.
    _id_counter = 0

    def __init__(
        self,
        ip: str,
        port: int,
        unpack=None,
        logger: logging.Logger = None,
        zmqcontext=None,
    ):
        """The init of a BasicSubscriber.

        Args:
            ip (str): The ip address where the data are published (can be
                local or remote). If it is "0.0.0.0" the socket binds to the
                address, otherwise it connects to it.
            port (int): The port number at which the data are published
            unpack (None, optional): A callable which takes the received data
                packet as input and returns an unpacked object. If not given
                the packed data object is put in the subscribed buffer.
            logger (logging.Logger, optional): Optionally provide a logger
                instance
            zmqcontext (None, optional): zmq context used to create the
                sockets. A new ``zmq.Context()`` is created if not given.
        """
        Thread.__init__(self)
        BasicSubscriber._id_counter += 1
        self.log = logger or logging.getLogger(
            "ssdaq.BasicSubscriber%d" % BasicSubscriber._id_counter
        )
        self._context = zmqcontext or zmq.Context()

        # Subscribe to everything that is published on the endpoint.
        self._sock = self._context.socket(zmq.SUB)
        self._sock.setsockopt(zmq.SUBSCRIBE, b"")
        con_str = "tcp://%s:%s" % (ip, port)
        if "0.0.0.0" == ip:
            self._sock.bind(con_str)
        else:
            self._sock.connect(con_str)
        self.log.info("Connected to : %s", con_str)

        self._running = False
        self._data_buffer = Queue()
        self._id_counter = BasicSubscriber._id_counter

        # PAIR socket used by close() to wake up the listener thread, which
        # connects the other end of the pair in run().
        self._inproc_sock_name = "SSdataListener%d" % (self._id_counter)
        self._close_sock = self._context.socket(zmq.PAIR)
        self._close_sock.bind("inproc://" + self._inproc_sock_name)
        self._unpack = (lambda x: x) if unpack is None else unpack

    def close(self, hard=True):
        """Closes subscriber so no more data is put in the buffer.

        Args:
            hard (bool): If set to true the buffer will be emptied and any
                data still in the buffer will be lost.
        """
        if self._running:
            self.log.debug("Sending close message to listener thread")
            self._close_sock.send(b"close")
        self._running = False
        if hard:
            self.log.info("Emptying data buffer")
            # Drain without blocking: another consumer may take an item
            # between an empty() check and a blocking get(), which would
            # leave this call waiting forever.
            while True:
                try:
                    self._data_buffer.get_nowait()
                except Empty:
                    break
                self._data_buffer.task_done()
            self._data_buffer.join()

    def get_data(self, **kwargs):
        """Returns unpacked data from the published data stream.

        By default a blocking call. See python Queue docs.

        Args:
            **kwargs: Passed on to ``queue.Queue.get``

        Returns:
            The next item in the buffer: the result of ``unpack`` (the raw
            received bytes if no ``unpack`` was given), or ``None`` which is
            put in the buffer when the listener thread stops.
        """
        data = self._data_buffer.get(**kwargs)
        self._data_buffer.task_done()
        return data

    def empty(self):
        """Returns true if the subscriber buffer is empty.

        Returns:
            bool: True if empty
        """
        return self._data_buffer.empty()

    def run(self):
        """This is the main method of the listener - the listening thread.

        Polls the data socket and the close socket. Received packets are
        unpacked and put in the buffer; activity on the close socket ends
        the loop.
        """
        self.log.info("Starting listener")
        recv_close = self._context.socket(zmq.PAIR)
        con_str = "inproc://" + self._inproc_sock_name
        recv_close.connect(con_str)
        self._running = True
        self.log.debug("Connecting close socket to %s", con_str)

        poller = zmq.Poller()
        poller.register(self._sock, zmq.POLLIN)
        poller.register(recv_close, zmq.POLLIN)
        try:
            while self._running:
                socks = dict(poller.poll())
                if self._sock not in socks:
                    # Only the close socket is readable: stop listening.
                    break
                data = self._sock.recv()
                self._data_buffer.put(self._unpack(data))
        finally:
            # Always leave a consistent state behind, whichever way the loop
            # ended (close message, cleared _running flag or an exception
            # from unpack): consumers blocked in get_data() are released by
            # the None marker and the inproc socket is closed.
            self.log.info("Stopping")
            self._running = False
            self._data_buffer.put(None)
            recv_close.close()
class WriterSubscriber(Thread, BaseFileWriter):
    """A thread that writes data received by a subscriber to file.

    The thread owns a subscriber instance, starts it in `run()` and hands
    every item it receives to ``self.write`` (not defined in this class;
    presumably provided by ``BaseFileWriter``).

    Attributes:
        log (logging.Logger): child logger of ``sslogger`` named after `name`
        stopping (bool): set to True by `close()`; the writer loop ends once
            this flag is set and the subscriber buffer is empty
    """

    def __init__(
        self,
        file_prefix: str,
        ip: str,
        port: int,
        subscriber: BasicSubscriber,
        writer,
        file_ext: str,
        name: str,
        folder: str = "",
        file_enumerator: str = None,
        filesize_lim: int = None,
    ):
        """The init of a WriterSubscriber.

        Args:
            file_prefix (str): Forwarded to ``BaseFileWriter``
            ip (str): The ip address handed to the subscriber
            port (int): The port number handed to the subscriber
            subscriber (BasicSubscriber): The subscriber class (not an
                instance). It is called with the keyword arguments
                ``logger``, ``ip`` and ``port`` to create the subscriber.
            writer: Forwarded to ``BaseFileWriter``
            file_ext (str): Forwarded to ``BaseFileWriter``
            name (str): Name of the child logger used by this writer
            folder (str, optional): Forwarded to ``BaseFileWriter``
            file_enumerator (str, optional): Forwarded to ``BaseFileWriter``
            filesize_lim (int, optional): Forwarded to ``BaseFileWriter``
        """
        self.log = sslogger.getChild(name)
        BaseFileWriter.__init__(
            self,
            file_prefix=file_prefix,
            writer=writer,
            folder=folder,
            file_enumerator=file_enumerator,
            filesize_lim=filesize_lim,
            file_ext=file_ext,
        )
        Thread.__init__(self)
        self._subscriber = subscriber(
            logger=self.log.getChild("Subscriber"), ip=ip, port=port
        )
        self._running = False
        self.stopping = False

    def close(self, hard: bool = False, non_block: bool = False):
        """Stops the writer by closing the subscriber.

        Args:
            hard (bool): If set to true the subscriber buffer will be dropped
                and the file will be immediately closed. Any data still in
                the subscriber buffer will be lost.
            non_block (bool): If set to true the call does not wait for the
                writer thread to finish.
        """
        if hard:
            self._running = False
        # set stopping flag to true
        self.stopping = True
        # close subscriber
        self.log.info("Stopping Subscriber thread")
        self._subscriber.close(hard=hard)
        if not non_block:
            self.join()
        # NOTE(review): the file writer is closed even when non_block=True,
        # i.e. possibly while the writer thread is still running - confirm
        # that this is intended.
        BaseFileWriter.close(self)

    def run(self):
        """The main method of the writer thread.

        Starts the subscriber and writes every received item until the
        writer is stopped with `close()`.
        """
        self.log.info("Starting writer thread")
        self._subscriber.start()
        self._running = True
        while self._running:
            data = self._subscriber.get_data()
            # Identity check: `== None` would call the payload's own __eq__,
            # which can be expensive or ambiguous for array-like objects.
            if data is None:
                # None carries no payload; stop once a close was requested
                # and nothing is left in the buffer.
                if self.stopping and self._subscriber.empty():
                    break
                continue
            self.write(data)
        self._running = False
# For pyzmq releases older than 17 the asyncio integration is installed
# explicitly. Only the major version matters for this check, so it is parsed
# directly instead of using distutils' LooseVersion (distutils was removed
# from the standard library in Python 3.12).
if int(zmq.__version__.split(".", 1)[0]) < 17:
    zmq.asyncio.install()
class AsyncSubscriber:
    """An asynchronous counterpart of the ``BasicSubscriber``.

    The ``AsyncSubscriber`` provides the same basic functionality as the
    ``BasicSubscriber``, however, it is intended to work in an asynchronous
    environment using ``asyncio``. The ``get_data`` and ``close`` methods are
    therefore coroutines which should be called in an event loop. An event
    loop can be passed to the subscriber but if none is passed then it tries
    to get one itself. Further a ``passoff_callback`` function can be passed
    that overrides the default behavior of putting the received unpacked data
    in a queue which is accessed with ``get_data``.

    Attributes:
        log (logging.Logger): logger instance
    """

    def __init__(
        self,
        ip: str,
        port: int,
        unpack=None,
        logger: logging.Logger = None,
        zmqcontext=None,
        loop=None,
        passoff_callback=None,
        name: str = None,
    ):
        """The init of an AsyncSubscriber.

        Args:
            ip (str): The ip address where the data are published (can be
                local or remote). If it is "0.0.0.0" the socket binds to the
                address, otherwise it connects to it.
            port (int): The port number at which the data are published
            unpack (None, optional): A callable which takes the received data
                packet as input and returns an unpacked object. If not given
                the packed data object is put in the subscribed buffer.
            logger (logging.Logger, optional): Optionally provide a logger
                instance
            zmqcontext (None, optional): zmq context
            loop (None, optional): an asyncio event loop
            passoff_callback (None, optional): An optional callback for
                overriding the default buffer. It is called with every
                unpacked item (``None`` if receiving or unpacking failed).
                Note: if this is used then ``get_data()`` will always be
                empty.
            name (str, optional): The name of the subscriber (used in logging)
        """
        logger = logger or sslogger
        name = name or __class__.__name__
        self.log = logger.getChild(name)
        self._context = zmqcontext or zmq.asyncio.Context()

        # Subscribe to everything that is published on the endpoint.
        self._sock = self._context.socket(zmq.SUB)
        self._sock.setsockopt(zmq.SUBSCRIBE, b"")
        con_str = "tcp://%s:%s" % (ip, port)
        if "0.0.0.0" == ip:
            self._sock.bind(con_str)
        else:
            self._sock.connect(con_str)
        self.log.info("Connected to : %s", con_str)

        self._data_buffer = asyncio.Queue()
        self._loop = loop or asyncio.get_event_loop()
        self._unpack = (lambda x: x) if unpack is None else unpack
        # The buffer is unbounded, so putting never has to wait and the item
        # can be queued directly instead of through a task per message.
        self._passoff_callback = passoff_callback or self._data_buffer.put_nowait
        # Everything receive() relies on is set up before the task is created.
        self._running = True
        self._task = self._loop.create_task(self.receive())

    async def receive(self):
        """**(Coroutine)** The main method of the subscriber where the data is received.

        This method is placed in the event loop as a task at initialization.
        If receiving or unpacking raises, a warning is logged and ``None`` is
        handed to the pass-off callback instead of data.
        """
        self.log.info("Start subscription")
        while self._running:
            data = None
            try:
                data = self._unpack(await self._sock.recv())
            except asyncio.CancelledError:
                # Cancellation is how close() stops the subscription.
                self.log.info("Subscription stopped")
                return
            except Exception as e:
                self.log.warning("An error ocurred while unpacking data {}".format(e))
            self._passoff_callback(data)
        self.log.info("Subscription stopped")

    async def get_data(self):
        """**(Coroutine)** Get data from the subscriber buffer.

        Returns:
            data: data object
        """
        data = await self._data_buffer.get()
        self._data_buffer.task_done()
        return data

    def empty(self):
        """Returns true if the subscriber buffer is empty."""
        return self._data_buffer.empty()

    async def close(self, hard=True):
        """**(Coroutine)** Closes subscriber so no more data is put in the buffer.

        Args:
            hard (bool): If set to true the buffer will be emptied and any
                data still in the buffer will be lost.
        """
        self._running = False
        if not self._task.cancelled():
            self._sock.close()
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # A receive task cancelled before its first step ends up in
                # the cancelled state; that is the requested outcome. Any
                # other cancellation concerns this coroutine and propagates.
                if not self._task.cancelled():
                    raise
        if hard:
            self.log.info("Emptying data buffer")
            # There is no await inside the loop, so nothing else can touch
            # the queue between the empty() check and get_nowait().
            while not self._data_buffer.empty():
                self._data_buffer.get_nowait()
                self._data_buffer.task_done()
class AsyncWriterSubscriber(BaseFileWriter):
    """An asynchronous writer subscriber.

    This class subscribes to a zmq stream and writes the received data to a
    file using an appropriate writer.

    Attributes:
        log (logging.Logger): child logger of ``sslogger`` named after `name`
        stopping (bool): True after receiving the stop command
    """

    def __init__(
        self,
        file_prefix: str,
        ip: str,
        port: int,
        subscriber: AsyncSubscriber,
        writer,
        file_ext: str,
        name: str,
        folder: str = "",
        file_enumerator: str = None,
        filesize_lim: int = None,
        loop=None,
    ):
        """The init of an AsyncWriterSubscriber.

        The writer task (`run()`) is scheduled on the event loop right away.

        Args:
            file_prefix (str): Forwarded to ``BaseFileWriter``
            ip (str): The ip address handed to the subscriber
            port (int): The port number handed to the subscriber
            subscriber (AsyncSubscriber): The subscriber class (not an
                instance). It is called with the keyword arguments ``ip``,
                ``port``, ``loop``, ``logger`` and ``name`` to create the
                subscriber.
            writer: Forwarded to ``BaseFileWriter``
            file_ext (str): Forwarded to ``BaseFileWriter``
            name (str): Name of the child logger used by this writer
            folder (str, optional): Forwarded to ``BaseFileWriter``
            file_enumerator (str, optional): Forwarded to ``BaseFileWriter``
            filesize_lim (int, optional): Forwarded to ``BaseFileWriter``
            loop (None, optional): an asyncio event loop; if not given
                ``asyncio.get_event_loop()`` is used
        """
        self.log = sslogger.getChild(name)
        super().__init__(
            file_prefix=file_prefix,
            writer=writer,
            folder=folder,
            file_enumerator=file_enumerator,
            filesize_lim=filesize_lim,
            file_ext=file_ext,
        )
        self._loop = loop or asyncio.get_event_loop()
        self._subscriber = subscriber(
            ip=ip, port=port, loop=self._loop, logger=self.log, name="mainsub"
        )
        self._running = False
        self.stopping = False
        self._task = self._loop.create_task(self.run())

    async def run(self):
        """**(Coroutine)** The writer loop.

        Takes data from the subscriber and writes it until the writer is
        stopped with `close()`.
        """
        self.log.info("Starting writer")
        self._running = True
        while self._running:
            if self.stopping and self._subscriber.empty():
                break
            try:
                data = await self._subscriber.get_data()
            except asyncio.CancelledError:
                # close() cancels a pending get_data(); go back to the top of
                # the loop so that the stop conditions are evaluated.
                continue
            except Exception as e:
                self.log.error("Exception: {}, occured while retreiving data".format(e))
                continue
            # Identity check: `== None` would call the payload's own __eq__,
            # which can be expensive or ambiguous for array-like objects.
            if data is None:
                continue
            self.write(data)

    async def close(self, hard: bool = False):
        """**(Coroutine)** Stops the writer by closing the subscriber.

        Args:
            hard (bool, optional): If set to true the writer loop is told to
                stop without waiting for the subscriber buffer to be emptied.
                Any data still in the subscriber buffer will be lost.
        """
        if hard:
            self._running = False
            self.log.info("Hard stop. Dropping buffers!!")
        # set stopping flag to true
        self.stopping = True
        # close subscriber
        self.log.info("Stopping Subscriber")
        await self._subscriber.close(hard=False)
        if self._subscriber.empty():
            # The writer task may be waiting for data that will never
            # arrive; cancelling wakes it up so that it can leave its loop.
            if not self._task.cancelled():
                self.log.info("Cancelling")
                self._task.cancel()
        # Always wait for the writer task, so that it is done writing before
        # the file is closed below.
        await self._task
        # Closing the BaseFilewriter to close the
        # filehandle and get a nice summary log message
        super().close()