Belle II Software light-2511-gacrux
b2pandas_utils.py
from typing import List, Optional
import basf2
import variables
import tables
import numpy as np
import warnings
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
import pyarrow as pa


"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.bool_: pa.bool_(),
    np.object_: pa.string(),
    np.str_: pa.string(),
}
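
# A minimal sketch of how this map is used below (the names are illustrative,
# not part of the module): a list of (column name, numpy dtype) pairs is
# translated into a pyarrow schema, e.g.
#
#     dtypes = [("__event__", np.uint32), ("px", np.float64)]
#     schema = pa.schema([(n, numpy_to_pyarrow_type_map[dt]) for n, dt in dtypes])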


class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-ROOT format of your choosing.

    Arguments:
        listname(str): name of the particle list
        variables(list[str]): list of variables to save for each particle
        filename(str): name of the output file to be created.
            Needs to end with ``.csv`` for csv output, ``.parquet`` or ``.pq`` for parquet output,
            ``.h5``, ``.hdf`` or ``.hdf5`` for hdf5 output and ``.feather`` or ``.arrow`` for feather output
        hdf_table_name(str): name of the table in the hdf5 file.
            If not provided, it will be the same as the listname. Defaults to None.
        event_buffer_size(int): number of events to buffer before writing to disk,
            higher values will use more memory but result in smaller files.
            For some formats, like parquet, this also sets the row group size. Defaults to 100.
        **writer_kwargs: additional keyword arguments to pass to the writer.
            For details, see the documentation of the respective writer in the Apache Arrow documentation.
            For HDF5, these are passed to ``tables.File.create_table``.
            Only use if you know what you are doing!
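
    Example:
        A minimal usage sketch; the steering path, particle list and variables
        are illustrative and must already exist in your steering file::

            import basf2
            from b2pandas_utils import VariablesToTable

            path = basf2.Path()
            # ... fill the path and create the particle list "pi+:good" ...
            path.add_module(
                VariablesToTable("pi+:good", ["px", "py", "pz"], "pions.parquet")
            )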
57 """

    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
68 """Constructor to initialize the internal state"""
69 super().__init__()
70
71 self._filename = filename
72
73 self._listname = listname
74
75 self._variables = list(set(variables))
76
77 file_type = self._filename.split(".")[-1]
78
79 if file_type in ["csv"]:
80 self._format = "csv"
81 elif file_type in ["parquet", "pq"]:
82 self._format = "parquet"
83 elif file_type in ["h5", "hdf", "hdf5"]:
84 self._format = "hdf5"
85 elif file_type in ["feather", "arrow"]:
86 self._format = "feather"
87 else:
88 raise ValueError(
89 f"Unknown file type ending .{file_type}, supported types are 'csv', "
90 "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
91 )
92
93 self._table_name = (
94 hdf_table_name if hdf_table_name is not None else self._listname
95 )
96
97 self._event_buffer_size = event_buffer_size
98
100
101 self._writer_kwargs = writer_kwargs
102
    def initialize(self):
        """
        Setup variable lists, pointers, buffers and file writers
        """
        # Always avoid the top-level 'import ROOT'.
        import ROOT  # noqa

        #: Resolved variable names
        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]

        #: std.vector of variable names
        self._std_varnames = variables.std_vector(*self._varnames)

        #: Event metadata object
        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        self._evtmeta.isRequired()

        #: Pointer to the particle list
        self._plist = ROOT.Belle2.PyStoreObj(self._listname)
        self._plist.isRequired()

        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            # only float variables for now
            dtypes.append((name, np.float64))

        #: The data types of all columns
        self._dtypes = dtypes

        #: Event variables buffer (will be automatically grown if necessary)
        self._buffer = np.empty(self._event_buffer_size * 10, dtype=self._dtypes)

        #: Current start index in the event variables buffer
        self._buffer_index = 0

        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()

    @property
    def buffer(self):
        """
        The buffer slice across multiple entries
        """
        return self._buffer[:self._buffer_index]

    @property
    def event_buffer(self):
        """
        The buffer slice for the current event
        """
        return self._buffer[self._buffer_index - self._plist.getListSize(): self._buffer_index]

    def clear_buffer(self):
        """
        Reset the buffer event counter and index
        """
        self._event_buffer_counter = 0
        self._buffer_index = 0

    def append_buffer(self):
        """
        "Append" a new event to the buffer by moving the buffer index forward by the particle list size.

        Automatically replaces the buffer with a larger one if necessary.
        """
        plist_size = self._plist.getListSize()
        if (plist_size + self._buffer_index) > len(self._buffer):
            new_buffer = np.empty(
                # factor 1.5 larger or at least as large as necessary
                max(int(len(self._buffer) * 1.5), self._buffer_index + plist_size),
                dtype=self._dtypes,
            )
            new_buffer[:self._buffer_index] = self.buffer
            self._buffer = new_buffer
        self._buffer_index += plist_size
        self._event_buffer_counter += 1
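
    # Growth arithmetic, with illustrative numbers: with event_buffer_size=100
    # the initial buffer holds 100 * 10 = 1000 rows; an event with 50 candidates
    # arriving at _buffer_index = 980 grows it to max(int(1000 * 1.5), 1030) = 1500 rows.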

    def initialize_feather_writer(self):
        """
        Initialize the feather writer using pyarrow
        """
        #: A list of (name, pyarrow.DataType) tuples that defines the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: Writer object used to write data into a feather file
        self._feather_writer = ipc.RecordBatchFileWriter(
            sink=self._filename,
            schema=pa.schema(self._schema),
            **self._writer_kwargs,
        )

    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        #: A list of (name, pyarrow.DataType) tuples that defines the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: The pyarrow parquet writer object
        self._parquet_writer = ParquetWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        #: A list of (name, pyarrow.DataType) tuples that defines the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: The pyarrow CSV writer object
        self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema), **self._writer_kwargs)

    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        #: The pytables file object
        self._hdf5_writer = tables.open_file(
            self._filename, mode="w", title="Belle2 Variables to HDF5"
        )
        filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)

        # some variable names are not just A-Za-z0-9 so pytables complains but
        # seems to work. Ignore the warning.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            #: The pytables table object
            self._table = self._hdf5_writer.create_table(
                "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters, **self._writer_kwargs
            )

    def fill_event_buffer(self):
        """
        Assign values for all variables for all particles in the particle list to the current event buffer
        """
        buf = self.event_buffer

        # add some extra columns for bookkeeping
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))

        # fill variables into buffer
        vector = variables.variables.evaluateVariables(self._std_varnames, self._plist)
        values = np.array(vector.data()).reshape(-1, len(self._varnames))
        for name, col in zip(self._varnames, values.T):
            buf[name] = col

    @property
    def buffer_full(self):
        """
        Check whether the buffer is full
        """
        return self._event_buffer_counter == self._event_buffer_size

    def write_buffer(self):
        """
        Write the buffer to the output file
        """
        if self._format == "hdf5":
            # append one new row per particle in the list to the hdf5 table
            # \cond false positive doxygen warning
            self._table.append(self.buffer)
            # \endcond
        else:
            table = {name: self.buffer[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "csv":
                self._csv_writer.write(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)

    def event(self):
        """
        Event processing function

        Executes the fill_event_buffer function and writes the data to the output file
        in chunks of event_buffer_size.
        """
        self.append_buffer()
        self.fill_event_buffer()
        # \cond false positive doxygen warning
        if self.buffer_full:
            self.write_buffer()
            self.clear_buffer()
        # \endcond

    def terminate(self):
        """Save and close the output"""
        import ROOT  # noqa
        # \cond false positive doxygen warning
        if len(self.buffer) > 0:
            self.write_buffer()
        # \endcond

        if self._format == "hdf5":
            self._table.flush()
            self._hdf5_writer.close()
        elif self._format == "parquet":
            self._parquet_writer.close()
        elif self._format == "csv":
            self._csv_writer.close()
        elif self._format == "feather":
            self._feather_writer.close()
        ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)


class VariablesToHDF5(VariablesToTable):
    """
    Legacy class kept so that existing code does not break.

    This class is a wrapper around `VariablesToTable` that enforces HDF5 output
    and uses default settings for buffer size and writer arguments.
    It mostly exists for legacy reasons; new code should use `VariablesToTable` directly.

    Arguments:
        listname(str): name of the particle list
        variables(list[str]): list of variables to save for each particle
        filename(str): name of the output file to be created.
            Must end with ``.h5``, ``.hdf`` or ``.hdf5``.
        hdf_table_name(str): name of the table in the hdf5 file.
            If not provided, it will be the same as the listname. Defaults to None.
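
    Example:
        A minimal sketch; the steering ``path``, particle list and variables
        are illustrative::

            path.add_module(
                VariablesToHDF5("pi+:good", ["px", "py", "pz"], "pions.h5")
            )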
352 """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        """
        Constructor for the legacy HDF5 writer.
        """
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
    of columns of the form ``{column}_{name}``, where ``column`` is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading ``c_``).

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
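
    Example:
        A minimal sketch, assuming ``df`` is a pandas dataframe that already
        contains an ``mcErrors`` column (the selection below is illustrative)::

            make_mcerrors_readable(df)
            # boolean flag columns are added in place, e.g. select correctly
            # matched candidates:
            correct = df[df["mcErrors_Correct"]]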
377 """
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to do logical operations on it
    mcErrors = dataframe[column].astype(int)

    # and loop over all the c_ constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        try:
            value = int(getattr(ROOT.Belle2.MCMatching, flag))
        except ValueError:
            # probably the extraInfo column name, ignore
            continue

        # and set the column
        name = column + flag[1:]
        if value == 0:
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value