from typing import List, Optional

import numpy as np
import tables
import warnings

import basf2
import ROOT
import variables

import pyarrow as pa
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc

"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.bool_: pa.bool_(),
    np.object_: pa.string(),
}
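
# A minimal sketch of how this map is used below: a list of (name, numpy
# dtype) pairs is turned into a pyarrow schema (the column names here are
# purely illustrative):
#
#     schema = pa.schema(
#         [(name, numpy_to_pyarrow_type_map[dt])
#          for name, dt in [("__event__", np.uint32), ("p", np.float64)]]
#     )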


class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-root format of your choosing.

    Arguments:
        listname(str): name of the particle list
        variables(list[str]): list of variables to save for each particle
        filename(str): name of the output file to be created.
            Needs to end with ``.csv`` for csv output, ``.parquet`` or ``.pq`` for parquet output,
            ``.h5``, ``.hdf`` or ``.hdf5`` for hdf5 output and ``.feather`` or ``.arrow`` for feather output.
        hdf_table_name(str): name of the table in the hdf5 file.
            If not provided, it will be the same as the listname. Defaults to None.
        event_buffer_size(int): number of events to buffer before writing to disk.
            Higher values will use more memory but result in smaller files.
            For some formats, like parquet, this also sets the row group size. Defaults to 100.
        **writer_kwargs: additional keyword arguments to pass to the writer.
            For details, see the documentation of the respective writer in the Apache Arrow documentation.
            For HDF5, these are passed to ``tables.File.create_table``.
            Only use if you know what you are doing!
    """

    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
68 """Constructor to initialize the internal state"""
        super().__init__()
        #: Output filename
        self._filename = filename
        #: Particle list name
        self._listname = listname
        #: List of variables
        self._variables = variables
        #: Event buffer size
        self._event_buffer_size = event_buffer_size
        #: Event buffer counter
        self._event_buffer_counter = 0
        #: writer kwargs
        self._writer_kwargs = writer_kwargs

        file_type = self._filename.split(".")[-1]
        #: Output format
        if file_type in ["csv"]:
            self._format = "csv"
        elif file_type in ["parquet", "pq"]:
            self._format = "parquet"
        elif file_type in ["h5", "hdf", "hdf5"]:
            self._format = "hdf5"
        elif file_type in ["feather", "arrow"]:
            self._format = "feather"
        else:
            raise ValueError(
                f"Unknown file type ending .{file_type}, supported types are 'csv', "
                "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
            )
        #: Table name in the hdf5 file
        self._table_name = hdf_table_name if hdf_table_name is not None else self._listname

    def initialize(self):
        """
        Setup variable lists, pointers, buffers and file writers
        """
        #: variable names
        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]
        #: std.vector of variable names
        self._std_varnames = variables.std_vector(*self._varnames)
        #: Event metadata
        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        self._evtmeta.isRequired()
        #: Pointer to the particle list
        self._plist = ROOT.Belle2.PyStoreObj(self._listname)
        self._plist.isRequired()

        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            # all variables are stored as doubles
            dtypes.append((name, np.float64))
        #: list of (column name, numpy dtype) tuples describing the output table
        self._dtypes = dtypes

        #: event variables buffer (will be automatically grown if necessary)
        self._buffer = np.empty(self._event_buffer_size, dtype=self._dtypes)
        #: current start index in the event variables buffer
        self._buffer_index = 0

        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()

    @property
    def buffer(self):
        """
        The buffer slice across multiple entries
        """
        return self._buffer[: self._buffer_index]

    @property
    def event_buffer(self):
        """
        The buffer slice for the current event
        """
        return self._buffer[self._buffer_index - self._plist.getListSize(): self._buffer_index]

    def clear_buffer(self):
        """
        Reset the buffer event counter and index
        """
        self._event_buffer_counter = 0
        self._buffer_index = 0

    def append_buffer(self):
        """
        "Append" a new event to the buffer by moving the buffer index forward by the particle list size.

        Automatically replaces the buffer by a larger one if necessary.
        """
        plist_size = self._plist.getListSize()
        if plist_size + self._buffer_index > len(self._buffer):
            new_buffer = np.empty(
                (len(self._buffer) + plist_size) * 2, dtype=self._buffer.dtype
            )
            new_buffer[: self._buffer_index] = self.buffer
            self._buffer = new_buffer
        self._buffer_index += plist_size
        self._event_buffer_counter += 1
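
    # A worked example of the growth rule above (numbers are illustrative):
    # if the buffer holds 10 rows and is filled up to index 8, an event with
    # 6 candidates does not fit, so a new buffer of (10 + 6) * 2 = 32 rows is
    # allocated and the 8 filled rows are copied over before appending.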

    def initialize_feather_writer(self):
        """
        Initialize the feather writer using pyarrow
        """
        #: A list of tuples and pa.DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a feather file
        self._feather_writer = ipc.RecordBatchFileWriter(
            sink=self._filename,
            schema=pa.schema(self._schema),
            **self._writer_kwargs,
        )

    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        #: A list of tuples and pa.DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a parquet file
        self._parquet_writer = ParquetWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        #: A list of tuples and pa.DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a csv file
        self._csv_writer = CSVWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        #: The pytable file
        self._hdf5_writer = tables.open_file(
            self._filename, mode="w", title="Belle2 Variables to HDF5"
        )
        filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)

        # some variable names are not just A-Za-z0-9, so pytables will warn
        # about them even though it can deal with them; suppress those warnings
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            #: The pytable
            self._table = self._hdf5_writer.create_table(
                "/", self._table_name, obj=np.zeros(0, self._dtypes),
                filters=filters, **self._writer_kwargs
            )

    def fill_event_buffer(self):
        """
        Assign values for all variables for all particles in the particle list
        to the current event buffer
        """
        buf = self.event_buffer

        # add some extra columns for bookkeeping
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))

        # bulk-evaluate all variables for all candidates in one flat vector
        # (the exact VariableManager call is an assumption here)
        vector = variables.variables.evaluateVariables(self._std_varnames, self._plist.obj())
        values = np.array(vector.data()).reshape(-1, len(self._varnames))
        for name, col in zip(self._varnames, values.T):
            buf[name] = col

    @property
    def buffer_full(self):
        """
        check if the buffer is full
        """
        return self._event_buffer_counter == self._event_buffer_size

    def write_buffer(self):
        """
        write the buffer to the output file
        """
        if self._format == "hdf5":
            # create a new row in the hdf5 file for each particle in the list
            self._table.append(self.buffer)
        else:
            table = {name: self.buffer[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "csv":
                self._csv_writer.write_table(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)

    def event(self):
        """
        Event processing function

        executes the fill_event_buffer function and writes the data to the
        output file in chunks of event_buffer_size
        """
        self.append_buffer()
        self.fill_event_buffer()
        if self.buffer_full:
            self.write_buffer()
            self.clear_buffer()
318 """save and close the output"""
328 elif self.
_format ==
"parquet":
332 elif self.
_format ==
"feather":
334 ROOT.Belle2.MetadataService.Instance().addNtuple(self.
_filename)
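

# Minimal usage sketch: "pi+:all", the variable names and the filename are
# illustrative, and the path is assumed to contain upstream modules that
# create and fill the particle list.
#
#     import basf2
#     path = basf2.Path()
#     # ... modules filling the particle list 'pi+:all' ...
#     path.add_module(VariablesToTable("pi+:all", ["p", "E"], "pions.parquet"))
#     basf2.process(path)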


class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code.

    This class is a wrapper around `VariablesToTable` that enforces HDF5 output
    and uses default settings for buffer size and writer arguments.
    It mostly exists for legacy reasons and new code should use `VariablesToTable` directly.

    Arguments:
        listname(str): name of the particle list
        variables(list[str]): list of variables to save for each particle
        filename(str): name of the output file to be created.
            Must end with ``.h5``, ``.hdf`` or ``.hdf5``.
        hdf_table_name(str): name of the table in the hdf5 file.
            If not provided, it will be the same as the listname. Defaults to None.
    """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        """
        Constructor for the legacy HDF5 writer.
        """
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNtuple` and convert it to a readable set
    of columns of the form ``{column}_{name}`` where column is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to use bitwise operations on it
    mcErrors = dataframe[column].astype(int)

    # loop over all the c_ flag constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        value = int(getattr(ROOT.Belle2.MCMatching, flag))

        # new column name: the column name plus the flag name without the leading 'c'
        name = column + flag[1:]
        if value == 0:
            # c_Correct is zero, so it needs a special case
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value
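

# Minimal usage sketch (the flag values in the input are just example data):
#
#     import pandas as pd
#     df = pd.DataFrame({"mcErrors": [0, 1, 512]})
#     make_mcerrors_readable(df)
#     # df now contains one boolean column per MCMatching flag,
#     # e.g. df["mcErrors_Correct"] is True where mcErrors == 0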