# b2pandas_utils.py (Belle II Software, light-2509-fornax)
from typing import List, Optional
import basf2
import variables
import tables
import numpy as np
import warnings
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
import pyarrow as pa


"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.bool_: pa.bool_(),
    np.object_: pa.string(),
    np.str_: pa.string(),
}
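
# Example (illustrative, not from the source): how the map above turns numpy
# dtypes into a pyarrow schema; the field names here are hypothetical.
#
#   fields = [("p", np.float64), ("__event__", np.uint32)]
#   pa.schema([(name, numpy_to_pyarrow_type_map[dt]) for name, dt in fields])
#   # -> p: double, __event__: uint32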


class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-ROOT format of your choosing
    """

    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
        """Constructor to initialize the internal state

        Arguments:
            listname(str): name of the particle list
            variables(list(str)): list of variables to save for each particle
            filename(str): name of the output file to be created.
                Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
                `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
            hdf_table_name(str): name of the table in the hdf5 file.
                If not provided, it will be the same as the listname
            event_buffer_size(int): number of events to buffer before writing to disk;
                higher values use more memory but write faster and result in smaller files
            **writer_kwargs: additional keyword arguments to pass to the writer.
                For details, see the documentation of the respective writer in the Apache Arrow documentation.
                Only use this if you know what you are doing!
        """
        super().__init__()

        #: Output filename
        self._filename = filename

        #: Particle list name
        self._listname = listname

        #: List of variables
        self._variables = list(set(variables))

        file_type = self._filename.split(".")[-1]

        if file_type in ["csv"]:
            self._format = "csv"
        elif file_type in ["parquet", "pq"]:
            self._format = "parquet"
        elif file_type in ["h5", "hdf", "hdf5"]:
            self._format = "hdf5"
        elif file_type in ["feather", "arrow"]:
            self._format = "feather"
        else:
            raise ValueError(
                f"Unknown file type ending .{file_type}, supported types are 'csv', "
                "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
            )

        #: Table name in the hdf5 file
        self._table_name = (
            hdf_table_name if hdf_table_name is not None else self._listname
        )

        #: Event buffer size
        self._event_buffer_size = event_buffer_size

        #: Event buffer counter
        self._event_buffer_counter = 0

        #: writer kwargs
        self._writer_kwargs = writer_kwargs

    def initialize(self):
        """
        Setup variable lists, pointers, buffers and file writers
        """
        # Always avoid the top-level 'import ROOT'.
        import ROOT  # noqa

        #: variable names
        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]

        #: std::vector of variable names
        self._std_varnames = variables.std_vector(*self._varnames)

        #: Event metadata
        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        self._evtmeta.isRequired()

        #: Pointer to the particle list
        self._plist = ROOT.Belle2.PyStoreObj(self._listname)
        self._plist.isRequired()

        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            # only float variables for now
            dtypes.append((name, np.float64))

        #: The dtypes of the output table
        self._dtypes = dtypes

        #: event variables buffer (will be automatically grown if necessary)
        self._buffer = np.empty(self._event_buffer_size * 10, dtype=self._dtypes)

        #: current start index in the event variables buffer
        self._buffer_index = 0

        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()

    @property
    def buffer(self):
        """
        The buffer slice across multiple entries
        """
        return self._buffer[:self._buffer_index]

    @property
    def event_buffer(self):
        """
        The buffer slice for the current event
        """
        return self._buffer[self._buffer_index - self._plist.getListSize(): self._buffer_index]

    def clear_buffer(self):
        """
        Reset the buffer event counter and index
        """
        self._event_buffer_counter = 0
        self._buffer_index = 0

    def append_buffer(self):
        """
        "Append" a new event to the buffer by moving the buffer index forward by the particle list size

        Automatically replaces the buffer with a larger one if necessary
        """
        plist_size = self._plist.getListSize()
        if (plist_size + self._buffer_index) > len(self._buffer):
            new_buffer = np.empty(
                # grow by a factor of 1.5, or at least as much as necessary
                max(int(len(self._buffer) * 1.5), self._buffer_index + plist_size),
                dtype=self._dtypes,
            )
            new_buffer[:self._buffer_index] = self.buffer
            self._buffer = new_buffer
        self._buffer_index += plist_size
        self._event_buffer_counter += 1
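
    # Buffer-growth arithmetic (illustrative numbers, not from the source):
    # with the default event_buffer_size of 100 the buffer starts at
    # 100 * 10 = 1000 rows. If an event with 20 candidates arrives when
    # _buffer_index is 995, the buffer is reallocated to
    # max(int(1000 * 1.5), 995 + 20) = 1500 rows, keeping growth amortized.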

    def initialize_feather_writer(self):
        """
        Initialize the feather writer using pyarrow
        """
        #: A list of tuples and pyarrow DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]

        #: a writer object to write data into a feather file
        self._feather_writer = ipc.RecordBatchFileWriter(
            sink=self._filename,
            schema=pa.schema(self._schema),
            **self._writer_kwargs,
        )

    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        #: A list of tuples and pyarrow DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]

        #: a writer object to write data into a parquet file
        self._parquet_writer = ParquetWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        #: A list of tuples and pyarrow DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]

        #: a writer object to write data into a csv file
        self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema), **self._writer_kwargs)

    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        #: The pytables file where the table will be stored
        self._hdf5_writer = tables.open_file(
            self._filename, mode="w", title="Belle2 Variables to HDF5"
        )
        filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)

        # some variable names are not just A-Za-z0-9 so pytables complains but
        # seems to work. Ignore warning
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")

            #: The pytables table where the data will be stored
            self._table = self._hdf5_writer.create_table(
                "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters, **self._writer_kwargs
            )

    def fill_event_buffer(self):
        """
        Assign values for all variables for all particles in the particle list to the current event buffer
        """
        buf = self.event_buffer

        # add some extra columns for bookkeeping
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))

        # fill variables into buffer
        vector = variables.variables.evaluateVariables(self._std_varnames, self._plist)
        values = np.array(vector.data()).reshape(-1, len(self._varnames))
        for name, col in zip(self._varnames, values.T):
            buf[name] = col

    @property
    def buffer_full(self):
        """
        Check if the buffer is full
        """
        return self._event_buffer_counter == self._event_buffer_size

    def write_buffer(self):
        """
        Write the buffer to the output file
        """
        if self._format == "hdf5":
            # append one row to the hdf5 table for each particle in the buffer
            # \cond false positive doxygen warning
            self._table.append(self.buffer)
            # \endcond
        else:
            table = {name: self.buffer[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "csv":
                self._csv_writer.write(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)

    def event(self):
        """
        Event processing function

        Executes the fill_event_buffer function and writes the data to the output file
        in chunks of event_buffer_size
        """
        self.append_buffer()
        self.fill_event_buffer()
        # \cond false positive doxygen warning
        if self.buffer_full:
            self.write_buffer()
            self.clear_buffer()
        # \endcond

    def terminate(self):
        """Save and close the output"""
        import ROOT  # noqa
        # \cond false positive doxygen warning
        if len(self.buffer) > 0:
            self.write_buffer()
        # \endcond

        if self._format == "hdf5":
            self._table.flush()
            self._hdf5_writer.close()
        elif self._format == "parquet":
            self._parquet_writer.close()
        elif self._format == "csv":
            self._csv_writer.close()
        elif self._format == "feather":
            self._feather_writer.close()
        ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
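

# Usage sketch (illustrative, not from the source): attach the module to a
# basf2 path after reconstruction. The path setup, the particle list
# "pi+:all" and the variables chosen below are assumptions.
#
#   import basf2
#   from b2pandas_utils import VariablesToTable
#
#   main = basf2.Path()
#   # ... input and reconstruction modules that fill "pi+:all" ...
#   main.add_module(VariablesToTable("pi+:all", ["p", "E"], "pions.parquet"))
#   basf2.process(main)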


class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code
    """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        """
        Constructor for the legacy HDF5 writer.
        """
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
    of columns of the form ``{column}_{name}``, where ``column`` is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to perform logical operations on it
    mcErrors = dataframe[column].astype(int)

    # and loop over all the c_ constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        try:
            value = int(getattr(ROOT.Belle2.MCMatching, flag))
        except ValueError:
            # probably the extraInfo column name, ignore
            continue

        # and set the column
        name = column + flag[1:]
        if value == 0:
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value
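

# Example (illustrative, not from the source): expand the "mcErrors" bit mask
# of an ntuple written by this module into boolean flag columns. The file name
# and the "mcErrors_MissFSR" column are assumptions; actual column names come
# from the c_* constants of Belle2.MCMatching.
#
#   import pandas as pd
#   df = pd.read_parquet("ntuple.parquet")
#   make_mcerrors_readable(df)
#   no_missed_fsr = df[~df["mcErrors_MissFSR"]]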