"""
A tool to extract data from KM3NeT ROOT files (offline format)
to HDF5 fast (hence the f).
Usage:
h5extractf [options] FILENAME
h5extractf (-h | --help)
Options:
-o OUTFILE Output file.
--provenance-file=FILENAME The file to store the provenance information.
--without-full-reco Don't include all reco tracks, only the best.
--without-calibration Don't include calibration information for offline hits.
-h --help Show this screen.
"""
import time
from uuid import uuid4
import numpy as np
import h5py
import awkward as ak
from thepipe import Provenance
import km3io
import km3pipe as kp
[docs]
log = kp.logger.get_logger("h5extractf")
def _parse_w2list(f):
try:
w2list_idx = km3io.tools.get_w2list_idx(f)
except AttributeError:
return None
if w2list_idx is None:
return None
return _ak_to_numpy(f.w2list, list(w2list_idx.values()))
def _branch_to_numpy(branch, fields):
"""
Read 1D or 2D fields from a branch and convert them to numpy arrays.
Parameters
----------
branch : km3io.rootio.Branch
Branch of an offline file.
fields : list
The field names. If an entry is a tuple instead of a str,
first entry is the field name, secound entry is its h5 name.
Returns
-------
tuple
Numpy-fied awkward array, length 2.
First entry is a dict with fields as keys, and 1D np.arrays as values.
Secound entry is a 1D np.array, the number of items for each event.
"""
data, n_items = {}, None
# would be better to read out all fields at once
for field in fields:
if isinstance(field, (tuple, list)):
field, column = field
else:
column = field
d = branch.__getattr__(field)
n_dims = d.ndim
if n_dims == 1:
if n_items is None:
n_items = np.ones(len(d), dtype="int64")
elif n_dims == 2:
if n_items is None:
n_items = ak.num(d).to_numpy()
d = ak.flatten(d)
else:
raise ValueError("Can not process field", field)
data[column] = d.to_numpy()
return data, n_items
def _ak_to_numpy(ak_array, fields):
"""
Convert the given awkward array to a numpy table.
Parameters
----------
ak_array : awkward.Array
The awkward array, 2D or 3D.
fields : List
The column names of the last axis of the array.
Returns
-------
np_branch : tuple
Numpy-fied awkward array. See output of _branch_to_numpy.
"""
n_dims = ak_array.ndim - 1
if n_dims == 1:
n_items = np.ones(len(ak_array), dtype="int64")
elif n_dims == 2:
n_items = ak.num(ak_array).to_numpy()
ak_array = ak.flatten(ak_array)
else:
raise ValueError("Cannot process array")
if len(ak_array) == 0:
return {
fields[i]: ak_array.to_numpy().reshape((0,)) for i in range(len(fields))
}, n_items
filled = np.ma.filled(
ak.pad_none(ak_array, target=len(fields), axis=-1).to_numpy(),
fill_value=np.nan,
)
return {fields[i]: filled[:, i] for i in range(len(fields))}, n_items
def _yield_tracks(tracks, fields, without_full_reco=False):
"""
Yield info from the tracks branch.
Parameters
----------
tracks : km3io.rootio.Branch
The tracks branch.
fields : list
The fields to read for each track.
without_full_reco : bool
Don't include all reco tracks, only the best.
Yields
------
name : str
Name of how the tracks where chosen.
np_branch : tuple
Numpy-fied awkward array. See output of _branch_to_numpy.
"""
nmbr_events = len(tracks)
if nmbr_events == 0:
log.warn("No events found")
return
track_fmap = {
"best_jmuon": (
km3io.definitions.reconstruction.JMUONPREFIT,
km3io.tools.best_jmuon,
),
"best_jshower": (
km3io.definitions.reconstruction.JSHOWERPREFIT,
km3io.tools.best_jshower,
),
"best_dusjshower": (
km3io.definitions.reconstruction.DUSJSHOWERPREFIT,
km3io.tools.best_dusjshower,
),
"best_aashower": (
km3io.definitions.reconstruction.AASHOWERFITPREFIT,
km3io.tools.best_aashower,
),
}
if not without_full_reco:
track_fmap["tracks"] = (None, None)
n_columns = max(km3io.definitions.fitparameters.values()) + 1
for name, (stage, func) in track_fmap.items():
if stage is None or stage in tracks.rec_stages:
if func is None:
sel_tracks = tracks
else:
sel_tracks = func(tracks)
np_branch = _branch_to_numpy(sel_tracks, fields)
try:
np_fitinf = _ak_to_numpy(sel_tracks.fitinf, range(n_columns))
except ValueError as e:
log.warning(
"Cannot convert fitinf (%s): probably unsupported KM3NeT dataformat version or empty branch. Skipping..."
% e
)
continue
for fitparam, idx in km3io.definitions.fitparameters.items():
np_branch[0][fitparam] = np_fitinf[0][idx].astype("float32")
yield name, np_branch
def _to_hdf(f, name, np_branch, with_group_id=True):
"""
Save a numpy-fied awkward array as a dataset to hdf5.
Parameters
----------
f : h5py.File
The opened h5 file.
name : str
Name of the dataset to create.
np_branch : tuple
The data to save. See output of _branch_to_numpy.
with_group_id : bool
If False (default), save with indices as a sperate dataset.
If true, save with group_id as an additional column.
"""
# TODO set default of with_group_id to False once km3pipe supports
# indexed recarrays
hdf_settings = {
"compression": "gzip",
"compression_opts": 5,
"shuffle": True,
"fletcher32": True,
"chunks": True,
}
data, n_items = np_branch
if with_group_id:
data["group_id"] = np.repeat(np.arange(len(n_items)), n_items)
else:
index = np.concatenate([[0], np.cumsum(n_items)[:-1]])
indices = np.core.records.fromarrays(
[index, n_items], names=["index", "n_items"]
)
f.create_dataset(name + "_indices", data=indices, **hdf_settings)
f.create_dataset(
name,
data=np.core.records.fromarrays(list(data.values()), names=list(data.keys())),
**hdf_settings,
)
[docs]
def main():
from docopt import docopt
args = docopt(__doc__, version=kp.version)
outfile = args["-o"]
if outfile is None:
outfile = args["FILENAME"] + ".h5"
provfile = args["--provenance-file"]
if provfile is None:
provfile = outfile + ".prov.json"
Provenance().outfile = provfile
h5extractf(
args["FILENAME"],
outfile,
without_full_reco=args["--without-full-reco"],
without_calibration=args["--without-calibration"],
)
if __name__ == "__main__":
main()