Source code for ssdaq.data._ioimpl

### Specialization to different protobuf protocols#####
from . import LogData, TriggerMessage, TriggerPacket, Frame

from ssdaq.core.io import RawObjectWriterBase, RawObjectReaderBase
from ssdaq import version as checvers
import struct

_header = struct.Struct("<2H")  # version:datatype
_headers = {1: _header}


class CHECFileHeader:
    def __init__(self, unpacker: int, version: int = 1, ssdaq_version: str = None):
        self.unpacker = unpacker
        self.ssdaq_version = ssdaq_version or str(checvers.get_version())
        self.version = version

    def pack(self):
        return struct.pack(
            "<2HB{}s".format(len(self.ssdaq_version)),
            self.version,
            self.unpacker,
            len(self.ssdaq_version),
            self.ssdaq_version.encode(),
        )

    @classmethod
    def unpack(cls, data):
        header = struct.Struct("<2HB")
        version, unpacker, n = header.unpack(data[: header.size])
        ssdaq_version = struct.unpack("{}s".format(n), data[header.size :])
        return version, unpacker, ssdaq_version  # cls(version,unpacker,ssdaq_version)


class LogWriter(RawObjectWriterBase):
    """
    """

    def __init__(self, filename: str, **kwargs):
        super().__init__(filename, header_ext=CHECFileHeader(1).pack(), **kwargs)

    def write(self, log: LogData):
        super().write(log.SerializeToString())


class LogReader(RawObjectReaderBase):
    def read(self):
        log = LogData()
        data = super().read()
        log.ParseFromString(data)
        return log


class TimestampWriter(RawObjectWriterBase):
    def __init__(self, filename: str, **kwargs):
        super().__init__(filename, header_ext=CHECFileHeader(2).pack(), **kwargs)

    def write(self, timestamp):
        super().write(timestamp.SerializeToString())


class TimestampReader(RawObjectReaderBase):
    def read(self):
        timestamp = TriggerMessage()
        data = super().read()
        timestamp.ParseFromString(data)
        return timestamp


class TriggerWriter(RawObjectWriterBase):
    def __init__(self, filename: str, **kwargs):
        super().__init__(filename, header_ext=CHECFileHeader(3).pack(), **kwargs)

    def write(self, trigg):
        super().write(trigg.pack())


class RawTriggerWriter(RawObjectWriterBase):
    def __init__(self, filename: str, **kwargs):
        super().__init__(filename, header_ext=CHECFileHeader(3).pack(), **kwargs)

    def write(self, data: bytes):
        super().write(data)


class FrameWriter(RawObjectWriterBase):
    """
    """

    def __init__(self, filename: str, **kwargs):
        super().__init__(filename, header_ext=CHECFileHeader(4).pack(), **kwargs)

    def write(self, frame):
        super().write(frame.serialize())


# Manually list unpackers for now
class TriggerReader(RawObjectReaderBase):
    def read(self):
        return TriggerPacket.unpack(super().read())
        # return TriggerPacketData.unpack(super().read())


def log_unpack(data):
    log = LogData()
    log.ParseFromString(data)
    return log


def timestamp_unpack(data):
    timestamp = TriggerMessage()
    timestamp.ParseFromString(data)
    return timestamp


_unpackers = [(log_unpack,LogData),
                (timestamp_unpack,TriggerMessage),
                (TriggerPacket.unpack,TriggerPacket), (Frame.unpack,Frame)]


[docs]class DataReader(RawObjectReaderBase):
[docs] def __init__(self, filename: str): """ Open a Streamed Object File to read. Args: filename (str): path and filename Raises: ValueError: Is raised if no suitable unpacker is found """ super().__init__(filename) if self.version == 0: if self.fhead == 0 or self.fhead > len(_unpackers): raise ValueError("No compatible reader found for this file...") self._unpack,_ = _unpackers[self.fhead - 1] self.resetfp() else: chec_file_version, datatype, ssdaq_version = CHECFileHeader.unpack( self._reader._headext ) self._unpack,cls = _unpackers[datatype - 1] self.metadata["ssdaq_version"] = ssdaq_version[0].decode() self.metadata["chec_file_version"] = chec_file_version self.metadata["data_type"] = "<{}.{}>".format(cls.__module__,cls.__name__)
[docs] def read(self): return self._unpack(super().read())
[docs] def read_at(self, ind: int): return self._unpack(super().read_at(ind))
def readobjects(self): self.resetfp() for i in range(self.n_entries): yield self._unpack(super().read())
# ====================HDF5 Slow signal data reader and writer definitions=============== import numpy as np import tables from tables import IsDescription, UInt64Col, Float32Col, Float64Col from collections import namedtuple as _nt from ssdaq.version import get_version from ._dataimpl.slowsignal_format import SSReadout, N_TM_PIX, N_TM, ss_mappings class TelData(IsDescription): db_t = Float64Col() db_t_s = UInt64Col() db_t_ns = UInt64Col() ra = Float64Col() dec = Float64Col() class SSReadoutTableDs(IsDescription): iro = UInt64Col() # readout numner/index time = UInt64Col() # TACK timestamp cpu_t = Float64Col() # native python timestamp float64 cpu_t_s = UInt64Col() # seconds time stamp uint64 cpu_t_ns = UInt64Col() # nano seconds time stamp uint64 data = Float32Col((N_TM, N_TM_PIX)) # 2D data array containing
[docs]class SSDataWriter(object): """A writer for Slow Signal data"""
[docs] def __init__( self, filename: str, attrs: dict = None, filters: tables.Filters = None, buffer: int = 1000, tel_table=True, ): self.filename = filename filters = ( filters if filters != None else tables.Filters( complevel=9, complib="bzip2", # gives us 5 times the speed for the same compression as zlib fletcher32=True, ) ) self.file = tables.open_file( self.filename, mode="w", title="CHEC-S Slow signal monitor data", filters=filters, ) self.group = self.file.create_group( self.file.root, "SlowSignal", "Slow signal data" ) self.table = self.file.create_table( self.group, "readout", SSReadoutTableDs, "Slow signal readout" ) self.tel_table = None self.tables = [self.table] if tel_table: self.tel_table = self.file.create_table( self.group, "tel_table", TelData, "Telescope data" ) self.tel_row = self.tel_table.row self.tables.append(self.tel_table) self.ro_row = self.table.row self.data_counter = 0 self.buffer = buffer self._cur_buf = 0 if attrs is not None: for k, v in attrs.items(): self.table.attrs[k] = v self.table.attrs["ss_data_version"] = 0 self.table.attrs["ssdaq_version"] = get_version(pep440=True) self.table.attrs["ssl2asic_ch"] = ss_mappings.ssl2asic_ch
def write_tel_data(self, ra, dec, time, seconds, ns): self.tel_row["db_t"] = time self.tel_row["db_t_s"] = seconds self.tel_row["db_t_ns"] = ns self.tel_row["ra"] = ra self.tel_row["dec"] = dec self.tel_row.append() def write(self, ro): self.write_readout(ro) def write_readout(self, ro): self.ro_row["iro"] = ro.iro self.ro_row["time"] = ro.time self.ro_row["cpu_t"] = ro.cpu_t self.ro_row["cpu_t_s"] = ro.cpu_t_s self.ro_row["cpu_t_ns"] = ro.cpu_t_ns self.ro_row["data"] = np.asarray(ro.data, dtype=np.float32) self.ro_row.append() self._cur_buf += 1 if self._cur_buf >= self.buffer: self._flush() self._cur_buf = 0 self.data_counter += 1 def _flush(self): for table in self.tables: table.flush() def close(self): self.close_file()
[docs] def close_file(self): """Closes file handle """ self._flush() self.file.close()
[docs]class SSDataReader(object): """A reader for Slow Signal data"""
[docs] def __init__(self, filename, mapping="ssl2asic_ch"): """ Args: filename (str): path to file Kwargs: mapping (str): determines how the pixels are mapped. Two mappings are availabe: 'ssl2colrow' and 'ssl2asic_ch' which correspond to a 2D col-row layout and ASIC-channel which is the same ordering used for the fast signal data. """ self.filename = filename self.file = tables.open_file(self.filename, mode="r") self.raw_data = np.zeros((N_TM, N_TM_PIX), dtype=np.float32) self.data = np.zeros((N_TM, N_TM_PIX), dtype=np.float32) self.iro = None self.time = None self.cpu_t = None self._readversions = {0: self._read0} self.attrs = self.file.root.SlowSignal.readout.attrs try: self.map = ss_mappings.__getattribute__(mapping) except: raise ValueError("No mapping found with name %s" % mapping) if "ss_data_version" not in self.attrs: raise RuntimeError( "This is probably a pre v0.9.0 file which is not supported anymore" ) else: self._read = self._readversions[self.attrs.ss_data_version]
def __iter__(self, start=None, stop=None, step=None): return self.read(start, stop, step) def __getitem__(self, iro): if isinstance(iro, slice): ro_list = [self[ii] for ii in range(*iro.indices(self.n_readouts))] return np.array(ro_list) elif isinstance(iro, list): ro_list = [self[ii] for ii in iro] return np.array(ro_list) elif isinstance(iro, int): if iro < 0: iro += self.n_events if iro < 0 or iro >= len(self): raise IndexError("The requested event ({}) is out of range".format(iro)) return np.copy(self.read(iro).__next__()) else: raise TypeError("Invalid argument type") def __len__(self): return self.n_readouts
[docs] def read(self, start=None, stop=None, step=None): """ A data file iterator for reading data rows. Args: start (int): starting row number stop (int): stopping row number step (int): size of the step at each iteration """ if stop is None and start is not None: stop = start + 1 return self._read(start, stop, step)
def _read0(self, start=None, stop=None, step=None): for r in self.file.root.SlowSignal.readout.iterrows(start, stop, step): self.raw_data[:] = r["data"] self.data[:] = r["data"][:, self.map] self.iro = r["iro"] self.time = r["time"] self.cpu_t = r["cpu_t"] self.cpu_t_s = r["cpu_t_s"] self.cpu_t_ns = r["cpu_t_ns"] yield self.data @property def n_readouts(self): return self.file.root.SlowSignal.readout.nrows @property def ssreadout(self): return SSReadout(self.time, self.iro, data=self.raw_data, cpu_t=self.cpu_t)
[docs] def load_all_data(self, tm, calib=None, mapping=None): """ Loads all rows of data for a particular target moduel into memory (in the future a selection of modules) Args: tm (int): The slot number of the target module Kwargs: calib (arraylike): an array with calibration coefficient that should be applied to the data mapping (str or arraylike): a string to select a mapping or an array with the mapping ['ssl2colrow','ssl2asic_ch','raw'] """ if calib is None: calib = 1.0 if mapping is None: mapping = self.map elif isinstance(mapping, str): if mapping == "raw": mapping = np.arange(N_TM_PIX) else: try: mapping = ss_mappings.__getattribute__(mapping) except: raise ValueError("No mapping found with name %s" % mapping) amps = np.zeros((self.n_readouts, N_TM_PIX)) time = np.zeros(self.n_readouts, dtype=np.uint64) cpu_t = np.zeros(self.n_readouts) iro = np.zeros(self.n_readouts, dtype=np.uint64) for i, r in enumerate(self.read()): amps[i, :] = self.raw_data[tm, :] * calib time[i] = self.time cpu_t[i] = self.cpu_t iro[i] = self.iro amps = amps[:, mapping] ssdata = _nt("ssdata", "iro amps time cpu_t tm") return ssdata(iro, amps, time, cpu_t, tm)
def load_as_pd_table(self): import pandas as pd data = [] for r in self.read(): amps = r.flatten() # self.data.flatten() for i in range(2048): data.append( { "iro": self.iro, "time": self.time, "cpu_t": self.cpu_t, "pix": i, "amp": amps[i], } ) df = pd.DataFrame(data) df.set_index(["iro", "pix"], inplace=True) # return df def __repr__(self): return repr(self.file) def __str__(self): s = "SSDataReader:\n" s += " Filename:%s\n" % self.filename s += " Title: CHEC-S Slow signal monitor data\n" s += " n_readouts: %d\n" % self.n_readouts s += " ssdata-version: %d\n" % self.attrs.ss_data_version return s
[docs] def close_file(self): """Closes file handle """ self.file.close()