Belle II Software development
b2pandas_utils.py
from typing import List, Optional
import basf2
import variables
import tables
import numpy as np
import warnings
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
import pyarrow as pa


"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.bool_: pa.bool_(),
    np.object_: pa.string(),
    np.str_: pa.string(),
}
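
# Illustrative note (not part of the original file): the map above is how the
# initialize_*_writer methods below translate the numpy dtypes in self._dtypes
# into a pyarrow schema. A minimal sketch, assuming a single float column "p":
#
#   schema = pa.schema([("p", numpy_to_pyarrow_type_map[np.float64])])
#   assert schema.field("p").type == pa.float64()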

class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non root format of your choosing
    """

    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
52 """Constructor to initialize the internal state
53
54 Arguments:
55 listname(str): name of the particle list
56 variables(list(str)): list of variables to save for each particle
57 filename(str): name of the output file to be created.
58 Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
59 `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
60 hdf_table_name(str): name of the table in the hdf5 file.
61 If not provided, it will be the same as the listname
62 event_buffer_size(int): number of events to buffer before writing to disk,
63 higher values will use more memory but write faster and result in smaller files
64 **writer_kwargs: additional keyword arguments to pass to the writer.
65 For details, see the documentation of the writer in the apache arrow documentation.
66 Only use, if you know what you are doing!
67 """
        super().__init__()

        self._filename = filename

        self._listname = listname

        self._variables = list(set(variables))

        file_type = self._filename.split(".")[-1]
        if file_type in ["csv"]:
            self._format = "csv"
        elif file_type in ["parquet", "pq"]:
            self._format = "parquet"
        elif file_type in ["h5", "hdf", "hdf5"]:
            self._format = "hdf5"
        elif file_type in ["feather", "arrow"]:
            self._format = "feather"
        else:
            raise ValueError(
                f"Unknown file type ending .{file_type}, supported types are 'csv', "
                "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
            )

        #: Table name in the hdf5 file
        self._table_name = (
            hdf_table_name if hdf_table_name is not None else self._listname
        )
        #: Event buffer size
        self._event_buffer_size = event_buffer_size
        #: Event buffer counter
        self._event_buffer_counter = 0

        self._writer_kwargs = writer_kwargs

    def initialize(self):
        """
        Setup variable lists, pointers, buffers and file writers
        """
        # Always avoid the top-level 'import ROOT'.
        import ROOT  # noqa

        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]

        #: std::vector of variable names
        self._std_varnames = variables.std_vector(*self._varnames)

        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        self._evtmeta.isRequired()

        #: Pointer to the particle list
        self._plist = ROOT.Belle2.PyStoreObj(self._listname)
        self._plist.isRequired()

        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            # only float variables for now
            dtypes.append((name, np.float64))

        self._dtypes = dtypes

        #: event variables buffer (will be automatically grown if necessary)
        self._buffer = np.empty(self._event_buffer_size * 10, dtype=self._dtypes)

        #: current start index in the event variables buffer
        self._buffer_index = 0

        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()

    @property
    def buffer(self):
        """
        The buffer slice across multiple entries
        """
        return self._buffer[:self._buffer_index]

    @property
    def event_buffer(self):
        """
        The buffer slice for the current event
        """
        return self._buffer[self._buffer_index - self._plist.getListSize(): self._buffer_index]

    def clear_buffer(self):
        """
        Reset the buffer event counter and index
        """
        self._event_buffer_counter = 0
        self._buffer_index = 0

    def append_buffer(self):
        """
        "Append" a new event to the buffer by moving the buffer index forward by the particle list size

        Automatically replaces the buffer by a larger one if necessary
        """
        plist_size = self._plist.getListSize()
        if (plist_size + self._buffer_index) > len(self._buffer):
            new_buffer = np.empty(
                # factor 1.5 larger or at least as large as necessary
                max(int(len(self._buffer) * 1.5), self._buffer_index + plist_size),
                dtype=self._dtypes,
            )
            new_buffer[:self._buffer_index] = self.buffer
            self._buffer = new_buffer
        self._buffer_index += plist_size
        self._event_buffer_counter += 1
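
    # Illustrative note (not part of the original module): the growth policy
    # above enlarges the buffer to max(1.5 x the current size, exactly what is
    # needed). For example, with len(self._buffer) == 1000, _buffer_index == 950
    # and a particle list of 100 candidates, the new buffer holds
    # max(int(1000 * 1.5), 950 + 100) == 1500 entries.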

    def initialize_feather_writer(self):
        """
        Initialize the feather writer using pyarrow
        """
        #: A list of tuples and pa.DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a feather file
        self._feather_writer = ipc.RecordBatchFileWriter(
            sink=self._filename,
            schema=pa.schema(self._schema),
            **self._writer_kwargs,
        )

    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        #: A list of tuples and pa.DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a parquet file
        self._parquet_writer = ParquetWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        #: A list of tuples and pa.DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a csv file
        self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema), **self._writer_kwargs)

    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        self._hdf5_writer = tables.open_file(
            self._filename, mode="w", title="Belle2 Variables to HDF5"
        )
        filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)

        # some variable names are not just A-Za-z0-9 so pytables complains but
        # seems to work. Ignore warning
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")

            self._table = self._hdf5_writer.create_table(
                "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters, **self._writer_kwargs
            )
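
    # Illustrative note (not part of the original module): the table written
    # here can be read back as a numpy structured array with pytables. The
    # file and table names below are assumptions for the example only.
    #
    #   import tables
    #   with tables.open_file("ntuple.h5") as f:
    #       arr = f.get_node("/", "my_list").read()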

    def fill_event_buffer(self):
        """
        Assign values for all variables for all particles in the particle list to the current event buffer
        """
        buf = self.event_buffer

        # add some extra columns for bookkeeping
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))

        # fill variables into buffer
        vector = variables.variables.evaluateVariables(self._std_varnames, self._plist)
        values = np.array(vector.data()).reshape(-1, len(self._varnames))
        for name, col in zip(self._varnames, values.T):
            buf[name] = col

    @property
    def buffer_full(self):
        """
        check if the buffer is full
        """
        return self._event_buffer_counter == self._event_buffer_size

    def write_buffer(self):
        """
        write the buffer to the output file
        """
        if self._format == "hdf5":
            # append a new row to the hdf5 table for each particle in the buffer
            self._table.append(self.buffer)
        else:
            table = {name: self.buffer[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "csv":
                self._csv_writer.write(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)

    def event(self):
        """
        Event processing function

        executes the fill_event_buffer function and writes the data to the output file
        in chunks of event_buffer_size
        """
        self.append_buffer()
        self.fill_event_buffer()
        if self.buffer_full:
            self.write_buffer()
            self.clear_buffer()

    def terminate(self):
        """save and close the output"""
        import ROOT  # noqa
        if len(self.buffer) > 0:
            self.write_buffer()

        if self._format == "hdf5":
            self._table.flush()
            self._hdf5_writer.close()
        elif self._format == "parquet":
            self._parquet_writer.close()
        elif self._format == "csv":
            self._csv_writer.close()
        elif self._format == "feather":
            self._feather_writer.close()
        ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
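
# Illustrative usage sketch (not part of the original module): attach the module
# to a basf2 path to dump candidates into a parquet file. The particle list name
# and variables below are assumptions for the example only.
#
#   import basf2
#   from b2pandas_utils import VariablesToTable
#
#   main = basf2.Path()
#   # ... input and reconstruction modules that fill 'pi+:all' go here ...
#   main.add_module(VariablesToTable("pi+:all", ["p", "E"], "pions.parquet"))
#   basf2.process(main)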


class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code
    """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
    of columns of the form ``{column}_{name}`` where column is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to do logical operations on it
    mcErrors = dataframe[column].astype(int)

    # and loop over all the c_ constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        try:
            value = int(getattr(ROOT.Belle2.MCMatching, flag))
        except ValueError:
            # probably the extraInfo column name, ignore
            continue

        # and set the column
        name = column + flag[1:]
        if value == 0:
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value
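
# Illustrative usage sketch (not part of the original module): expand the
# mcErrors bitmask of an ntuple loaded into pandas. The file name below is an
# assumption for the example only.
#
#   import pandas as pd
#   df = pd.read_parquet("ntuple.parquet")
#   make_mcerrors_readable(df)
#   # e.g. df["mcErrors_MisID"] is now a boolean flag column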