Subscribers

The subscriber classes are intended as a common interface to the data published on zmq sockets by the ReceiverServers. The subscribers come in two flavors. The ssdaq.core.BasicSubscriber uses a separate thread under the hood to listen to and buffer the published data stream. The buffer can later be accessed via a blocking call to ssdaq.core.BasicSubscriber.get_data(), which makes the class easy to use; however, combining multiple subscribers asynchronously in one process is quite cumbersome. The ssdaq.core.AsyncSubscriber is instead designed to be used in an asynchronous context by running it in an asyncio event loop. ssdaq.core.AsyncSubscriber.get_data() is therefore a coroutine. Having several ssdaq.core.AsyncSubscriber instances running asynchronously in an application is simply a matter of adding them to the event loop of that application.
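The following is a minimal sketch of the "several subscribers on one event loop" pattern. It does not import ssdaq: StubAsyncSubscriber is a hypothetical stand-in that mimics only the get_data() coroutine, and the names drain and main are illustrative assumptions.

```python
import asyncio


class StubAsyncSubscriber:
    """Hypothetical stand-in that mimics only the get_data() coroutine."""

    def __init__(self, name, items):
        self.name = name
        self._queue = asyncio.Queue()
        for item in items:
            self._queue.put_nowait(item)
        # None marks the end of the stream, as in the examples in this section
        self._queue.put_nowait(None)

    async def get_data(self):
        return await self._queue.get()


async def drain(sub, received):
    """Collect everything one subscriber delivers until it yields None."""
    while True:
        data = await sub.get_data()
        if data is None:
            break
        received.append((sub.name, data))


async def main():
    received = []
    subs = [
        StubAsyncSubscriber("trigger", [1, 2]),
        StubAsyncSubscriber("timestamp", ["a", "b"]),
    ]
    # Each subscriber is drained by its own task on the same event loop
    await asyncio.gather(*(drain(sub, received) for sub in subs))
    return received


result = asyncio.run(main())
```

With real AsyncSubscriber instances the same structure should apply, with one consumer coroutine per subscriber scheduled on the application's loop.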

Base Subscribers

ssdaq.core.BasicSubscriber

class ssdaq.core.BasicSubscriber(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None)[source]
The base class to subscribe to a published data stream from a receiver.
Data are retrieved by the get_data() method once the listener has been started by the start() method.
Variables:log (logging.Logger) – logging instance
__init__(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None)[source]

The init of a BasicSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
close(hard=True)[source]

Closes subscriber so no more data is put in the buffer

Parameters:hard (bool) – If set to true the buffer will be emptied and any data still in the buffer will be lost.
empty()[source]

Returns true if the subscriber buffer is empty

Returns:True if empty
Return type:bool
get_data(**kwargs)[source]

Returns unpacked data from the published data stream. This is a blocking call by default; see the Python queue.Queue docs.

Parameters:**kwargs – See queue.Queue docs
Returns:bytes data representing the published object
Return type:bytes
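As a sketch of what the blocking behaviour means in practice, the example below uses queue.Queue directly. It assumes that get_data() forwards its keyword arguments (block, timeout) to queue.Queue.get(), which is what the reference to the queue.Queue docs suggests; the variable names are illustrative.

```python
import queue

buffer = queue.Queue()
buffer.put(b"packet-1")

# Default: block=True, timeout=None, so the call waits until an item arrives
first = buffer.get()

# With a timeout, queue.Empty is raised if nothing arrives in time
try:
    buffer.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True

# With block=False, queue.Empty is raised immediately on an empty queue
try:
    buffer.get(block=False)
    non_blocking_empty = False
except queue.Empty:
    non_blocking_empty = True
```

Under that assumption, a caller that must not hang forever would pass a timeout and handle queue.Empty.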
run()[source]

This is the main method of the listener - the listening thread.

Examples

A simple raw listener:

from ssdaq.core import BasicSubscriber

sub = BasicSubscriber(port=5555, ip="127.0.0.1")
sub.start()  # Starts the listener thread
while True:
    try:
        # Retrieve data from the buffer (a blocking call unless
        # block or timeout is specified; see the queue.Queue docs)
        data = sub.get_data()
    except KeyboardInterrupt:
        print("\nClosing listener")
        sub.close()
        break
    # If the data is None the listener thread has been closed.
    if data is None:
        break

Deriving a subscriber for a specific data type:

import logging

from ssdaq.core import BasicSubscriber
from mymodule import MyData

class MyDataSubscriber(BasicSubscriber):
    def __init__(self, ip: str, port: int, logger: logging.Logger = None):
        # Pass the type-specific unpack callable to the base class
        super().__init__(ip=ip, port=port, logger=logger, unpack=MyData.unpack)
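For illustration, an unpack callable of the kind passed above could look like the sketch below. MyData, its two fields, and the 12-byte wire layout are all hypothetical; the only requirement taken from this section is that unpack takes the received packet and returns an unpacked object.

```python
import struct
from dataclasses import dataclass

# Assumed wire layout: little-endian uint32 followed by float64 (12 bytes)
_LAYOUT = struct.Struct("<Id")


@dataclass
class MyData:
    """Hypothetical payload with an event number and a measured value."""

    event: int
    value: float

    def pack(self) -> bytes:
        return _LAYOUT.pack(self.event, self.value)

    @classmethod
    def unpack(cls, raw: bytes) -> "MyData":
        # Turn the raw packet back into a MyData instance
        event, value = _LAYOUT.unpack(raw)
        return cls(event, value)


packet = MyData(event=7, value=1.5).pack()
restored = MyData.unpack(packet)
```

Because unpack is a classmethod, MyData.unpack can be handed to the subscriber as a plain callable.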

ssdaq.core.AsyncSubscriber

class ssdaq.core.AsyncSubscriber(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None, loop=None, passoff_callback=None, name: str = None)[source]

The AsyncSubscriber provides the same basic functionality as the BasicSubscriber; however, it is intended to work in an asynchronous environment using asyncio.

The get_data and close methods are therefore coroutines which should be called in an event loop. An event loop can be passed to the subscriber, but if none is passed it tries to get one itself. Further, a passoff_callback function can be passed that overrides the default behavior of putting the received unpacked data in a queue which is accessed with get_data.

Variables:log (logging.Logger) – logger instance
__init__(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None, loop=None, passoff_callback=None, name: str = None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
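The effect of passoff_callback can be sketched with a small stand-in. StubReceiver is hypothetical and is not the ssdaq implementation; it also assumes the callback is a plain callable that receives the unpacked data, which this section does not state explicitly.

```python
import queue


class StubReceiver:
    """Hypothetical stand-in; not the ssdaq implementation."""

    def __init__(self, unpack=None, passoff_callback=None):
        self._unpack = unpack
        self._queue = queue.Queue()
        # Default hand-off puts the data in the internal buffer
        self._passoff = passoff_callback or self._queue.put_nowait

    def handle(self, packet):
        data = self._unpack(packet) if self._unpack else packet
        self._passoff(data)

    def empty(self):
        return self._queue.empty()


seen = []
default = StubReceiver(unpack=bytes.decode)
default.handle(b"hello")  # lands in the internal buffer

custom = StubReceiver(unpack=bytes.decode, passoff_callback=seen.append)
custom.handle(b"hello")  # bypasses the buffer and goes to the callback
```

This mirrors the note above: once a callback takes over the hand-off, nothing reaches the buffer that get_data() reads from.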
close(hard=True)[source]

(Coroutine) Closes subscriber so no more data is put in the buffer

Parameters:hard (bool) – If set to true the buffer will be emptied and any data still in the buffer will be lost.
empty()[source]

Returns true if the subscriber buffer is empty

get_data()[source]

(Coroutine) Get data from the subscriber buffer.

Returns:data object
Return type:data
receive()[source]

(Coroutine) The main method of the subscriber where the data is received. This method is placed in the event loop as a task at initialization.

Examples

A simple raw listener:

import asyncio
from ssdaq.core import AsyncSubscriber
loop = asyncio.get_event_loop()
sub = AsyncSubscriber(ip="127.0.0.1",port =5555)

async def print_data(loop, sub):
    running = True
    while running:
        try:
            data = await sub.get_data()
            print(data)
            if data is None:
                running = False
                close = loop.create_task(sub.close())
                close.add_done_callback(lambda x: loop.stop())
        except asyncio.CancelledError:
            print("Exit")
            return


task = loop.create_task(print_data(loop, sub))
try:
    loop.run_forever()
except KeyboardInterrupt:
    print()
    if not task.cancelled():
        task.cancel()

    loop.run_until_complete(sub.close(True))

WriterSubscribers

class ssdaq.core.AsyncWriterSubscriber(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.AsyncSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None)[source]

An asynchronous writer subscriber

This class subscribes to a zmq stream and writes the received data to a file using an appropriate writer.

Variables:
  • log (logging.Logger) – logger instance
  • loop (asyncio.eventloop) – eventloop
  • running (bool) – True if receiving and writing
  • stopping (bool) – True after receiving stop command
__init__(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.AsyncSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None)[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (AsyncSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
  • loop (None, optional) – Description
close(hard: bool = False)[source]

Stops the writer by closing the subscriber.

Parameters:hard (bool, optional) – If set to true the subscriber buffer will be dropped and the file will be immediately closed. Any data still in the subscriber buffer will be lost.
class ssdaq.core.WriterSubscriber(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.BasicSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]
Variables:
  • log (TYPE) – Description
  • stopping (bool) – Description
__init__(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.BasicSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (BasicSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
close(hard: bool = False, non_block: bool = False)[source]

Stops the writer by closing the subscriber

Parameters:
  • hard (bool) – If set to true the subscriber buffer will be dropped and the file will be immediately closed. Any data still in the subscriber buffer will be lost.
  • non_block (bool) – If set to true will not block
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
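Since this run() description comes from threading.Thread, the override pattern it refers to can be shown with the standard library alone. Collector is a hypothetical example class, not part of ssdaq.

```python
import threading


class Collector(threading.Thread):
    """Hypothetical worker that overrides run() instead of passing a target."""

    def __init__(self, items):
        super().__init__()
        self.items = items
        self.total = None

    def run(self):
        # Executed in the new thread once start() is called
        self.total = sum(self.items)


worker = Collector([1, 2, 3])
worker.start()  # spawns the thread, which then invokes run()
worker.join()   # wait for run() to finish
```

Calling run() directly would execute the body in the calling thread, so start() is the method to use.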

Derived Subscribers

Inheritance diagram of ssdaq.subscribers
digraph {
   AsyncSubscriber [shape=box];
   BaseFileWriter [shape=box];
   RawObjectWriterBase [shape=box];
   AsyncWriterSubscriber [shape=box];
   "Async<Type>Subscriber" [shape=box];
   "<Type>Writer" [shape=box];
   "Async<Type>WriterSubscriber" [shape=box];
   AsyncSubscriber ->"Async<Type>Subscriber";
   BaseFileWriter -> AsyncWriterSubscriber ;
   AsyncSubscriber -> AsyncWriterSubscriber [style=dotted] ;
   RawObjectWriterBase ->AsyncWriterSubscriber [style=dotted] ;
   RawObjectWriterBase -> "<Type>Writer" ;
   AsyncWriterSubscriber -> "Async<Type>WriterSubscriber" ;
   "Async<Type>Subscriber"-> "Async<Type>WriterSubscriber" [style=dotted] ;
   "<Type>Writer"-> "Async<Type>WriterSubscriber" [style=dotted] ;
}

Listing derived subscribers

class ssdaq.subscribers.ASlowSignalWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ASlowSignalWriter')[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ASlowSignalWriter')[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (AsyncSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
  • loop (None, optional) – Description
close(hard: bool = False)[source]

Stops the writer by closing the subscriber.

Parameters:hard (bool) – If set to true the subscriber buffer will be dropped and the file will be immediately closed. Any data still in the subscriber buffer will be lost.
class ssdaq.subscribers.ATimestampWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATimestampWriter')[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATimestampWriter')[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (AsyncSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
  • loop (None, optional) – Description
class ssdaq.subscribers.ATriggerWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATriggerWriter')[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATriggerWriter')[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (AsyncSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
  • loop (None, optional) – Description
class ssdaq.subscribers.AsyncLogProtoSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
class ssdaq.subscribers.AsyncLogSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
class ssdaq.subscribers.AsyncMonSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
class ssdaq.subscribers.AsyncSSReadoutSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
class ssdaq.subscribers.AsyncTimestampSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
class ssdaq.subscribers.AsyncTriggerSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]

The init of an AsyncSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – zmq context
  • loop (None, optional) – an asyncio event loop
  • passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
  • name (str, optional) – The name of the subscriber (used in logging)
class ssdaq.subscribers.BasicLogSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None)[source]

The init of a BasicSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – Description
class ssdaq.subscribers.BasicMonSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None)[source]

The init of a BasicSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – Description
class ssdaq.subscribers.BasicTimestampSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None)[source]

The init of a BasicSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – Description
class ssdaq.subscribers.BasicTriggerSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None)[source]

Init of a BasicTriggerSubscriber

Parameters:
  • ip (str) – The ip/interface where the data is published
  • port (int) – The port on which the data is published
  • logger (logging.Logger, optional) – A logger instance
class ssdaq.subscribers.LogProtoSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]
__init__(ip: str, port: int, logger: logging.Logger = None)[source]

The init of a BasicSubscriber

Parameters:
  • ip (str) – The ip address where the data is published (can be local or remote)
  • port (int) – The port number at which the data is published
  • unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
  • logger (logging.Logger, optional) – Optionally provide a logger instance
  • zmqcontext (None, optional) – Description
class ssdaq.subscribers.LogWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (BasicSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
class ssdaq.subscribers.SSFileWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (BasicSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
data_cond(data)[source]

A callback method which starts a new file when True is returned

Parameters:data – received data
Returns:True if a new file should be started
Return type:bool
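How a data_cond callback might drive file rotation can be sketched as follows. StubRotatingWriter is a hypothetical stand-in, files are modelled as in-memory lists, and the "run number changed" rule is an assumption made purely for illustration; the actual condition used by SSFileWriter is not documented here.

```python
class StubRotatingWriter:
    """Hypothetical stand-in; files are modelled as in-memory lists."""

    def __init__(self):
        self.files = [[]]

    def data_cond(self, data):
        # Assumed rule for illustration: rotate whenever the run number changes
        current = self.files[-1]
        return bool(current) and current[-1]["run"] != data["run"]

    def write(self, data):
        if self.data_cond(data):
            self.files.append([])  # start a new "file"
        self.files[-1].append(data)


writer = StubRotatingWriter()
for run in (1, 1, 2, 2, 3):
    writer.write({"run": run})

sizes = [len(f) for f in writer.files]
```

The writer consults the callback before each write, so the record that triggers the rotation becomes the first entry of the new file.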
class ssdaq.subscribers.SSReadoutSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]

A slow signal subscriber

__init__(ip: str, port: int, logger: logging.Logger = None)[source]

Init of a SSReadoutSubscriber

Parameters:
  • ip (str) – The ip/interface where the data is published
  • port (int) – The port on which the data is published
  • logger (logging.Logger, optional) – A logger instance
class ssdaq.subscribers.TimestampWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (BasicSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description
class ssdaq.subscribers.TriggerWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]

Summary

Parameters:
  • file_prefix (str) – Description
  • ip (str) – Description
  • port (int) – Description
  • subscriber (BasicSubscriber) – Description
  • writer (TYPE) – Description
  • file_ext (str) – Description
  • name (str) – Description
  • folder (str, optional) – Description
  • file_enumerator (str, optional) – Description
  • filesize_lim (int, optional) – Description