# Filename: hdf5.py
# pylint: disable=C0103,R0903,C901
# vim:set ts=4 sts=4 sw=4 et:
"""
Read and write KM3NeT-formatted HDF5 files.
"""
from collections import OrderedDict, defaultdict, namedtuple
from functools import singledispatch
import os.path
import warnings
from uuid import uuid4
import numpy as np
import tables as tb
import km3io
from thepipe import Provenance
import km3pipe as kp

try:
    from numba import jit
except ImportError:
    # numba is optional: fall back to a no-op decorator
    def jit(*args, **kwargs):
        return lambda func: func
from thepipe import Module, Blob
from km3pipe.dataclasses import Table, NDArray
from km3pipe.logger import get_logger
from km3pipe.tools import decamelise, camelise, split, istype
log = get_logger(__name__) # pylint: disable=C0103
__author__ = "Tamas Gal and Moritz Lotze and Michael Moser"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal and Moritz Lotze"
__email__ = "tgal@km3net.de"
__status__ = "Development"
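
# NOTE: assumed values for this excerpt -- they must match the KM3NeT HDF5
# format specification version supported by the installed km3pipe.
FORMAT_VERSION = np.string_("5.1")
MINIMUM_FORMAT_VERSION = np.string_("4.1")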
class H5VersionError(Exception):
pass
def check_version(h5file):
try:
version = np.string_(h5file.root._v_attrs.format_version)
except AttributeError:
log.error(
"Could not determine HDF5 format version: '%s'."
"You may encounter unexpected errors! Good luck..." % h5file.filename
)
return
if split(version, int, np.string_(".")) < split(
MINIMUM_FORMAT_VERSION, int, np.string_(".")
):
raise H5VersionError(
"HDF5 format version {0} or newer required!\n"
"'{1}' has HDF5 format version {2}.".format(
MINIMUM_FORMAT_VERSION.decode("utf-8"),
h5file.filename,
version.decode("utf-8"),
)
)
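
# A sketch of the comparison above: ``split`` maps a version string onto a
# list of ints, e.g. split(b"5.1", int, b".") -> [5, 1], so versions compare
# component-wise ([5, 1] >= [4, 1] means format 5.1 satisfies a 4.1 minimum).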
class HDF5IndexTable:
def __init__(self, h5loc, start=0):
self.h5loc = h5loc
self._data = defaultdict(list)
self._index = 0
if start > 0:
self._data["indices"] = [0] * start
self._data["n_items"] = [0] * start
def append(self, n_items):
self._data["indices"].append(self._index)
self._data["n_items"].append(n_items)
self._index += n_items
@property
def data(self):
return self._data
def fillup(self, length):
missing = length - len(self)
self._data["indices"] += [self.data["indices"][-1]] * missing
self._data["n_items"] += [0] * missing
def __len__(self):
return len(self.data["indices"])
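
# Usage sketch (illustrative values): the index table records, per group,
# where that group's rows start and how many rows it has:
#
#   >>> idx = HDF5IndexTable("/hits/_indices")
#   >>> idx.append(3); idx.append(0); idx.append(5)
#   >>> idx.data["indices"], idx.data["n_items"]
#   ([0, 3, 3], [3, 0, 5])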
class HDF5Sink(Module):
"""Write KM3NeT-formatted HDF5 files, event-by-event.
The data can be a ``kp.Table``, a numpy structured array,
a pandas DataFrame, or a simple scalar.
The name of the corresponding H5 table is the decamelised
blob-key, so values which are stored in the blob under `FooBar`
will be written to `/foo_bar` in the HDF5 file.
Parameters
----------
filename: str, optional [default: 'dump.h5']
Where to store the events.
h5file: pytables.File instance, optional [default: None]
Opened file to write to. This is mutually exclusive with filename.
keys: list of strings, optional
List of Blob-keys to write, everything else is ignored.
complib : str [default: zlib]
Compression library that should be used.
'zlib', 'lzf', 'blosc' and all other PyTables filters
are available.
complevel : int [default: 5]
Compression level.
chunksize : int [optional]
Chunksize that should be used for saving along the first axis
of the input array.
flush_frequency: int, optional [default: 500]
The number of iterations to cache tables and arrays before
dumping to disk.
pytab_file_args: dict [optional]
pass more arguments to the pytables File init
    n_rows_expected: int, optional [default: 10000]
append: bool, optional [default: False]
reset_group_id: bool, optional [default: True]
Resets the group_id so that it's continuous in the output file.
Use this with care!
Notes
-----
Provides service write_table(tab, h5loc=None): tab:Table, h5loc:str
        The table to write; the table's ``.h5loc`` is used unless ``h5loc``
        is explicitly given.
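
    Examples
    --------
    A minimal sketch of attaching the sink to a pipeline; ``ASource`` is a
    placeholder for any blob-producing module:

    >>> import km3pipe as kp
    >>> pipe = kp.Pipeline()
    >>> pipe.attach(ASource)  # hypothetical upstream module
    >>> pipe.attach(HDF5Sink, filename="dump.h5")
    >>> pipe.drain()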
"""
def _to_array(self, data, name=None):
if data is None:
return
if np.isscalar(data):
self.log.debug("toarray: is a scalar")
return Table(
{name: np.asarray(data).reshape((1,))},
h5loc="/misc/{}".format(decamelise(name)),
name=name,
)
if hasattr(data, "len") and len(data) <= 0: # a bit smelly ;)
self.log.debug("toarray: data has no length")
return
        # use istype instead of isinstance to avoid the heavy pandas import
if istype(data, "DataFrame"): # noqa
self.log.debug("toarray: pandas dataframe")
data = Table.from_dataframe(data)
return data
def _cache_ndarray(self, arr):
self._ndarrays_cache[arr.h5loc].append(arr)
def _write_ndarrays_cache_to_disk(self):
"""Writes all the cached NDArrays to disk and empties the cache"""
for h5loc, arrs in self._ndarrays_cache.items():
title = arrs[0].title
chunkshape = (
(self.chunksize,) + arrs[0].shape[1:]
if self.chunksize is not None
else None
)
arr = NDArray(np.concatenate(arrs), h5loc=h5loc, title=title)
if h5loc not in self._ndarrays:
loc, tabname = os.path.split(h5loc)
ndarr = self.h5file.create_earray(
loc,
tabname,
tb.Atom.from_dtype(arr.dtype),
(0,) + arr.shape[1:],
chunkshape=chunkshape,
title=title,
filters=self.filters,
createparents=True,
)
self._ndarrays[h5loc] = ndarr
else:
ndarr = self._ndarrays[h5loc]
# for arr_length in (len(a) for a in arrs):
# self._record_index(h5loc, arr_length)
ndarr.append(arr)
self._ndarrays_cache = defaultdict(list)
def write_table(self, table, h5loc=None):
"""Write a single table to the HDF5 file, exposed as a service"""
self.log.debug("Writing table %s", table.name)
if h5loc is None:
h5loc = table.h5loc
self._write_table(h5loc, table, table.name)
def _write_table(self, h5loc, arr, title):
level = len(h5loc.split("/"))
if h5loc not in self._tables:
dtype = arr.dtype
if any("U" in str(dtype.fields[f][0]) for f in dtype.fields):
self.log.error(
"Cannot write data to '{}'. Unicode strings are not supported!".format(
h5loc
)
)
return
loc, tabname = os.path.split(h5loc)
self.log.debug(
"h5loc '{}', Loc '{}', tabname '{}'".format(h5loc, loc, tabname)
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", tb.NaturalNameWarning)
tab = self.h5file.create_table(
loc,
tabname,
chunkshape=self.chunksize,
description=dtype,
title=title,
filters=self.filters,
createparents=True,
expectedrows=self.n_rows_expected,
)
tab._v_attrs.datatype = title
if level < 5:
self._tables[h5loc] = tab
else:
tab = self._tables[h5loc]
h5_colnames = set(tab.colnames)
tab_colnames = set(arr.dtype.names)
if h5_colnames != tab_colnames:
missing_cols = h5_colnames - tab_colnames
if missing_cols:
self.log.info("Missing columns in table, trying to append NaNs.")
arr = arr.append_columns(
missing_cols, np.full((len(missing_cols), len(arr)), np.nan)
)
if arr.dtype != tab.dtype:
self.log.error(
"Differing dtypes after appending "
"missing columns to the table! Skipping..."
)
return
if arr.dtype != tab.dtype:
try:
arr = Table(arr, dtype=tab.dtype)
except ValueError:
self.log.critical(
"Cannot write a table to '%s' since its dtype is "
"different compared to the previous table with the same "
"HDF5 location, which was used to fix the dtype of the "
"HDF5 compund type." % h5loc
)
raise
tab.append(arr)
if level < 4:
tab.flush()
def _write_separate_columns(self, where, obj, title):
f = self.h5file
loc, group_name = os.path.split(where)
if where not in f:
group = f.create_group(loc, group_name, title, createparents=True)
group._v_attrs.datatype = title
else:
group = f.get_node(where)
for col, (dt, _) in obj.dtype.fields.items():
data = obj.__array__()[col]
if col not in group:
a = tb.Atom.from_dtype(dt)
arr = f.create_earray(
group, col, a, (0,), col.capitalize(), filters=self.filters
)
else:
arr = getattr(group, col)
arr.append(data)
# create index table
# if where not in self.indices:
# self.indices[where] = HDF5IndexTable(where + "/_indices", start=self.index)
self._record_index(where, len(data), split=True)
def _process_entry(self, key, entry):
self.log.debug("Inspecting {}".format(key))
if (
hasattr(entry, "h5singleton")
and entry.h5singleton
and entry.h5loc in self._singletons_written
):
self.log.debug(
"Skipping '%s' since it's a singleton and already written."
% entry.h5loc
)
return
if not hasattr(entry, "h5loc"):
self.log.debug("Ignoring '%s': no h5loc attribute" % key)
return
if isinstance(entry, NDArray):
self._cache_ndarray(entry)
self._record_index(entry.h5loc, len(entry))
return entry
try:
title = entry.name
except AttributeError:
title = key
if isinstance(entry, Table) and not entry.h5singleton:
if "group_id" not in entry:
entry = entry.append_columns("group_id", self.index)
elif self._reset_group_id:
# reset group_id to the HDF5Sink's continuous counter
entry.group_id = self.index
self.log.debug("h5l: '{}', title '{}'".format(entry.h5loc, title))
if hasattr(entry, "split_h5") and entry.split_h5:
self.log.debug("Writing into separate columns...")
self._write_separate_columns(entry.h5loc, entry, title=title)
else:
self.log.debug("Writing into single Table...")
self._write_table(entry.h5loc, entry, title=title)
if hasattr(entry, "h5singleton") and entry.h5singleton:
self._singletons_written[entry.h5loc] = True
return entry
def process(self, blob):
written_blob = Blob()
for key, entry in sorted(blob.items()):
if self.keys and key not in self.keys:
self.log.info("Skipping blob, since it's not in the keys list")
continue
self.log.debug("Processing %s", key)
data = self._process_entry(key, entry)
if data is not None:
written_blob[key] = data
if "GroupInfo" not in blob:
gi = Table(
{"group_id": self.index, "blob_length": len(written_blob)},
h5loc="/group_info",
name="Group Info",
)
self._process_entry("GroupInfo", gi)
# fill up NDArray indices with 0 entries if needed
if written_blob:
ndarray_h5locs = set(self._ndarrays.keys()).union(
self._ndarrays_cache.keys()
)
written_h5locs = set(
e.h5loc for e in written_blob.values() if isinstance(e, NDArray)
)
missing_h5locs = ndarray_h5locs - written_h5locs
for h5loc in missing_h5locs:
self.log.info("Filling up %s with 0 length entry", h5loc)
self._record_index(h5loc, 0)
if not self.index % self.flush_frequency:
self.flush()
self.index += 1
return blob
def _record_index(self, h5loc, count, split=False):
"""Add an index entry (optionally create table) for an NDArray h5loc.
Parameters
----------
h5loc : str
location in HDF5
count : int
number of elements (can be 0)
split : bool
if it's a split table
"""
suffix = "/_indices" if split else "_indices"
idx_table_h5loc = h5loc + suffix
if idx_table_h5loc not in self.indices:
self.indices[idx_table_h5loc] = HDF5IndexTable(
idx_table_h5loc, start=self.index
)
idx_tab = self.indices[idx_table_h5loc]
idx_tab.append(count)
def flush(self):
"""Flush tables and arrays to disk"""
self.log.info("Flushing tables and arrays to disk...")
for tab in self._tables.values():
tab.flush()
self._write_ndarrays_cache_to_disk()
def finish(self):
self.flush()
self.h5file.root._v_attrs.km3pipe = np.string_(kp.__version__)
self.h5file.root._v_attrs.pytables = np.string_(tb.__version__)
self.h5file.root._v_attrs.kid = np.string_(self._uuid)
self.h5file.root._v_attrs.format_version = np.string_(FORMAT_VERSION)
self.log.info("Adding index tables.")
for where, idx_tab in self.indices.items():
# any skipped NDArrays or split groups will be filled with 0 entries
idx_tab.fillup(self.index)
self.log.debug("Creating index table for '%s'" % where)
h5loc = idx_tab.h5loc
self.log.info(" -> {0}".format(h5loc))
indices = Table(
{"index": idx_tab.data["indices"], "n_items": idx_tab.data["n_items"]},
h5loc=h5loc,
)
self._write_table(h5loc, indices, title="Indices")
self.log.info(
"Creating pytables index tables. " "This may take a few minutes..."
)
for tab in self._tables.values():
if "frame_id" in tab.colnames:
tab.cols.frame_id.create_index()
if "slice_id" in tab.colnames:
tab.cols.slice_id.create_index()
if "dom_id" in tab.colnames:
tab.cols.dom_id.create_index()
if "event_id" in tab.colnames:
try:
tab.cols.event_id.create_index()
except NotImplementedError:
log.warning(
"Table '{}' has an uint64 column, "
"not indexing...".format(tab._v_name)
)
if "group_id" in tab.colnames:
try:
tab.cols.group_id.create_index()
except NotImplementedError:
log.warning(
"Table '{}' has an uint64 column, "
"not indexing...".format(tab._v_name)
)
tab.flush()
if "HDF5MetaData" in self.services:
self.log.info("Writing HDF5 meta data.")
metadata = self.services["HDF5MetaData"]
for name, value in metadata.items():
self.h5file.set_node_attr("/", name, value)
if not self.keep_open:
self.h5file.close()
self.cprint("HDF5 file written to: {}".format(self.filename))
class HDF5Pump(Module):
"""Read KM3NeT-formatted HDF5 files, event-by-event.
Parameters
----------
filename: str
From where to read events. Either this OR ``filenames`` needs to be
defined.
skip_version_check: bool [default: False]
Don't check the H5 version. Might lead to unintended consequences.
shuffle: bool, optional [default: False]
Shuffle the group_ids, so that the blobs are mixed up.
    shuffle_function: function, optional [default: np.random.shuffle]
The function to be used to shuffle the group IDs.
reset_index: bool, optional [default: True]
        When shuffle is set to true, reset the group ID and start counting
        it from 0.
Notes
-----
Provides service h5singleton(h5loc): h5loc:str -> kp.Table
Singleton tables for a given HDF5 location.
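
    Examples
    --------
    A minimal sketch, assuming ``dump.h5`` is a KM3NeT-formatted HDF5 file:

    >>> pump = HDF5Pump(filename="dump.h5")
    >>> blob = pump[0]      # random access by group index
    >>> for blob in pump:   # or event-by-event iteration
    ...     print(blob.keys())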
"""
def _read_group_info(self):
h5file = self.h5file
if "/group_info" not in h5file:
self.log.critical("Missing /group_info '%s', aborting..." % h5file.filename)
raise SystemExit
self.log.info("Reading group information from '/group_info'.")
group_info = h5file.get_node("/", "group_info")
self.group_ids = group_info.cols.group_id[:]
self._n_groups = len(self.group_ids)
if "/raw_header" in h5file:
self.log.info("Reading /raw_header")
try:
self.header = HDF5Header.from_pytable(h5file.get_node("/raw_header"))
except TypeError:
self.log.error("Could not parse the raw header, skipping!")
if self.shuffle:
self.log.info("Shuffling group IDs")
self.shuffle_function(self.group_ids)
def h5singleton(self, h5loc):
"""Returns the singleton table for a given HDF5 location"""
return self._singletons[h5loc]
def process(self, blob):
self.log.info("Reading blob at index %s" % self.index)
if self.index >= self._n_groups:
self.log.info("All groups are read.")
raise StopIteration
blob = self.get_blob(self.index)
self.index += 1
return blob
def get_blob(self, index):
blob = Blob()
group_id = self.group_ids[index]
# skip groups with separate columns
# and deal with them later
# this should be solved using hdf5 attributes in near future
split_table_locs = []
ndarray_locs = []
for tab in self.h5file.walk_nodes(classname="Table"):
h5loc = tab._v_pathname
loc, tabname = os.path.split(h5loc)
if tabname in self.indices:
self.log.info("index table '%s' already read, skip..." % h5loc)
continue
if loc in split_table_locs:
self.log.info("get_blob: '%s' is noted, skip..." % h5loc)
continue
if tabname == "_indices":
self.log.debug("get_blob: found index table '%s'" % h5loc)
split_table_locs.append(loc)
self.indices[loc] = self.h5file.get_node(h5loc)
continue
if tabname.endswith("_indices"):
self.log.debug("get_blob: found index table '%s' for NDArray" % h5loc)
ndarr_loc = h5loc.replace("_indices", "")
ndarray_locs.append(ndarr_loc)
if ndarr_loc in self.indices:
self.log.info(
"index table for NDArray '%s' already read, skip..." % ndarr_loc
)
continue
_index_table = self.h5file.get_node(h5loc)
self.indices[ndarr_loc] = {
"index": _index_table.col("index")[:],
"n_items": _index_table.col("n_items")[:],
}
continue
tabname = camelise(tabname)
if "group_id" in tab.dtype.names:
try:
if h5loc not in self._tab_indices:
self._read_tab_indices(h5loc)
tab_idx_start = self._tab_indices[h5loc][0][group_id]
tab_n_items = self._tab_indices[h5loc][1][group_id]
if tab_n_items == 0:
continue
arr = tab[tab_idx_start : tab_idx_start + tab_n_items]
except IndexError:
self.log.debug("No data for h5loc '%s'" % h5loc)
continue
except NotImplementedError:
# 64-bit unsigned integer columns like ``group_id``
# are not yet supported in conditions
self.log.debug(
"get_blob: found uint64 column at '{}'...".format(h5loc)
)
arr = tab.read()
arr = arr[arr["group_id"] == group_id]
except ValueError:
# "there are no columns taking part
# in condition ``group_id == 0``"
self.log.info(
"get_blob: no `%s` column found in '%s'! "
"skipping... " % ("group_id", h5loc)
)
continue
else:
if h5loc not in self._singletons:
log.info("Caching H5 singleton: {} ({})".format(tabname, h5loc))
self._singletons[h5loc] = Table(
tab.read(),
h5loc=h5loc,
split_h5=False,
name=tabname,
h5singleton=True,
)
blob[tabname] = self._singletons[h5loc]
continue
self.log.debug("h5loc: '{}'".format(h5loc))
tab = Table(arr, h5loc=h5loc, split_h5=False, name=tabname)
if self.shuffle and self.reset_index:
tab.group_id[:] = index
blob[tabname] = tab
# skipped locs are now column wise datasets (usually hits)
# currently hardcoded, in future using hdf5 attributes
# to get the right constructor
for loc in split_table_locs:
# if some events are missing (group_id not continuous),
# this does not work as intended
# idx, n_items = self.indices[loc][group_id]
idx = self.indices[loc].col("index")[group_id]
n_items = self.indices[loc].col("n_items")[group_id]
end = idx + n_items
node = self.h5file.get_node(loc)
columns = (c for c in node._v_children if c != "_indices")
data = {}
for col in columns:
data[col] = self.h5file.get_node(loc + "/" + col)[idx:end]
tabname = camelise(loc.split("/")[-1])
s_tab = Table(data, h5loc=loc, split_h5=True, name=tabname)
if self.shuffle and self.reset_index:
s_tab.group_id[:] = index
blob[tabname] = s_tab
if self.header is not None:
blob["Header"] = self.header
for ndarr_loc in ndarray_locs:
self.log.info("Reading %s" % ndarr_loc)
try:
idx = self.indices[ndarr_loc]["index"][group_id]
n_items = self.indices[ndarr_loc]["n_items"][group_id]
except IndexError:
continue
end = idx + n_items
ndarr = self.h5file.get_node(ndarr_loc)
ndarr_name = camelise(ndarr_loc.split("/")[-1])
_ndarr = NDArray(
ndarr[idx:end], h5loc=ndarr_loc, title=ndarr.title, group_id=group_id
)
if self.shuffle and self.reset_index:
_ndarr.group_id = index
blob[ndarr_name] = _ndarr
return blob
def _read_tab_indices(self, h5loc):
self.log.info("Reading table indices for '{}'".format(h5loc))
node = self.h5file.get_node(h5loc)
group_ids = None
if "group_id" in node.dtype.names:
group_ids = self.h5file.get_node(h5loc).cols.group_id[:]
else:
self.log.error("No data found in '{}'".format(h5loc))
return
self._tab_indices[h5loc] = create_index_tuple(group_ids)
def __len__(self):
self.log.info("Opening all HDF5 files to check the number of groups")
n_groups = 0
for filename in self.filenames:
with tb.open_file(filename, "r") as h5file:
group_info = h5file.get_node("/", "group_info")
self.group_ids = group_info.cols.group_id[:]
n_groups += len(self.group_ids)
return n_groups
def __iter__(self):
return self
def __next__(self):
# TODO: wrap that in self._check_if_next_file_is_needed(self.index)
if self.index >= self._n_groups:
self.log.info("All groups are read")
raise StopIteration
blob = self.get_blob(self.index)
self.index += 1
return blob
def __getitem__(self, index):
if isinstance(index, int):
return self.get_blob(index)
elif isinstance(index, slice):
return self._slice_generator(index)
else:
raise TypeError("index must be int or slice")
def _slice_generator(self, index):
"""A simple slice generator for iterations"""
start, stop, step = index.indices(len(self))
for i in range(start, stop, step):
yield self.get_blob(i)
self.filename = None
def _close_h5file(self):
if self.h5file:
self.h5file.close()
def finish(self):
self._close_h5file()
@jit(nopython=True)
def create_index_tuple(group_ids):
"""An helper function to create index tuples for fast lookup in HDF5Pump"""
max_group_id = np.max(group_ids)
start_idx_arr = np.full(max_group_id + 1, 0)
n_items_arr = np.full(max_group_id + 1, 0)
current_group_id = group_ids[0]
current_idx = 0
item_count = 0
for group_id in group_ids:
if group_id != current_group_id:
start_idx_arr[current_group_id] = current_idx
n_items_arr[current_group_id] = item_count
current_idx += item_count
item_count = 0
current_group_id = group_id
item_count += 1
else:
start_idx_arr[current_group_id] = current_idx
n_items_arr[current_group_id] = item_count
return (start_idx_arr, n_items_arr)
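
# Example (illustrative): for group_ids = np.array([0, 0, 1, 3]) this returns
# start_idx_arr = [0, 2, 0, 3] and n_items_arr = [2, 1, 0, 1], i.e. the rows
# of group 1 live at [2:3] and group 2 (absent from the input) has 0 items.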
@singledispatch
def header2table(data):
    """Convert a header to an ``HDF5Header`` compliant ``kp.Table``"""
    log.warning("Unsupported header data of type: %s", type(data))


@header2table.register(dict)
def _(header_dict):
def _(header_dict):
if not header_dict:
print("Empty header dictionary.")
return
tab_dict = defaultdict(list)
for parameter, data in header_dict.items():
fields = []
values = []
types = []
for field_name, field_value in data.items():
fields.append(field_name)
values.append(str(field_value))
try:
_ = float(field_value) # noqa
types.append("f4")
except ValueError:
types.append("a{}".format(len(field_value)))
except TypeError: # e.g. values is None
types.append("a{}".format(len(str(field_value))))
tab_dict["parameter"].append(parameter.encode())
tab_dict["field_names"].append(" ".join(fields).encode())
tab_dict["field_values"].append(" ".join(values).encode())
tab_dict["dtype"].append(" ".join(types).encode())
log.debug(
"{}: {} {} {}".format(
tab_dict["parameter"][-1],
tab_dict["field_names"][-1],
tab_dict["field_values"][-1],
tab_dict["dtype"][-1],
)
)
return Table(tab_dict, h5loc="/raw_header", name="RawHeader", h5singleton=True)
@header2table.register(km3io.offline.Header)
def _(header):
out = {}
for parameter, values in header._data.items():
try:
values = values._asdict()
except AttributeError:
# single entry without further parameter name
# in specification
values = {parameter + "_0": values}
out[parameter] = values
return header2table(out)
@header2table.register(HDF5Header)
def _(header):
return header2table(header._data)
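
# Usage sketch: ``header2table`` dispatches on the header type; for a plain
# dict, field values are stringified and stored as a singleton table:
#
#   >>> tab = header2table({"cut_nu": {"Emin": 100, "Emax": 1e8}})
#   >>> tab.h5loc
#   '/raw_header'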