# Source code for ssdaq.core.io

import binascii
import bz2
import logging
import os
import struct
from datetime import datetime
from typing import Union

import numpy as np

from .utils import get_si_prefix

_chunk_header = struct.Struct("<2I")
_file_header = struct.Struct("<Q4I")

###Raw object IO classes#####


class RawObjectWriterBase:
    """Front end of a file object for writing indexable chunks of
    serialized data to file.

    The on-disk format is produced by a protocol specific writer class,
    which is looked up by protocol version in ``_protocols`` and to which
    all calls are delegated.
    """

    # Registry: protocol version -> writer class (filled by ``_register``).
    _protocols = {}

    @classmethod
    def _register(cls, scls):
        """Class decorator adding ``scls`` to the registry under its
        ``_protocol_v`` attribute. Returns ``scls`` unchanged."""
        cls._protocols[scls._protocol_v] = scls
        return scls

    def __init__(
        self, filename: str, protocol: int = 1, compressor="bz2", **kwargs
    ):
        """Create the protocol specific writer for ``filename``.

        Args:
            filename (str): Passed on to the writer class as first argument
            protocol (int, optional): Key selecting the writer class
                from the registry
            compressor (str, optional): Forwarded to the writer class as
                keyword argument ``compressor``
            **kwargs: Forwarded unchanged to the writer class

        Raises:
            KeyError: if no writer is registered for ``protocol``
        """
        writer_cls = RawObjectWriterBase._protocols[protocol]
        self._writer = writer_cls(filename, compressor=compressor, **kwargs)
        self.protocol = protocol

    @property
    def data_counter(self) -> int:
        """Counter of the number of objects written to file

        Returns:
            int
        """
        return self._writer.data_counter

    def write(self, data: bytes):
        """
        Hands a stream of bytes to the underlying writer

        args:
            data (bytes): bytes to be written to file
        """
        self._writer.write(data)

    def close(self):
        """
        Closes the underlying writer
        """
        self._writer.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving the ``with`` block closes the underlying writer.
        self._writer.close()
@RawObjectWriterBase._register
class RawObjectWriterV0:
    """
    Acts as a file object for writing chunks of serialized data to file.
    Prepends each chunk with: chunk length in bytes (4 bytes) and a
    crc32 hash (4 bytes).

    The file header is 24 bytes long and has the following layout::

        bytes:   field:
        0-7      Custom field for file format specifications
                 (set by the header parameter)
        8-11     Protocol version
        12-15    Not used
        16-19    Not used
        20-23    Not used

    General file structure::

        +-------------+
        | File Header |
        +-------------+
        | Chunk Header|
        +-------------+
        |     Data    |
        +-------------+
        | Chunk Header|
        +-------------+
        |     Data    |
        +-------------+
              ...
              ...
    """

    _protocol_v = 0

    def __init__(self, filename: str, header: int = 0, compressor=None):
        """Open ``filename`` for writing and write the file header.

        Args:
            filename (str): path of the file to create
            header (int, optional): value stored in the custom 8 byte
                field of the file header
            compressor (optional): accepted and ignored. This format has
                no compression; the parameter exists because
                ``RawObjectWriterBase`` always passes ``compressor=``
                to the writer it creates.
        """
        self.version = 0
        # Pack before opening so that an invalid ``header`` value cannot
        # leave an open handle and an empty file behind.
        file_header = _file_header.pack(header, self.version, 0, 0, 0)
        self.filename = filename
        self.file = open(self.filename, "wb")
        self.data_counter = 0
        self.file.write(file_header)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()

    def write(self, data: bytes):
        """
        Writes a stream of bytes to file

        args:
            data (bytes): bytes to be written to file
        """
        self.file.write(_chunk_header.pack(len(data), binascii.crc32(data)))
        self.file.write(data)
        self.data_counter += 1

    def close(self):
        """Closes the file handle"""
        self.file.close()


@RawObjectWriterBase._register
class RawObjectWriterV1:
    """
    An indexed container file.

    Objects handed to ``write`` are buffered in memory and written to file
    in bunches by ``flush``. Each bunch consists of the (optionally
    compressed) concatenated objects, followed by an index holding the
    crc32 of every object and then the size of every object, followed by
    a fixed size bunch trailer holding the offsets needed to walk the
    file backwards from its end.
    """

    _protocol_v = 1
    _file_header = struct.Struct("<4s4sIQ2H")
    _bunch_trailer_header = struct.Struct("<3Q2IH")
    # compressor name -> (module providing ``compress``, id stored in file)
    _compressors = {"bz2": (bz2, 1)}

    def __init__(
        self,
        filename: str,
        header_ext: bytes = None,
        marker_ext: str = "",
        compressor=None,
        bunchsize: int = 1000000,
    ):
        """Open ``filename`` for writing and write the file header.

        Args:
            filename (str): path of the file to create
            header_ext (bytes, optional): extra bytes written directly
                after the fixed file header
            marker_ext (str, optional): stored (encoded) in the second
                4 byte field of the file header
            compressor (None, optional): key into ``_compressors``;
                ``None`` disables compression
            bunchsize (int, optional): a bunch is flushed once the
                buffered objects exceed this many bytes

        Raises:
            KeyError: if ``compressor`` is not a known compressor name
        """
        # Everything that can fail on bad arguments is done before the
        # file is opened, so a failure does not leak an open handle or
        # leave an empty file behind.
        self.compress = False
        compressor_id = 0
        if compressor is not None:
            self.compressor, compressor_id = RawObjectWriterV1._compressors[
                compressor
            ]
            self.compress = True
        self.version = RawObjectWriterV1._protocol_v
        self.time_stamp = int(datetime.now().timestamp())
        header_ext = header_ext or b""
        file_header = RawObjectWriterV1._file_header.pack(
            "SOF".encode(),
            marker_ext.encode(),
            self.version,
            self.time_stamp,
            compressor_id,
            len(header_ext),
        )

        self.filename = filename
        self._file = open(self.filename, "wb")
        self.data_counter = 0
        self.bunchsize = bunchsize
        self._fp = 0  # number of bytes written so far
        self._marker_ext = marker_ext
        self._write(file_header)
        if len(header_ext) > 0:
            self._write(header_ext)

        self._buffer = []  # objects of the bunch being assembled
        self._cbunchindex = []  # (crc32, size) of every buffered object
        self._cbunchoffset = 0  # number of buffered bytes
        self._last_bunch_fp = 0  # position of the previous bunch trailer
        self._bunch_number = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _write(self, data: bytes):
        """Write ``data`` to file and advance the byte counter."""
        self._file.write(data)
        self._fp += len(data)

    def write(self, data: bytes):
        """
        Buffers a stream of bytes; the buffer is written to file as one
        bunch once it holds more than ``bunchsize`` bytes.

        args:
            data (bytes): bytes to be written to file
        """
        self._buffer.append(data)
        self._cbunchindex.append((binascii.crc32(data), len(data)))
        self._cbunchoffset += len(data)
        self.data_counter += 1
        if self._cbunchoffset > self.bunchsize:
            self.flush()

    def flush(self):
        """Write the buffered objects to file as one bunch.

        Does nothing if no objects are buffered.
        """
        if not self._buffer:
            return
        bunch_start_fp = self._fp

        # writing the data bunch
        if self.compress:
            self._write(self.compressor.compress(b"".join(self._buffer)))
        else:
            for data in self._buffer:
                self._write(data)

        # constructing the index and writing it in the bunch trailer
        # NOTE(review): the index uses the native byte order (no "<"
        # prefix), unlike the headers; kept as is because the V1 reader
        # unpacks it the same way.
        crcs, sizes = zip(*self._cbunchindex)
        n = len(self._buffer)
        self._write(struct.pack("{}I{}I".format(n, n), *crcs, *sizes))

        bunch_crc = 0  # no bunch checksum is computed
        bunch_index_trailer = RawObjectWriterV1._bunch_trailer_header.pack(
            self._fp - self._last_bunch_fp,  # distance to previous trailer
            self._fp - bunch_start_fp,  # size of data plus index
            self._fp,  # position of this trailer in the file
            bunch_crc,
            len(self._buffer),
            self._bunch_number,
        )
        # before writing the bunch trailer we update
        # the file pointer for the last bunch
        self._last_bunch_fp = self._fp
        self._write(bunch_index_trailer)

        # resetting/updating the last bunch descriptors
        self._cbunchindex.clear()
        self._buffer.clear()
        self._cbunchoffset = 0
        self._bunch_number += 1

    def close(self):
        """Flush buffered objects and close the file handle."""
        try:
            self.flush()
        finally:
            # The handle is released even if the final flush fails.
            self._file.close()
class RawObjectReaderBase:
    """Front end for reading streamed object files (SOF).

    The protocol version is read from the file header and used to look up
    the protocol specific reader class in ``_protocols``; all reading is
    delegated to that reader.

    Attributes:
        file: the open binary file handle
        filename (str): path the file was opened from
        metadata (dict): extra key/value pairs appended to ``str(self)``
        version (int): protocol version found in the file header
    """

    # Registry: protocol version -> reader class (filled by ``_register``).
    _protocols = {}

    def __init__(self, filename: str):
        """
        Reads Streamed Object Files

        Args:
            filename (str): filename and path to the file

        Raises:
            TypeError: Raised if the file is not recognized as SOF
        """
        not_sof = "This file appears not to be a stream object file (SOF)"
        self.filename = filename
        self.metadata = {}
        self.file = open(self.filename, "rb")
        try:
            self._fhead = self.file.read(12)
            self.file.seek(0)
            if len(self._fhead) < 12:
                raise TypeError(not_sof)
            # All file formats are little endian; the protocol version is
            # stored in bytes 8-11 for every protocol.
            self.fhead, self.version = struct.unpack("<QI", self._fhead)
            readerclass = RawObjectReaderBase._protocols.get(self.version)
            if readerclass is None:
                raise TypeError(not_sof)
            if self.version > 0:
                self._fhead = self.file.read(readerclass._file_header.size)
            self._reader = readerclass(self.file)
        except Exception:
            # Do not leak the handle when the file cannot be read.
            self.file.close()
            raise

    @classmethod
    def _register(cls, scls):
        """Class decorator adding ``scls`` to the registry under its
        ``_protocol_v`` attribute. Returns ``scls`` unchanged."""
        cls._protocols[scls._protocol_v] = scls
        return scls

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()

    def reload(self):
        """
        Reload the index table. Useful if the file
        is being written to while it is read
        """
        self._reader.reload()

    def resetfp(self):
        """
        Resets file pointer to the first object in file
        """
        self._reader.resetfp()

    def __getitem__(self, ind: Union[int, slice, list]):
        """Indexing interface to the streamed file.
        Objects are read by their index, slice or list of indices.

        Args:
            ind (Union[int, slice, list]): index, slice or list of indices

        Returns:
            bytes for a single index, otherwise a list of bytes

        Raises:
            TypeError: if ``ind`` is of an unsupported type
        """
        if isinstance(ind, slice):
            return [
                self.read_at(ii) for ii in range(*ind.indices(self.n_entries))
            ]
        if isinstance(ind, list):
            return [self.read_at(ii) for ii in ind]
        # numpy integers are not instances of ``int`` but are common
        # index values, so they are accepted explicitly.
        if isinstance(ind, (int, np.integer)):
            return self.read_at(int(ind))
        raise TypeError(
            "indices must be integers, slices or lists, not {}".format(
                type(ind).__name__
            )
        )

    def read_at(self, ind: int) -> bytes:
        """Reads one object at the index indicated by `ind`

        Args:
            ind (int): the index of the object to be read

        Returns:
            bytes: that represent the object

        Raises:
            IndexError: if index out of range
        """
        return self._reader.read_at(ind)

    def read(self) -> bytes:
        """
        Reads one object at the position of the file pointer.

        Returns:
            bytes: Bytes that represent the object
        """
        return self._reader.read()

    def close(self):
        """
        closes file handle
        """
        self.file.close()

    @property
    def n_entries(self) -> int:
        """
        number of objects in file.

        Returns:
            int: file object count
        """
        return self._reader.n_entries

    @property
    def filesize(self) -> int:
        """
        the file size in bytes as reported by the reader

        Returns:
            int: file size
        """
        return self._reader.filesize

    @property
    def timestamp(self) -> datetime:
        """
        the reader's unix timestamp converted to a datetime

        Returns:
            datetime.datetime: creation datetime
        """
        return datetime.fromtimestamp(self._reader._timestamp)

    def __str__(self):
        lines = [
            "{}:".format(self.__class__.__name__),
            "filename: {}".format(self.filename),
            "timestamp: {}".format(self.timestamp),
            "n_entries: {}".format(self._reader.n_entries),
            "file size: {} {}B".format(*get_si_prefix(self._reader.filesize)),
        ]
        lines.extend("{}: {}".format(k, v) for k, v in self.metadata.items())
        lines.append("file format version: {}".format(self.version))
        return "\n".join(lines)
@RawObjectReaderBase._register
class RawObjectReaderV1:
    """Reader for protocol V1 (indexed, optionally compressed) files.

    The index is built by walking the bunch trailers backwards from the
    end of the file; objects are then read by index.
    """

    _protocol_v = 1
    _file_header = struct.Struct("<4s4sIQ2H")
    _bunch_trailer_header = struct.Struct("<3Q2IH")
    _file_trailer = struct.Struct("<4s4sIQ")
    # compressor name -> (module providing ``decompress``, id stored in file)
    _compressors = {"bz2": (bz2, 1)}

    def __init__(self, file):
        """Parse the file header of ``file`` and build the object index.

        Args:
            file: binary file handle opened for reading

        Raises:
            TypeError: if the header is incomplete, does not carry the
                ``SOF`` marker or has a different protocol version
        """
        fileheader_def = RawObjectReaderV1._file_header
        not_sof = "This file appears not to be a stream object file (SOF)"
        self.file = file
        self.file.seek(0)
        fileheader = self.file.read(fileheader_def.size)
        if len(fileheader) < fileheader_def.size:
            raise TypeError(not_sof)
        (
            marker,
            extmarker,
            self._version,
            self._timestamp,
            self._compressed,
            self._lenheadext,
        ) = fileheader_def.unpack(fileheader)

        # Compared as bytes: decoding arbitrary file content could raise
        # UnicodeDecodeError instead of the intended TypeError.
        if marker[:3] != b"SOF":
            raise TypeError(not_sof)

        if self._version != RawObjectReaderV1._protocol_v:
            raise TypeError(
                "This file is written with protocol V{}"
                " while this class reads protocol V{}".format(
                    self._version, RawObjectReaderV1._protocol_v
                )
            )
        self._compressor = None
        self._compressor_name = None
        if self._compressed > 0:
            # Reverse lookup: id stored in file -> (module, name)
            by_id = {
                cid: (module, name)
                for name, (module, cid) in RawObjectReaderV1._compressors.items()
            }
            self._compressor, self._compressor_name = by_id[self._compressed]

        self._headext = None
        if self._lenheadext > 0:
            self._headext = self.file.read(self._lenheadext)
        # Position of the first byte after the (extended) file header.
        self._fp_start = self.file.tell()
        self.n_bunches = None
        self.n_entries = None
        self.filesize = None
        self._rawindex = {}
        self._bunch_buffer = {}  # cache of decompressed bunches
        self._current_index = 0  # index used by sequential ``read``
        self._scan_file()

    def reload(self):
        """
        Rebuild the index table. Useful if the file
        is being written to while it is read
        """
        self._rawindex = {}
        self._scan_file()

    def _scan_file(self):
        """Build the object index from the bunch trailers.

        Starts at the trailer at the end of the file and follows the
        offsets stored in each trailer back towards the file header. A
        file without any bunch yields an empty index.

        Raises:
            RuntimeError: if the offsets in a trailer are inconsistent
        """
        from collections import namedtuple

        BunchTrailer = namedtuple(
            "BunchTrailer",
            "bunchoff dataoff fileoff crc bunchsize ndata index objsize",
        )
        trailer_def = RawObjectReaderV1._bunch_trailer_header

        self.file.seek(0, os.SEEK_END)
        self.filesize = self.file.tell()
        self.file_index = [0]

        # Position of the trailer that is read next. A header-only file
        # gives a position at or before the header end, which skips the
        # loop instead of seeking to a negative offset.
        pos = self.filesize - trailer_def.size
        while pos > self._fp_start:
            # read bunch trailer
            self.file.seek(pos)
            bunchoff, dataoff, fileoff, crc, ndata, bunch_n = trailer_def.unpack(
                self.file.read(trailer_def.size)
            )
            index_size = ndata * 2 * 4
            # bunchoff == 0 would never move to an earlier trailer and an
            # index larger than the bunch would lead outside of the bunch.
            if (
                bunchoff > fileoff
                or dataoff > bunchoff
                or bunchoff == 0
                or index_size > dataoff
            ):
                raise RuntimeError(
                    "File offsets are corrupted, unable to read file index"
                )

            # read bunch index (stored directly in front of the trailer):
            # ``ndata`` crc values followed by ``ndata`` object sizes
            self.file.seek(pos - index_size)
            index = struct.unpack(
                "{}I{}I".format(ndata, ndata), self.file.read(index_size)
            )
            objsize = np.array(index[ndata:], dtype=np.uint32)
            # Start offset of every object inside the (decompressed)
            # bunch; accumulated in 64 bit so large bunches cannot wrap.
            objoff = (np.cumsum(objsize, dtype=np.int64) - objsize).tolist()
            self._rawindex[(0, bunch_n)] = BunchTrailer(
                bunchoff,  # Offset to earlier bunch or file header if first bunch
                dataoff,  # Offset to beginning of data in bunch
                fileoff,  # Offset to beginning of file
                crc,  # bunch crc
                dataoff - index_size,  # Size of data bunch
                ndata,  # number of objects in bunch
                objoff,  # Object offsets in bunch
                objsize,  # object sizes
            )
            pos -= bunchoff

        self._index = []
        self._bunch_index = {}
        for k in sorted(self._rawindex):
            bunch = self._rawindex[k]
            # (position of the bunch data in the file, size of that data)
            self._bunch_index[k] = (bunch.fileoff - bunch.dataoff, bunch.bunchsize)
            for off, size in zip(bunch.index, bunch.objsize):
                self._index.append((k, int(off), int(size)))
        self.n_entries = len(self._index)
        self.n_bunches = len(self._rawindex)

    def _get_bunch(self, bunch_id):
        """Return the decompressed data of bunch ``bunch_id``.

        Decompressed bunches are kept in ``_bunch_buffer`` and reused.
        """
        if bunch_id in self._bunch_buffer:
            return self._bunch_buffer[bunch_id]
        start, size = self._bunch_index[bunch_id]
        # Same position arithmetic as the uncompressed path in ``read_at``.
        self.file.seek(self.file_index[bunch_id[0]] + start)
        bunch = self._compressor.decompress(self.file.read(size))
        self._bunch_buffer[bunch_id] = bunch
        return bunch

    def read_at(self, ind: int) -> bytes:
        """Reads one object at the index indicated by `ind`

        Args:
            ind (int): the index of the object to be read

        Returns:
            bytes: that represent the object

        Raises:
            IndexError: if index out of range
        """
        if not -self.n_entries <= ind < self.n_entries:
            raise IndexError(
                "The requested file object ({}) is out of range".format(ind)
            )
        bunch_id, offset, size = self._index[ind]
        if self._compressed:
            bunch = self._get_bunch(bunch_id)
            return bunch[offset : offset + size]
        fpos = self.file_index[bunch_id[0]] + self._bunch_index[bunch_id][0] + offset
        self.file.seek(fpos)
        return self.file.read(size)

    def resetfp(self):
        """
        Resets file pointer to the first object in file
        """
        self._current_index = 0

    def read(self) -> bytes:
        """
        Reads one object at the position of the file pointer.

        Returns:
            bytes: Bytes that represent the object

        Raises:
            IndexError: if all objects have been read
        """
        data = self.read_at(self._current_index)
        # Advance only after a successful read so that a failed read
        # does not skip an object.
        self._current_index += 1
        return data


@RawObjectReaderBase._register
class RawObjectReaderV0:
    """Reader for protocol V0 files (file header followed by chunks that
    each consist of a chunk header and the data)."""

    _protocol_v = 0

    def __init__(self, file):
        """Index all chunks of ``file`` (a binary handle opened for
        reading)."""
        self.file = file
        self._scan_file()
        # The V0 file header does not store a creation time.
        self._timestamp = 0

    def reload(self):
        """
        Reload the index table. Useful if the file
        is being written to while it is read
        """
        # Continue behind the last complete chunk found so far, so that
        # known chunks are not added a second time.
        self._scan_file(self._scan_end, self.n_entries, self.fpos)

    def resetfp(self):
        """
        Resets file pointer to the first object in file
        """
        self.file.seek(_file_header.size)

    def _scan_file(self, offset=0, n_entries=0, fpos=None):
        """Index the chunks in the file.

        Only chunks that are completely contained in the file are added,
        so a chunk that is still being written is picked up by a later
        ``reload``. Afterwards the file pointer is at the first chunk.

        Args:
            offset (int, optional): position of the first chunk header to
                scan; positions inside the file header are moved to the
                first chunk
            n_entries (int, optional): number of chunks already indexed
            fpos (list, optional): positions of the already indexed chunk
                headers; extended in place (a new list if ``None``)
        """
        fh = self.file
        fh.seek(0, os.SEEK_END)
        file_end = fh.tell()
        self.n_entries = n_entries
        self.fpos = [] if fpos is None else fpos
        # Skipping file header
        fp = max(offset, _file_header.size)
        while fp + _chunk_header.size <= file_end:
            fh.seek(fp)
            size, crc = _chunk_header.unpack(fh.read(_chunk_header.size))
            next_fp = fp + _chunk_header.size + size
            if next_fp > file_end:
                # the data of this chunk is not (yet) complete
                break
            self.fpos.append(fp)
            self.n_entries += 1
            fp = next_fp
        self._scan_end = fp  # where the next scan has to continue
        self.filesize = file_end
        self.file.seek(_file_header.size)

    def read_at(self, ind: int) -> bytes:
        """Reads one object at the index indicated by `ind`

        Args:
            ind (int): the index of the object to be read

        Returns:
            bytes: that represent the object

        Raises:
            IndexError: if index out of range
        """
        if not -len(self.fpos) <= ind < len(self.fpos):
            raise IndexError(
                "The requested file object ({}) is out of range".format(ind)
            )
        self.file.seek(self.fpos[ind])
        return self.read()

    def read(self) -> bytes:
        """
        Reads one object at the position of the file pointer.

        Returns:
            bytes: Bytes that represent the object, or None if no
                complete chunk header is left in the file
        """
        sized = self.file.read(_chunk_header.size)
        if len(sized) < _chunk_header.size:
            return None
        size, crc = _chunk_header.unpack(sized)
        return self.file.read(size)


###End of Raw object IO classes#####


class BaseFileWriter:
    """
    A data file writer wrapper class that handles filename enumerators and
    size limits. Filename enumerators can be date-time or order starting
    at ``001``. A new file is started if the preceding file exceeds the
    filesize limit or if the method ``data_cond()`` returns true. This
    method may be overridden by inheriting classes.
    """

    def __init__(
        self,
        file_prefix: str,
        writer,
        file_ext: str,
        folder: str = "",
        file_enumerator: str = None,
        filesize_lim: int = None,
    ):
        """
        Initialize a BaseFileWriter and open the first file

        Args:
            file_prefix (str): The filename prefix (not path)
            writer: A writer class; it is called with the filename and
                the result must provide ``write``, ``close`` and
                ``data_counter``
            file_ext (str): File extension to be used
            folder (str, optional): Path to folder where the file should
                be saved
            file_enumerator (str, optional): ``"date"`` or ``"order"``
                selects the filename enumeration (any other value: no
                enumeration)
            filesize_lim (int, optional): Limit in MiB; if the file size
                is above this limit a new file is opened (if None or 0 no
                limit is imposed)
        """
        self.file_enumerator = file_enumerator
        self.folder = folder
        self.file_prefix = file_prefix
        # A logger provided by a subclass (class attribute or set before
        # calling this initializer) is kept; otherwise a default is used
        # so that the logging calls below cannot fail.
        if not hasattr(self, "log"):
            self.log = logging.getLogger(__name__).getChild(
                self.__class__.__name__
            )
        self.data_counter = 0
        self.file_counter = 1
        self.filesize_lim = ((filesize_lim or 0) * 1024 ** 2) or None
        self._writercls = writer
        self.file_ext = file_ext
        self._open_file()

    def _open_file(self):
        """
        Internal method to open a new file
        """
        self.file_data_counter = 0
        if self.file_enumerator == "date":
            suffix = datetime.utcnow().strftime("%Y-%m-%d_%H%M")
        elif self.file_enumerator == "order":
            suffix = "%0.3d" % self.file_counter
        else:
            suffix = ""

        self.filename = os.path.join(
            self.folder, self.file_prefix + suffix + self.file_ext
        )
        self._writer = self._writercls(self.filename)
        self.log.info(
            "Opened new file, will write events to file: %s" % self.filename
        )

    def _close_file(self):
        """
        Internal method to close the current open file
        """
        self.log.info("Closing file %s" % self.filename)
        self._writer.close()
        self.log.info(
            "FileWriter has written %d events in %g%sB to file %s"
            % (
                self._writer.data_counter,
                *get_si_prefix(os.stat(self.filename).st_size),
                self.filename,
            )
        )

    def data_cond(self, data):
        """
        A callback method which starts a new file when True is returned

        Args:
            data: received data

        Returns:
            bool: if true a new file will be started
        """
        return False

    def _start_new_file(self):
        """
        Internal method to start a new file. The current file is closed,
        the file counter is incremented and the new file is opened.
        """
        self._close_file()
        self.file_counter += 1
        self._open_file()

    def write(self, data):
        """
        Write data to file

        Args:
            data: The data to be written to file
        """
        # Start a new file if self.data_cond(data) returns true
        if self.data_cond(data) and self.data_counter > 0:
            self._start_new_file()
        elif self.filesize_lim is not None:
            # The size on disk is only checked on every 100th write.
            if (
                self.data_counter % 100 == 0
                and os.stat(self.filename).st_size > self.filesize_lim
            ):
                self._start_new_file()

        self._writer.write(data)
        self.data_counter += 1

    def close(self):
        """
        Closes the file writer
        """
        self._close_file()
        self.log.info(
            "FileWriter has written a"
            " total of %d events to %d file(s)"
            % (self.data_counter, self.file_counter)
        )