from typing import List, Optional
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
Python utilities to help create or manage ntuples and work with them in pandas
numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.object_: pa.string(),
}
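# A minimal sketch of how this map is used further below (the record names and
# dtypes here are made-up illustrations): translate a numpy record layout into
# a pyarrow schema.
example_dtypes = [("__event__", np.uint32), ("px", np.float64)]
example_schema = pa.schema(
    [(name, numpy_to_pyarrow_type_map[dt]) for name, dt in example_dtypes]
)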
Base class to dump ntuples into a non-ROOT format of your choosing
hdf_table_name: Optional[str] = None,
event_buffer_size: int = 100,
"""Constructor to initialize the internal state
Arguments:
    listname(str): name of the particle list
    variables(list(str)): list of variables to save for each particle
    filename(str): name of the output file to be created.
        Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
        `.h5`, `.hdf` or `.hdf5` for hdf5 output, and `.feather` or `.arrow` for feather output
    hdf_table_name(str): name of the table in the hdf5 file.
        If not provided, it will be the same as the listname
    event_buffer_size(int): number of events to buffer before writing to disk;
        higher values use more memory but write faster and result in smaller files
    **writer_kwargs: additional keyword arguments to pass to the writer.
        For details, see the documentation of the writer in the Apache Arrow documentation.
        Only use these if you know what you are doing!
"""
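# Hedged usage sketch: the class name `VariablesToTable`, the particle list
# and the variable names below are assumptions for illustration; the input
# file and particle list reconstruction steps are omitted.
import basf2

path = basf2.Path()
# ... input data and particle list creation would go here ...
path.add_module(
    VariablesToTable(
        listname="pi+:good",
        variables=["px", "py", "pz", "E"],
        filename="pions.parquet",
        event_buffer_size=200,
    )
)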
file_type = self._filename.split(".")[-1]
if file_type in ["csv"]:
    self._format = "csv"
elif file_type in ["parquet", "pq"]:
    self._format = "parquet"
elif file_type in ["h5", "hdf", "hdf5"]:
    self._format = "hdf5"
elif file_type in ["feather", "arrow"]:
    self._format = "feather"
else:
    raise ValueError(
        f"Unknown file type ending .{file_type}, supported types are 'csv', "
        "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
    )
self._table_name = hdf_table_name if hdf_table_name is not None else self._listname
Set up variable lists, pointers, buffers and file writers
for varname in variables.variables.resolveCollections(
self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
dtypes = [
    ("__experiment__", np.int32),
    ("__run__", np.int32),
    ("__event__", np.uint32),
    ("__production__", np.uint32),
    ("__candidate__", np.uint32),
    ("__ncandidates__", np.uint32),
]
for name in self._varnames:
    # all analysis variables are stored as doubles
    dtypes.append((name, np.float64))
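# Illustration (made-up names): the accumulated (name, dtype) pairs define a
# numpy structured array with one row per candidate.
example_buf = np.empty(10, dtype=[("__event__", np.uint32), ("px", np.float64)])
example_buf["px"] = 0.0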
elif self._format == "parquet":
    self.initialize_parquet_writer()
elif self._format == "csv":
    self.initialize_csv_writer()
elif self._format == "feather":
    self.initialize_feather_writer()
The buffer slice across multiple entries
The buffer slice for the current event
Reset the buffer event counter and index
"Append" a new event to the buffer by moving the buffer index forward by the particle list size

Automatically replaces the buffer with a larger one if necessary
plist_size = self._plist.getListSize()
if (plist_size + self._buffer_index) > len(self._buffer):
    new_buffer = np.empty(
        (self._buffer_index + plist_size) * 2, dtype=self._buffer.dtype
    )
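# Standalone sketch of this growth strategy (the sizes are illustrative):
# double the structured buffer while preserving the slice already filled.
buf = np.empty(4, dtype=[("x", np.float64)])
filled, incoming = 3, 6
if filled + incoming > len(buf):
    grown = np.empty((filled + incoming) * 2, dtype=buf.dtype)
    grown[:filled] = buf[:filled]
    buf = grown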
Initialize the feather writer using pyarrow
self._schema = [
    (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
]
self._feather_writer = ipc.RecordBatchFileWriter(
    sink=self._filename,
    schema=pa.schema(self._schema),
    **self._writer_kwargs,
)
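# Self-contained sketch of writing an IPC/feather file this way (the file
# name and data are made up):
feather_schema = pa.schema([("x", pa.float64())])
feather_writer = ipc.RecordBatchFileWriter("example.arrow", feather_schema)
feather_writer.write_table(pa.table({"x": [1.0, 2.0]}, schema=feather_schema))
feather_writer.close()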
Initialize the parquet writer using pyarrow
self._schema = [
    (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
]
self._parquet_writer = ParquetWriter(
    self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
)
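# Self-contained sketch of the same pattern with ParquetWriter (made-up file
# name and data): write one table per flush, then close.
parquet_schema = pa.schema([("x", pa.float64())])
parquet_writer = ParquetWriter("example.parquet", parquet_schema)
parquet_writer.write_table(pa.table({"x": [1.0, 2.0]}, schema=parquet_schema))
parquet_writer.close()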
Initialize the csv writer using pyarrow
self._schema = [
    (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
]
self._csv_writer = CSVWriter(
    self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
)
Initialize the hdf5 writer using pytables
self._hdf5_writer = tables.open_file(
    self._filename, mode="w", title="Belle2 Variables to HDF5"
)
filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)

# some variable names contain characters pytables warns about; the table
# still works, so suppress the warnings while creating it
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
Assign values for all variables for all particles in the particle list to the current event buffer
buf["__experiment__"] = self._evtmeta.getExperiment()
buf["__run__"] = self._evtmeta.getRun()
buf["__event__"] = self._evtmeta.getEvent()
buf["__production__"] = self._evtmeta.getProduction()
buf["__ncandidates__"] = len(buf)
buf["__candidate__"] = np.arange(len(buf))
values = np.array(vector.data()).reshape(-1, len(self._varnames))
for name, col in zip(self._varnames, values.T):
    buf[name] = col
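# Illustrative sketch of the column fill above (made-up names and values):
# a flat vector of n_candidates * n_variables values is reshaped to one row
# per candidate, then copied column by column into the structured buffer.
demo_varnames = ["px", "py"]
demo_buf = np.empty(3, dtype=[(n, np.float64) for n in demo_varnames])
flat = np.arange(6, dtype=np.float64)
demo_values = flat.reshape(-1, len(demo_varnames))
for name, col in zip(demo_varnames, demo_values.T):
    demo_buf[name] = col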
Check if the buffer is full
Write the buffer to the output file
"""Create a new row in the hdf5 file for each particle in the list"""
table = {name: self.buffer[name] for name, _ in self._dtypes}
pa_table = pa.table(table, schema=pa.schema(self._schema))
elif self._format == "feather":
    self._feather_writer.write_table(pa_table)
Event processing function

Executes the fill_event_buffer function and writes the data to the output file
in chunks of event_buffer_size
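# Standalone sketch of the chunked writing pattern (illustrative data):
# buffer events and flush every `chunk_size` events, with a final flush for
# any leftover events (as done at terminate()).
events = [np.full(2, i, dtype=np.float64) for i in range(5)]
chunk_size, pending = 2, []
for ev in events:
    pending.append(ev)
    if len(pending) == chunk_size:      # buffer full: write one chunk
        print(np.concatenate(pending))  # stand-in for write_buffer()
        pending.clear()
if pending:                             # leftover events flushed at the end
    print(np.concatenate(pending))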
"""Save and close the output"""
elif self._format == "parquet":
    self._parquet_writer.close()
elif self._format == "csv":
    self._csv_writer.close()
elif self._format == "feather":
    self._feather_writer.close()
ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
Legacy class to avoid breaking existing code
def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
    super().__init__(listname, variables, filename, hdf_table_name)
    assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
        "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
        f"Got {self._filename}"
    )
def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNtuple` and convert it to a readable set
    of columns of the form ``{column}_{name}``, where ``column`` is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
if column not in dataframe:
    raise KeyError(f"Cannot find column '{column}'")
# convert mcErrors to int so we can apply bitwise operations to it
mcErrors = dataframe[column].astype(int)
# loop over all c_ flag constants of the MCMatching class
for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
value = int(getattr(ROOT.Belle2.MCMatching, flag))
name = column + flag[1:]
if value == 0:
    # the flag with value 0 is true only if no error bit is set at all
    dataframe[name] = mcErrors == 0
else:
    dataframe[name] = (mcErrors & value) == value
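# Standalone sketch of the same bit-flag decoding with made-up flag values
# (the real names and values come from ROOT.Belle2.MCMatching):
import pandas as pd

df = pd.DataFrame({"mcErrors": [0, 1, 3]})
flags = {"c_Correct": 0, "c_MissFSR": 1, "c_MisID": 2}  # hypothetical values
for flag, value in flags.items():
    name = "mcErrors" + flag[1:]
    if value == 0:
        df[name] = df["mcErrors"].astype(int) == 0
    else:
        df[name] = (df["mcErrors"].astype(int) & value) == value
# df now has boolean columns mcErrors_Correct, mcErrors_MissFSR, mcErrors_MisID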
Member summary (from the generated API index):

Methods:
    def __init__(self, listname: str, variables: List[str], filename: str,
                 hdf_table_name: Optional[str] = None, event_buffer_size: int = 100,
                 **writer_kwargs)
    def __init__(self, listname, variables, filename,
                 hdf_table_name: Optional[str] = None)   (legacy HDF5 subclass)
    def initialize_feather_writer(self)
    def initialize_parquet_writer(self)
    def initialize_csv_writer(self)
    def initialize_hdf5_writer(self)
    def fill_event_buffer(self)

Attributes:
    _listname              Particle list name.
    _variables             List of variables.
    _filename              Output filename.
    _table_name            Table name in the hdf5 file.
    _event_buffer_size     Event buffer size.
    _event_buffer_counter  Event buffer counter.
    _writer_kwargs         Writer kwargs.
    _std_varnames          std::vector of variable names.
    _plist                 Pointer to the particle list.
    _buffer                Event variables buffer (grown automatically if necessary).
    _buffer_index          Current start index in the event variables buffer.
    _schema                A list of (name, pyarrow.DataType) tuples defining the pyarrow schema.
    _feather_writer        A writer object to write data into a feather file.
    _parquet_writer        A writer object to write data into a parquet file.
    _csv_writer            A writer object to write data into a csv file.
    _hdf5_writer           The pytables file.