Subscribers¶
The subscriber classes provide a common interface to the data published
on zmq sockets by the ReceiverServers. The subscribers come in two flavors.
The ssdaq.core.BasicSubscriber uses a separate thread under the hood
to listen to and buffer the published data stream. The buffer can later be accessed via
a blocking call to ssdaq.core.BasicSubscriber.get_data(), which makes the class
easy to use; however, combining multiple subscribers asynchronously in one process is
quite cumbersome. The ssdaq.core.AsyncSubscriber is instead designed to be used in
an asynchronous context by running it in an asyncio event loop.
ssdaq.core.AsyncSubscriber.get_data() is therefore a coroutine. Running several
ssdaq.core.AsyncSubscriber instances asynchronously in an application
is simply a matter of adding them to the event loop of that application.
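The following is a minimal sketch of what "adding several subscribers to one event loop" can look like. It does not use ssdaq itself: StandInSubscriber is a hypothetical class that mimics only the awaitable get_data() of an AsyncSubscriber and reads from an in-memory queue instead of a zmq socket. The subscriber names and the use of None as an end-of-stream marker in the stand-in are assumptions made for the example.

```python
import asyncio


class StandInSubscriber:
    """Hypothetical stand-in that mimics only the awaitable get_data() of
    an AsyncSubscriber; it reads from an in-memory queue, not a zmq socket."""

    def __init__(self, name, items):
        self.name = name
        self._queue = asyncio.Queue()
        for item in items:
            self._queue.put_nowait(item)
        self._queue.put_nowait(None)  # None marks the end of the stream

    async def get_data(self):
        return await self._queue.get()


async def consume(sub, received):
    # Await data until the subscriber signals that it has been closed.
    while True:
        data = await sub.get_data()
        if data is None:
            break
        received.append((sub.name, data))


async def main():
    received = []
    subs = [
        StandInSubscriber("readout", [1, 2, 3]),
        StandInSubscriber("trigger", ["a", "b"]),
    ]
    # One consumer coroutine per subscriber, all sharing a single event loop.
    await asyncio.gather(*(consume(sub, received) for sub in subs))
    return received


received = asyncio.run(main())
print(received)
```

With real subscribers the same structure should apply: one coroutine per subscriber awaiting get_data(), all scheduled on the application's event loop.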
Base Subscribers¶
ssdaq.core.BasicSubscriber¶
-
class
ssdaq.core.BasicSubscriber(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None)[source]¶ - The base class to subscribe to a published data stream from a receiver.
- Data are retrieved by the get_data() method once the listener has been started by the start() method.
Variables: log (logging.logger) – logging instance -
__init__(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None)[source]¶ The init of a BasicSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
-
close(hard=True)[source]¶ Closes subscriber so no more data is put in the buffer
Parameters: hard (bool) – If set to true the buffer will be emptied and any data still in the buffer will be lost.
-
empty()[source]¶ Returns true if the subscriber buffer is empty
Returns: True if empty Return type: bool
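To illustrate the thread-plus-buffer pattern described for the BasicSubscriber, here is a small sketch built only on the standard library. StandInBasicSubscriber is a hypothetical class and not the ssdaq implementation: it reads packets from a Python iterable rather than a zmq socket, and its close() simply waits for the finite source to run out. The method names mirror the ones documented above.

```python
import queue
import threading


class StandInBasicSubscriber:
    """Hypothetical stand-in for the thread-plus-buffer pattern; it reads
    from a Python iterable instead of a zmq socket."""

    def __init__(self, source, unpack=None):
        self._source = source
        self._unpack = unpack
        self._buffer = queue.Queue()
        self._thread = threading.Thread(target=self._listen, daemon=True)

    def start(self):
        self._thread.start()

    def _listen(self):
        for packet in self._source:
            # Store the packet as is unless an unpack callable was given.
            self._buffer.put(self._unpack(packet) if self._unpack else packet)
        self._buffer.put(None)  # None tells the consumer the listener is done

    def get_data(self, **kwargs):
        # Blocks by default, like queue.Queue.get().
        return self._buffer.get(**kwargs)

    def empty(self):
        return self._buffer.empty()

    def close(self, hard=True):
        # The stand-in source is finite, so joining is enough to stop.
        self._thread.join()
        if hard:
            # Drop whatever is still buffered.
            while not self._buffer.empty():
                self._buffer.get_nowait()


sub = StandInBasicSubscriber([b"1", b"2", b"3"], unpack=int)
sub.start()
values = []
while True:
    data = sub.get_data(timeout=1.0)
    if data is None:
        break
    values.append(data)
sub.close()
print(values, sub.empty())
```

The listener thread fills the buffer independently of the consumer, which is why the consumer only ever needs the blocking get_data() call.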
Examples¶
A simple raw listener:
from ssdaq.core import BasicSubscriber

sub = BasicSubscriber(port=5555, ip="127.0.0.1")
sub.start()  # Starts the listener thread
while True:
    try:
        # Retrieve data from the buffer (a blocking call if block or
        # timeout is not specified; see the queue.Queue docs)
        data = sub.get_data()
        # If the data is None the listener thread has been closed.
        if data is None:
            break
    except KeyboardInterrupt:
        print("\nClosing listener")
        sub.close()
        break
Deriving a subscriber for a specific data type:
import logging

from ssdaq.core import BasicSubscriber
from mymodule import MyData


class MyDataSubscriber(BasicSubscriber):
    def __init__(self, ip: str, port: int, logger: logging.Logger = None):
        # Pass the data type's unpack callable on to the base class
        super().__init__(ip=ip, port=port, logger=logger, unpack=MyData.unpack)
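The example above assumes that mymodule.MyData offers an unpack callable. As a sketch of what such a data type could look like, the hypothetical MyData below turns a packet into an object with struct. The field names, the byte layout and the assumption that packets arrive as bytes are all invented for this example and are not defined by ssdaq.

```python
import struct
from dataclasses import dataclass

# Assumed layout: little-endian uint64 event number, then float64 timestamp
_LAYOUT = struct.Struct("<Qd")


@dataclass
class MyData:
    """Hypothetical data type used only to illustrate an unpack callable."""

    event_number: int
    timestamp: float

    def pack(self) -> bytes:
        # Serialize the object into the assumed 16-byte packet.
        return _LAYOUT.pack(self.event_number, self.timestamp)

    @classmethod
    def unpack(cls, packet: bytes) -> "MyData":
        # Takes the received packet and returns the unpacked object.
        event_number, timestamp = _LAYOUT.unpack(packet)
        return cls(event_number, timestamp)


packet = MyData(42, 1.5).pack()
restored = MyData.unpack(packet)
print(restored)
```

Any callable with the same shape (packet in, object out) could be passed as unpack; a classmethod is just one convenient way to keep packing and unpacking next to the data type.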
ssdaq.core.AsyncSubscriber¶
-
class
ssdaq.core.AsyncSubscriber(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None, loop=None, passoff_callback=None, name: str = None)[source]¶ The AsyncSubscriber provides the same basic functionality as the BasicSubscriber; however, it is intended to work in an asynchronous environment using asyncio.
The get_data and close methods are therefore coroutines which should be called in an event loop. An event loop can be passed to the subscriber; if none is passed, the subscriber tries to get one itself. Furthermore, a passoff_callback function can be passed that overrides the default behavior of putting the received unpacked data in a queue which is accessed with get_data.
Variables: log (logging.logger) – logger instance -
__init__(ip: str, port: int, unpack=None, logger: logging.Logger = None, zmqcontext=None, loop=None, passoff_callback=None, name: str = None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
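The sketch below illustrates why get_data() stays empty once a passoff_callback is supplied. StandInReceiver is a hypothetical class, not the AsyncSubscriber implementation, and it assumes that the callback is called with a single argument, the unpacked data; the source does not state the callback signature.

```python
from collections import deque


class StandInReceiver:
    """Hypothetical stand-in; not the AsyncSubscriber implementation."""

    def __init__(self, passoff_callback=None):
        self.buffer = deque()
        # Default pass-off: append to the internal buffer. A user-supplied
        # callback (assumed to take one argument) replaces that step entirely.
        self._passoff = passoff_callback or self.buffer.append

    def on_data(self, data):
        # Called once per received (and unpacked) item.
        self._passoff(data)


# Default behavior: data end up in the buffer.
buffered = StandInReceiver()
buffered.on_data("packet-1")

# With a callback: data go to the callback and the buffer stays empty.
seen = []
direct = StandInReceiver(passoff_callback=seen.append)
direct.on_data("packet-1")

print(len(buffered.buffer), len(direct.buffer), seen)
```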
-
close(hard=True)[source]¶ (Coroutine) Closes subscriber so no more data is put in the buffer
Parameters: hard (bool) – If set to true the buffer will be emptied and any data still in the buffer will be lost.
-
Examples¶
A simple raw listener:
import asyncio
from ssdaq.core import AsyncSubscriber

loop = asyncio.get_event_loop()
sub = AsyncSubscriber(ip="127.0.0.1", port=5555)


async def print_data(loop, sub):
    running = True
    while running:
        try:
            data = await sub.get_data()
            print(data)
            if data is None:
                # The subscriber has been closed: shut down and stop the loop
                running = False
                close = loop.create_task(sub.close())
                close.add_done_callback(lambda x: loop.stop())
        except asyncio.CancelledError:
            print("Exit")
            return


task = loop.create_task(print_data(loop, sub))
try:
    loop.run_forever()
except KeyboardInterrupt:
    print()
    if not task.cancelled():
        task.cancel()
    loop.run_until_complete(sub.close(True))
WriterSubscribers¶
-
class
ssdaq.core.AsyncWriterSubscriber(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.AsyncSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None)[source]¶ An asynchronous writer subscriber
This class subscribes to a zmq stream and writes the received data to a file using an appropriate writer.
Variables: - log (logging.logger) – logger instance
- loop (asyncio.eventloop) – eventloop
- running (bool) – True if receiving and writing
- stopping (bool) – True after receiving stop command
-
__init__(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.AsyncSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None)[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (AsyncSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
- loop (None, optional) – Description
-
class
ssdaq.core.WriterSubscriber(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.BasicSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ Variables: - log (TYPE) – Description
- stopping (bool) – Description
-
__init__(file_prefix: str, ip: str, port: int, subscriber: ssdaq.core.basesubscribers.BasicSubscriber, writer, file_ext: str, name: str, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (BasicSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
-
close(hard: bool = False, non_block: bool = False)[source]¶ Stops the writer by closing the subscriber
Parameters: - hard (bool) – If set to true the subscriber buffer will be dropped and the file will be immediately closed. Any data still in the subscriber buffer will be lost.
- non_block (bool) – If set to true will not block
-
run()[source]¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
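Since run() is the method inherited from threading.Thread, a writer of this kind can be pictured as a thread that overrides run() to drain a buffer into a file. The sketch below shows that pattern with the standard library only. StandInWriter, its one-line-per-item file layout and the None sentinel are assumptions made for the example and do not describe how WriterSubscriber is implemented.

```python
import os
import queue
import tempfile
import threading


class StandInWriter(threading.Thread):
    """Hypothetical writer thread; the file layout (one line per item) is an
    assumption made for this sketch."""

    def __init__(self, path, buffer):
        super().__init__()
        self.path = path
        self.buffer = buffer

    def run(self):
        # Overrides Thread.run(): write items until the None sentinel arrives.
        with open(self.path, "w") as f:
            while True:
                item = self.buffer.get()
                if item is None:
                    break
                f.write(f"{item}\n")

    def close(self):
        self.buffer.put(None)  # ask run() to finish
        self.join()  # block until the file has been closed


def write_items(items):
    buffer = queue.Queue()
    path = os.path.join(tempfile.mkdtemp(), "example.txt")
    writer = StandInWriter(path, buffer)
    writer.start()
    for item in items:
        buffer.put(item)
    writer.close()
    with open(path) as f:
        return f.read().splitlines()


print(write_items(["a", "b", "c"]))
```

Because close() here queues the sentinel behind any pending items, everything already buffered is written before the thread exits, which loosely corresponds to a non-hard close.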
Derived Subscribers¶

![digraph {
AsyncSubscriber [shape=box];
BaseFileWriter [shape=box];
RawObjectWriterBase [shape=box];
AsyncWriterSubscriber [shape=box];
"Async<Type>Subscriber" [shape=box];
"<Type>Writer" [shape=box];
"Async<Type>WriterSubscriber" [shape=box];
AsyncSubscriber ->"Async<Type>Subscriber";
BaseFileWriter -> AsyncWriterSubscriber ;
AsyncSubscriber -> AsyncWriterSubscriber [style=dotted] ;
RawObjectWriterBase ->AsyncWriterSubscriber [style=dotted] ;
RawObjectWriterBase -> "<Type>Writer" ;
AsyncWriterSubscriber -> "Async<Type>WriterSubscriber" ;
"Async<Type>Subscriber"-> "Async<Type>WriterSubscriber" [style=dotted] ;
"<Type>Writer"-> "Async<Type>WriterSubscriber" [style=dotted] ;
}](_images/graphviz-01464f82bdae14492d2b9856e47ea01ff97bf1c6.png)
Listing derived subscribers¶
-
class
ssdaq.subscribers.ASlowSignalWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ASlowSignalWriter')[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ASlowSignalWriter')[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (AsyncSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
- loop (None, optional) – Description
-
-
class
ssdaq.subscribers.ATimestampWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATimestampWriter')[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATimestampWriter')[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (AsyncSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
- loop (None, optional) – Description
-
-
class
ssdaq.subscribers.ATriggerWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATriggerWriter')[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None, loop=None, name='ATriggerWriter')[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (AsyncSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
- loop (None, optional) – Description
-
-
class
ssdaq.subscribers.AsyncLogProtoSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
-
-
class
ssdaq.subscribers.AsyncLogSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
-
-
class
ssdaq.subscribers.AsyncMonSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
-
-
class
ssdaq.subscribers.AsyncSSReadoutSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
-
-
class
ssdaq.subscribers.AsyncTimestampSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
-
-
class
ssdaq.subscribers.AsyncTriggerSubscriber(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None, loop=None, name=None)[source]¶ The init of an AsyncSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
- loop (None, optional) – an asyncio event loop
- passoff_callback (None, optional) – An optional callback for overriding the default buffer. Note: if this is used then get_data() will always be empty.
- name (str, optional) – The name of the subscriber (used in logging)
-
-
class
ssdaq.subscribers.BasicLogSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None)[source]¶ The init of a BasicSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
-
-
class
ssdaq.subscribers.BasicMonSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None)[source]¶ The init of a BasicSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
-
-
class
ssdaq.subscribers.BasicTimestampSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None)[source]¶ The init of a BasicSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
-
-
class
ssdaq.subscribers.BasicTriggerSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]¶
-
class
ssdaq.subscribers.LogProtoSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]¶ -
__init__(ip: str, port: int, logger: logging.Logger = None)[source]¶ The init of a BasicSubscriber
Parameters: - ip (str) – The ip address where the data are published (can be local or remote)
- port (int) – The port number at which the data are published
- unpack (None, optional) – A callable which takes the received data packet as input and returns an unpacked object. If not given the packed data object is put in the subscribed buffer.
- logger (logging.Logger, optional) – Optionally provide a logger instance
- zmqcontext (None, optional) – zmq context
-
-
class
ssdaq.subscribers.LogWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (BasicSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
-
-
class
ssdaq.subscribers.SSFileWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (BasicSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
-
-
class
ssdaq.subscribers.SSReadoutSubscriber(ip: str, port: int, logger: logging.Logger = None)[source]¶ A slow signal subscriber
-
class
ssdaq.subscribers.TimestampWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (BasicSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
-
-
class
ssdaq.subscribers.TriggerWriter(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ -
__init__(file_prefix: str, ip: str, port: int, folder: str = '', file_enumerator: str = None, filesize_lim: int = None)[source]¶ Summary
Parameters: - file_prefix (str) – Description
- ip (str) – Description
- port (int) – Description
- subscriber (BasicSubscriber) – Description
- writer (TYPE) – Description
- file_ext (str) – Description
- name (str) – Description
- folder (str, optional) – Description
- file_enumerator (str, optional) – Description
- filesize_lim (int, optional) – Description
-