Source code for ssdaq.receivers.readout_assembler

import asyncio
import struct
from collections import defaultdict, deque
from datetime import datetime, timedelta

import numpy as np

from ssdaq.data._dataimpl import slowsignal_format as dc
from ssdaq.data import SSReadout
from ssdaq.core.receiver_server import ReceiverServer

from .mon_sender import ReceiverMonSender

# Size in bytes of one readout inside a data packet: one 2-byte amplitude for
# each of the dc.N_TM_PIX channels plus two 8-byte timestamps.
READOUT_LENGTH = 2 * dc.N_TM_PIX + 2 * 8
# Big-endian layout of one readout: 8-byte timestamp, 32 2-byte amplitudes,
# a second 8-byte timestamp and another 32 2-byte amplitudes.
packet_format = struct.Struct(">Q32HQ32H")
# Unpacks only the leading 8-byte timestamp (the "tack") of a readout.
first_tack = struct.Struct(">Q")


class SlowSignalDataProtocol(asyncio.Protocol):
    """Datagram protocol that queues slow signal packets for the assembler.

    Every accepted datagram is stored in ``self._buffer`` as a
    ``(module_nr, data, cpu_time)`` tuple, where ``module_nr`` is derived
    from the sender's IP address and ``cpu_time`` is the reception time.
    """

    def __init__(self, loop, log, relaxed_ip_range, mon, packet_debug_stream_file=None):
        """Set up the packet queue and logger.

        Args:
            loop: Event loop the protocol belongs to (stored as ``self.loop``).
            log: Parent logger; a child named "SlowSignalDataProtocol" is used.
            relaxed_ip_range (bool): If true, module numbers outside
                ``0..dc.N_TM - 1`` are folded into range instead of raising.
            mon: Object whose ``register_data_packet()`` is called once per
                received datagram.
            packet_debug_stream_file: Accepted for interface compatibility;
                not used by this class.
        """
        self._buffer = asyncio.Queue()
        self.loop = loop
        self.log = log.getChild("SlowSignalDataProtocol")
        self.relaxed_ip_range = relaxed_ip_range
        self.mon = mon

    def connection_made(self, transport):
        """Remember the transport once the endpoint is set up."""
        self.log.info("Connected to port")
        self.transport = transport

    def datagram_received(self, data, addr):
        """Validate a datagram and queue it together with its module number.

        Args:
            data (bytes): Raw packet payload; must be a non-empty multiple of
                ``READOUT_LENGTH`` bytes.
            addr (tuple): Sender address, ``(ip, port, ...)``.

        Raises:
            RuntimeError: If the module number derived from the IP is out of
                range and ``relaxed_ip_range`` is false.
        """
        # Naive UTC timestamp, kept as in the original implementation so that
        # downstream time arithmetic sees the same kind of object.
        cpu_time = datetime.utcnow()
        self.mon.register_data_packet()

        # An empty payload would pass the modulo test but cannot be unpacked
        # later on, so it is rejected here as well.
        if not data or len(data) % READOUT_LENGTH != 0:
            self.log.warning("Got unsuported packet size, skipping packet")
            self.log.info("Bad package came from %s:%d", addr[0], addr[1])
            return

        # The module number is given by the last two digits of the last
        # field of the sender's IP address.
        ip = addr[0]
        module_nr = int(ip.rsplit(".", 1)[-1]) % 100

        if module_nr > dc.N_TM - 1:
            if not self.relaxed_ip_range:
                self.log.error("Error: got packets from ip out of range:")
                self.log.error("   %s" % ip)
                self.log.error("This can be supressed if relaxed_ip_range=True")
                raise RuntimeError(
                    "Got packet from ip out of range: %s (module %d)" % (ip, module_nr)
                )
            # Fold the number into the allowed range (mostly important for
            # local or standalone simulation setups).
            module_nr %= dc.N_TM

        # The queue is unbounded, so putting never blocks and no task is
        # needed to perform it.
        self._buffer.put_nowait((module_nr, data, cpu_time))
class MatchedPacket:
    """Packets from different target modules that share one tack timestamp.

    Attributes are plain lists indexed by module number (``data``,
    ``tm_parts``) plus the arrival-ordered lists ``tms`` and ``cpu_t``.
    """

    def __init__(self, tm_num, data, tack, cpu_t, nreadouts):
        """Create the group from its first packet.

        Args:
            tm_num: Module number the first packet came from.
            data: Raw payload of the first packet.
            tack: Timestamp identifying this group.
            cpu_t: Reception time of the first packet.
            nreadouts: Number of readouts contained in the packet.
        """
        self.tack = tack
        self.nreadouts = nreadouts
        # One slot per module: payload (or None) and a 0/1 presence flag.
        self.data = [None for _ in range(dc.N_TM)]
        self.tm_parts = [0 for _ in range(dc.N_TM)]
        # Modules and reception times in order of arrival.
        self.tms = []
        self.cpu_t = []
        self.add_part(tm_num, data, cpu_t)

    def add_part(self, tm_num, data, cpu_t):
        """Register the packet of one more module in this group."""
        self.data[tm_num] = data
        self.tm_parts[tm_num] = 1
        self.tms.append(tm_num)
        self.cpu_t.append(cpu_t)
class ReadoutAssembler(ReceiverServer):
    """Slow signal readout assembler.

    Constructs slow signal readouts from data packets received from
    Target Modules.
    """

    def __init__(
        self,
        relaxed_ip_range: bool = False,
        readout_tw: float = 0.0001 * 1e9,
        listen_ip: str = "0.0.0.0",
        listen_port: int = 2009,
        buffer_length: int = 1000,
        buffer_time: float = 10 * 1e9,
        publishers: list = None,
        packet_debug_stream_file: str = None,
    ):
        """Set up the UDP endpoint, settings, counters and buffers.

        Args:
            relaxed_ip_range (bool, optional): Passed to the data protocol;
                when true, out-of-range module numbers are folded into range
                instead of raising.
            readout_tw (float, optional): Time window (same unit as the
                packet tack timestamps) within which packets are considered
                part of the same readout. Stored as an int.
            listen_ip (str, optional): IP address to listen on.
            listen_port (int, optional): UDP port to listen on.
            buffer_length (int, optional): Maximum number of partially
                assembled readouts kept in the buffer.
            buffer_time (float, optional): Tack span between the oldest and
                newest buffered entry above which the oldest entry is
                assembled and published.
            publishers (list, optional): Publishers handed to the base class.
            packet_debug_stream_file (str, optional): Forwarded to the data
                protocol.
        """
        super().__init__(listen_ip, listen_port, publishers, "ReadoutAssembler")
        self.relaxed_ip_range = relaxed_ip_range

        self.transport, self.ss_data_protocol = self.setup_udp(
            lambda: SlowSignalDataProtocol(
                self.loop,
                self.log,
                self.relaxed_ip_range,
                ReceiverMonSender("ReadoutAssembler", self.loop, self._context),
                packet_debug_stream_file=packet_debug_stream_file,
            )
        )

        # settings
        self.readout_tw = int(readout_tw)
        self.listen_addr = (listen_ip, listen_port)
        self.buffer_len = buffer_length
        self.buffer_time = buffer_time

        # counters
        self.nprocessed_packets = 0
        self.nconstructed_readouts = 0
        self.readout_count = 1
        self.packet_counter = {}
        self.readout_counter = defaultdict(lambda: 1)

        # controllers
        self.publish_readouts = True

        # buffers
        self.inter_buff = []
        # NOTE: a deque with maxlen silently discards its oldest entry when a
        # new one is appended to a full buffer.
        self.partial_ro_buff = deque(maxlen=self.buffer_len)
        self.ro_part_buff = deque(maxlen=self.buffer_len)

    def cmd_reset_ro_count(self, arg):
        """Reset the running readout number to 1 and return a confirmation."""
        self.log.info("Readout count has been reset")
        self.readout_count = 1
        return b"Readout count reset"

    def cmd_set_publish_readouts(self, arg):
        """Pause or resume publishing of assembled readouts.

        Args:
            arg: Sequence whose first element is "true"/"True" or
                "false"/"False".

        Returns:
            bytes: Human readable reply describing the action taken.
        """
        choice = arg[0]
        if choice in ("false", "False"):
            self.publish_readouts = False
            self.log.info("Pause publishing readouts")
            return b"Paused readout publishing"
        if choice in ("true", "True"):
            self.publish_readouts = True
            self.log.info("Unpause publishing readouts")
            return b"Unpaused readout publishing"

        self.log.info(
            "Unrecognized command for command `set_publish_readouts` \n no action taken"
        )
        reply = (
            "Unrecognized arg `%s` for command `set_publish_readouts` \nno action taken"
            % choice
        )
        # Never fail on a non-ASCII argument; the reply is informational only.
        return reply.encode("ascii", errors="replace")

    async def ct_assembler(self):
        """Run the readout building loop.

        Discards whatever is already queued, then forever: takes the next
        packet from the protocol queue, files it into the buffer of partially
        assembled readouts and publishes the entries that have become old
        enough.
        """
        self.log.info("Empty socket buffer before starting readout building")
        n_packets = await self._drain_input_buffer()
        self.log.info("Thrown away %d packets in buffer before start" % n_packets)

        self.log.info("Starting readout build loop")
        while True:
            module, data, cpu_time = await self.ss_data_protocol._buffer.get()
            self._insert_packet(module, data, cpu_time)
            await self._flush_expired()

    async def _drain_input_buffer(self):
        """Discard all packets currently queued; return how many were dropped."""
        queue = self.ss_data_protocol._buffer
        # Yield once so that already scheduled callbacks/tasks that feed the
        # queue get a chance to run before it is emptied.
        await asyncio.sleep(0)
        dropped = 0
        while True:
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                return dropped
            dropped += 1

    def _insert_packet(self, module, data, cpu_time):
        """File one packet into the buffer of partially assembled readouts."""
        tack = first_tack.unpack_from(data, 0)[0]
        buff = self.partial_ro_buff

        if buff and tack - buff[-1].tack < self.readout_tw:
            # The packet is not (far enough) ahead of the newest entry, so it
            # has to belong to an existing one. Search from newest to oldest,
            # including the very first entry.
            for pro in reversed(buff):
                if abs(pro.tack - tack) < self.readout_tw:
                    pro.add_part(module, data, cpu_time)
                    return
            self.log.warning(
                "No matching packets found for packet with timestamp %d and tm id %d"
                % (tack, module)
            )
            return

        # Empty buffer or a packet newer than everything buffered so far.
        nreadouts = len(data) // READOUT_LENGTH
        buff.append(MatchedPacket(module, data, tack, cpu_time, nreadouts))

    async def _flush_expired(self):
        """Assemble and publish entries older than ``buffer_time``."""
        buff = self.partial_ro_buff
        # Tacks are integers; subtracting them directly avoids the precision
        # loss of a float conversion.
        while len(buff) > 1 and abs(buff[-1].tack - buff[0].tack) > self.buffer_time:
            readouts = self.assemble_readouts(buff.popleft())
            if not self.publish_readouts:
                # Publishing is paused via `set_publish_readouts`; readouts
                # are still assembled so the counters keep advancing.
                continue
            for readout in readouts:
                await self.publish(readout.pack())

    def assemble_readouts(self, matched):
        """Build the readouts contained in a group of matched packets.

        Args:
            matched (MatchedPacket): Packets from one or more modules that
                share the same tack.

        Returns:
            list: One ``SSReadout`` per readout in the packets, with the data
            of every contributing module filled in.
        """
        first_cpu_time = min(matched.cpu_t)
        # A module that sent more than one packet in the window is only
        # processed once.
        tms = sorted(set(matched.tms))
        tack0 = matched.tack
        # A readout holds two halves of amplitudes, each preceded by an
        # 8-byte timestamp (see `packet_format`).
        half = dc.N_TM_PIX // 2

        readouts = []
        for i in range(matched.nreadouts):
            offset = i * READOUT_LENGTH
            tack = first_tack.unpack_from(matched.data[tms[0]], offset)[0]
            dt = tack - tack0
            if i > 0 and dt == 0:
                self.log.warning(
                    "Subsequent readouts with the same tack {}, {}".format(tack, i)
                )

            # dt * 1e-3 is passed as microseconds: the cpu time of readout i
            # is the earliest reception time shifted by the tack difference.
            r_cpu_time = first_cpu_time + timedelta(microseconds=dt * 1e-3)
            # NOTE(review): the reception times are naive datetimes and
            # `timestamp()` interprets naive values as local time - confirm
            # this is what consumers of cpu_time expect.
            epoch = r_cpu_time.timestamp()
            cpu_time_s = int(epoch)
            cpu_time_ns = int((epoch - cpu_time_s) * 1e9)

            readout = SSReadout(tack, self.readout_count, cpu_time_s, cpu_time_ns)
            for tm in tms:
                values = packet_format.unpack_from(matched.data[tm], offset)
                self.readout_counter[tm] += 1

                # Gather the amplitudes of both halves, skipping the two
                # timestamps at positions 0 and half + 1.
                raw = np.empty(dc.N_TM_PIX, dtype=np.uint64)
                raw[:half] = values[1 : half + 1]
                raw[half:] = values[half + 2 :]

                # Converting counts to mV: flipping the top bit of each
                # 16-bit value adds 0x8000 below 0x8000 and masks it off
                # otherwise.
                raw ^= 0x8000
                readout.data[tm] = raw * 0.03815 * 2.0

            self.nconstructed_readouts += 1
            self.readout_count += 1
            readouts.append(readout)

        return readouts
if __name__ == "__main__":
    # Stand-alone entry point: run a readout assembler that publishes over
    # ZMQ/TCP.
    import logging
    import os
    from subprocess import call

    from ssdaq import sslogger
    from ssdaq.core.publishers.zmq_tcp_publisher import ZMQTCPPublisher

    # Restrict this process to CPUs 0 and 4 with the external `taskset` tool.
    pid = str(os.getpid())
    call(["taskset", "-cp", "0,4", pid])

    sslogger.setLevel(logging.INFO)

    publisher = ZMQTCPPublisher("127.0.0.101", 5555)
    ro_assembler = ReadoutAssembler(publishers=[publisher])
    ro_assembler.run()