9from typing
import List, Optional
15from pyarrow.parquet
import ParquetWriter
16from pyarrow.csv
import CSVWriter
17from pyarrow
import ipc
22Python utilities to help create or manage ntuples and work with them in pandas
25numpy_to_pyarrow_type_map = {
28 np.uint32: pa.uint32(),
29 np.uint64: pa.uint64(),
30 np.float32: pa.float32(),
31 np.float64: pa.float64(),
33 np.object_: pa.string(),
40 Base class to dump ntuples into a non-ROOT format of your choosing
48 hdf_table_name: Optional[str] =
None,
49 event_buffer_size: int = 100,
52 """Constructor to initialize the internal state
55 listname(str): name of the particle list
56 variables(list(str)): list of variables to save for each particle
57 filename(str): name of the output file to be created.
58 Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
59 `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
60 hdf_table_name(str): name of the table in the hdf5 file.
61 If not provided, it will be the same as the listname
62 event_buffer_size(int): number of events to buffer before writing to disk,
63 higher values will use more memory but write faster and result in smaller files
64 **writer_kwargs: additional keyword arguments to pass to the writer.
65 For details, see the documentation of the writer in the Apache Arrow documentation.
66 Only use this if you know what you are doing!
77 if file_type
in [
"csv"]:
79 elif file_type
in [
"parquet",
"pq"]:
81 elif file_type
in [
"h5",
"hdf",
"hdf5"]:
83 elif file_type
in [
"feather",
"arrow"]:
87 f
"Unknown file type ending .{file_type}, supported types are 'csv', "
88 "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
92 hdf_table_name
if hdf_table_name
is not None else self.
_listname
104 """Create the hdf5 file and list of variable objects to be used during
112 for varname
in variables.variables.resolveCollections(
120 self.
_evtmeta = ROOT.Belle2.PyStoreObj(
"EventMetaData")
127 (
"__experiment__", np.int32),
128 (
"__run__", np.int32),
129 (
"__event__", np.uint32),
130 (
"__production__", np.uint32),
131 (
"__candidate__", np.uint32),
132 (
"__ncandidates__", np.uint32),
136 dtypes.append((name, np.float64))
143 elif self.
_format ==
"parquet":
147 elif self.
_format ==
"feather":
152 Initialize the feather writer using pyarrow
156 (name, numpy_to_pyarrow_type_map[dt]) for name, dt
in self.
_dtypes
161 schema=pa.schema(self.
_schema),
167 Initialize the parquet writer using pyarrow
171 (name, numpy_to_pyarrow_type_map[dt]) for name, dt
in self.
_dtypes
180 Initialize the csv writer using pyarrow
184 (name, numpy_to_pyarrow_type_map[dt]) for name, dt
in self.
_dtypes
191 Initialize the hdf5 writer using pytables
195 self._filename, mode="w", title=
"Belle2 Variables to HDF5"
197 filters = tables.Filters(complevel=1, complib=
"blosc:lz4", fletcher32=
False)
201 with warnings.catch_warnings():
202 warnings.simplefilter(
"ignore")
210 collect all variables for the particle in a numpy array
214 buf = np.empty(self.
_plist.getListSize(), dtype=self.
_dtypes)
216 buf[
"__experiment__"] = self.
_evtmeta.getExperiment()
217 buf[
"__run__"] = self.
_evtmeta.getRun()
218 buf[
"__event__"] = self.
_evtmeta.getEvent()
219 buf[
"__production__"] = self.
_evtmeta.getProduction()
220 buf[
"__ncandidates__"] = len(buf)
221 buf[
"__candidate__"] = np.arange(len(buf))
223 for row, p
in zip(buf, self.
_plist):
228 row[name] = variables.variables.evaluate(v.name, p)
233 fill a buffer over multiple events and return it, when self.
248 write the buffer to the output file
252 """Create a new row in the hdf5 file for each particle in the list"""
255 table = {name: buf[name]
for name, _
in self.
_dtypes}
256 pa_table = pa.table(table, schema=pa.schema(self.
_schema))
261 elif self.
_format ==
"feather":
266 Event processing function
267 executes the fill_buffer function and writes the data to the output file
275 """save and close the output"""
283 elif self.
_format ==
"parquet":
287 elif self.
_format ==
"feather":
289 ROOT.Belle2.MetadataService.Instance().addNtuple(self.
_filename)
294 Legacy class to not break existing code
297 def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] =
None,):
298 super().
__init__(listname, variables, filename, hdf_table_name)
299 assert self.
_filename.split(
".")[-1]
in [
"h5",
"hdf",
"hdf5"], (
300 "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
301 f
"Got {self._filename}"
305def make_mcerrors_readable(dataframe, column="mcErrors"):
307 Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
308 variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
309 of columns of the form ``{column}_{name}`` where column is the value of the
310 ``column`` argument and ``name`` is one of the :ref:`mcmatching`
311 error flags (without the leading 'c_').
314 dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
315 with a column containing the output of the mcErrors variable
316 column(str): the name of the column containing the values from the mcErrors variable
321 if column
not in dataframe:
322 raise KeyError(f
"Cannot find column '{column}'")
325 mcErrors = dataframe[column].astype(int)
328 for flag
in (e
for e
in dir(ROOT.Belle2.MCMatching)
if e.startswith(
"c_")):
330 value = int(getattr(ROOT.Belle2.MCMatching, flag))
336 name = column + flag[1:]
338 dataframe[name] = mcErrors == 0
340 dataframe[name] = (mcErrors & value) == value
def __init__(self, listname, variables, filename, Optional[str] hdf_table_name=None)
def initialize_parquet_writer(self)
def write_buffer(self, buf)
_event_buffer_counter
Event buffer counter.
def initialize_csv_writer(self)
_event_buffer_size
Event buffer size.
_parquet_writer
a writer object to write data into a parquet file
def __init__(self, str listname, List[str] variables, str filename, Optional[str] hdf_table_name=None, int event_buffer_size=100, **writer_kwargs)
_table_name
Table name in the hdf5 file.
def fill_event_buffer(self)
_csv_writer
a writer object to write data into a csv file
def initialize_feather_writer(self)
def initialize_hdf5_writer(self)
_filename
Output filename.
_writer_kwargs
writer kwargs
_variables
List of variables.
_plist
Pointer to the particle list.
_var_objects
variable objects for each variable
_feather_writer
a writer object to write data into a feather file
_hdf5_writer
The pytable file.
_listname
Particle list name.
_schema
A list of (name, pa.DataType) tuples used to define the pyarrow schema.