from typing import List, Optional

from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
"""
Python utilities to help create or manage ntuples and work with them in pandas.
"""
numpy_to_pyarrow_type_map = {
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.object_: pa.string(),
}
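# Illustrative sketch (not part of the original module): the map above lets a
# list of (name, numpy dtype) pairs double as a pyarrow schema definition.
# `example_dtypes` and `example_schema` are hypothetical names:
#
#     example_dtypes = [("__event__", np.uint32), ("M", np.float64)]
#     example_schema = pa.schema(
#         [(name, numpy_to_pyarrow_type_map[dt]) for name, dt in example_dtypes]
#     )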


class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-ROOT format of your choosing.
    """
    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
        """Constructor to initialize the internal state

        Arguments:
            listname(str): name of the particle list
            variables(list(str)): list of variables to save for each particle
            filename(str): name of the output file to be created.
                Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
                `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
            hdf_table_name(str): name of the table in the hdf5 file.
                If not provided, it will be the same as the listname
            event_buffer_size(int): number of events to buffer before writing to disk.
                Higher values will use more memory but write faster and result in smaller files.
            **writer_kwargs: additional keyword arguments to pass to the writer.
                For details, see the documentation of the writer in the Apache Arrow documentation.
                Only use if you know what you are doing!
        """
        file_type = filename.split(".")[-1]
        if file_type in ["csv"]:
            self._format = "csv"
        elif file_type in ["parquet", "pq"]:
            self._format = "parquet"
        elif file_type in ["h5", "hdf", "hdf5"]:
            self._format = "hdf5"
        elif file_type in ["feather", "arrow"]:
            self._format = "feather"
        else:
            raise ValueError(
                f"Unknown file type ending .{file_type}, supported types are 'csv', "
                "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
            )
        self._table_name = (
            hdf_table_name if hdf_table_name is not None else self._listname
        )
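        # Usage sketch (illustrative, not from the original source): the module
        # is meant to be attached to a basf2 path. `path` and the list name
        # "B0:candidates" are hypothetical:
        #
        #     path.add_module(
        #         VariablesToTable("B0:candidates", ["M", "p"], "ntuple.parquet",
        #                          event_buffer_size=1000)
        #     )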
    def initialize(self):
        """
        Setup variable lists, pointers, buffers and file writers
        """
        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]
        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            dtypes.append((name, np.float64))
        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()
    @property
    def buffer(self):
        """
        The buffer slice across multiple entries
        """
        return self._buffer[: self._buffer_index]

    @property
    def event_buffer(self):
        """
        The buffer slice for the current event
        """

    def clear_buffer(self):
        """
        Reset the buffer event counter and index
        """
        self._event_buffer_counter = 0
        self._buffer_index = 0
    def append_buffer(self):
        """
        "Append" a new event to the buffer by moving the buffer index forward
        by the particle list size.

        Automatically replaces the buffer by a larger one if necessary.
        """
        plist_size = self._plist.getListSize()
        if plist_size + self._buffer_index > len(self._buffer):
            # grow geometrically so that repeated appends stay cheap
            new_buffer = np.empty(
                max(2 * len(self._buffer), plist_size + self._buffer_index),
                dtype=self._buffer.dtype,
            )
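        # Sketch of the growth step above on a plain structured array
        # (illustrative; `buf`, `used` and `needed` are hypothetical):
        #
        #     buf = np.empty(4, dtype=[("x", np.float64)])
        #     used, needed = 3, 5
        #     if used + needed > len(buf):
        #         grown = np.empty(max(2 * len(buf), used + needed), dtype=buf.dtype)
        #         grown[:used] = buf[:used]   # keep the entries already filled
        #         buf = grown                 # len(buf) is now 8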
    def initialize_feather_writer(self):
        """
        Initialize the feather writer using pyarrow
        """
        #: a list of (name, pyarrow DataType) tuples defining the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a feather file
        self._feather_writer = ipc.RecordBatchFileWriter(
            sink=self._filename,
            schema=pa.schema(self._schema),
            **self._writer_kwargs,
        )
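        # Minimal standalone sketch of the pyarrow IPC (feather) pattern used
        # above (illustrative; "demo.arrow" is a hypothetical filename):
        #
        #     schema = pa.schema([("x", pa.float64())])
        #     with ipc.RecordBatchFileWriter("demo.arrow", schema) as writer:
        #         writer.write_table(pa.table({"x": [1.0, 2.0]}, schema=schema))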
    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a parquet file
        self._parquet_writer = ParquetWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )
    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a csv file
        self._csv_writer = CSVWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )
    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        #: the pytables file
        self._hdf5_writer = tables.open_file(
            self._filename, mode="w", title="Belle2 Variables to HDF5"
        )
        filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)
        # some variable names are not valid Python identifiers, which makes
        # pytables emit a NaturalNameWarning although everything works; ignore it
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self._table = self._hdf5_writer.create_table(
                "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters
            )
    def fill_buffer(self):
        """
        Assign values for all variables for all particles in the particle list
        to the current event buffer
        """
        buf = self.event_buffer
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))
        # `vector` holds the evaluated variable values for all candidates, flattened
        values = np.array(vector.data()).reshape(-1, len(self._varnames))
        for name, col in zip(self._varnames, values.T):
            buf[name] = col
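        # Sketch of the reshape step above: values arrive flattened per candidate
        # and are split back into per-variable columns (illustrative; `flat` and
        # `names` are hypothetical):
        #
        #     names = ["M", "p"]
        #     flat = np.array([5.28, 0.3, 5.27, 0.5])  # 2 candidates x 2 variables
        #     cols = flat.reshape(-1, len(names))
        #     for name, col in zip(names, cols.T):
        #         print(name, col)  # M: [5.28 5.27], p: [0.3 0.5]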
    def buffer_full(self):
        """
        Check whether the buffer is full
        """
        return self._event_buffer_counter == self._event_buffer_size
    def write_buffer(self):
        """
        Write the buffer to the output file
        """
        if self._format == "hdf5":
            # creates a new row in the hdf5 file for each particle in the list
            self._table.append(self.buffer)
        else:
            table = {name: self.buffer[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "csv":
                self._csv_writer.write(pa_table)
            elif self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)
    def event(self):
        """
        Event processing function

        Executes the fill_buffer function and writes the data to the output file
        in chunks of event_buffer_size.
        """
    def terminate(self):
        """Save and close the output"""
        if self._format == "hdf5":
            self._hdf5_writer.close()
        elif self._format == "csv":
            self._csv_writer.close()
        elif self._format == "parquet":
            self._parquet_writer.close()
        elif self._format == "feather":
            self._feather_writer.close()
        ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
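    # Reading the output back is plain pandas (illustrative sketch; filenames
    # are hypothetical and depend on the extension chosen in the constructor):
    #
    #     import pandas as pd
    #     df = pd.read_parquet("ntuple.parquet")   # .parquet / .pq
    #     df = pd.read_feather("ntuple.feather")   # .feather / .arrow
    #     df = pd.read_hdf("ntuple.h5", key="B0")  # .h5 / .hdf / .hdf5
    #     df = pd.read_csv("ntuple.csv")           # .csv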


class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code
    """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
    of columns of the form ``{column}_{name}``, where ``column`` is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert to integer so that bitwise operations are well defined
    mcErrors = dataframe[column].astype(int)

    # loop over all error flag constants of the MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        value = int(getattr(ROOT.Belle2.MCMatching, flag))
        # strip the leading 'c' so that e.g. c_Correct becomes {column}_Correct
        name = column + flag[1:]
        if value == 0:
            # c_Correct has the value zero: it is set when no error flag is set
            dataframe[name] = mcErrors == 0
        else:
            # a flag applies if all of its bits are set
            dataframe[name] = (mcErrors & value) == value
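# Usage sketch (illustrative; assumes a basf2 environment providing
# ROOT.Belle2.MCMatching and an ntuple with an ``mcErrors`` column):
#
#     import pandas as pd
#     df = pd.read_parquet("ntuple.parquet")
#     make_mcerrors_readable(df)
#     correct = df[df["mcErrors_Correct"]]  # candidates without any error flag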
Member reference (VariablesToTable)

Methods:
    __init__(self, listname: str, variables: List[str], filename: str,
             hdf_table_name: Optional[str] = None, event_buffer_size: int = 100,
             **writer_kwargs)
    initialize_csv_writer(self)
    initialize_feather_writer(self)
    initialize_hdf5_writer(self)
    initialize_parquet_writer(self)

Data members:
    _buffer                      event variables buffer (grown automatically if necessary)
    _buffer_index (int)          current start index in the event variables buffer
    _csv_writer                  a writer object to write data into a csv file
    _event_buffer_counter (int)  event buffer counter
    _event_buffer_size           event buffer size
    _feather_writer              a writer object to write data into a feather file
    _filename                    output filename
    _format (str)                output format
    _hdf5_writer                 the pytables file
    _listname                    particle list name
    _parquet_writer              a writer object to write data into a parquet file
    _plist                       pointer to the particle list
    _schema (list)               a list of (name, pyarrow DataType) tuples defining the pyarrow schema
    _std_varnames                std::vector of variable names
    _table_name (str)            table name in the hdf5 file
    _variables                   list of variables
    _varnames (list)             variable names
    _writer_kwargs               writer kwargs