Belle II Software prerelease-10-00-00a
b2pandas_utils.py
from typing import List, Optional
import basf2
import variables
import tables
import numpy as np
import warnings
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
import pyarrow as pa


"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.bool_: pa.bool_(),
    np.object_: pa.string(),
    np.str_: pa.string(),
}
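
# For illustration: this map is what the initialize_*_writer methods below use
# to translate the numpy record dtypes into a pyarrow schema. A hypothetical
# dtype list like
#
#   dtypes = [("__event__", np.uint32), ("p", np.float64)]
#
# becomes
#
#   pa.schema([(name, numpy_to_pyarrow_type_map[dt]) for name, dt in dtypes])
#
# i.e. pa.schema([("__event__", pa.uint32()), ("p", pa.float64())]).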


class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-ROOT format of your choosing
    """

    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
52 """Constructor to initialize the internal state
53
54 Arguments:
55 listname(str): name of the particle list
56 variables(list(str)): list of variables to save for each particle
57 filename(str): name of the output file to be created.
58 Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
59 `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
60 hdf_table_name(str): name of the table in the hdf5 file.
61 If not provided, it will be the same as the listname
62 event_buffer_size(int): number of events to buffer before writing to disk,
63 higher values will use more memory but write faster and result in smaller files
64 **writer_kwargs: additional keyword arguments to pass to the writer.
65 For details, see the documentation of the writer in the apache arrow documentation.
66 Only use, if you know what you are doing!
67 """
        super().__init__()
        #: Output filename
        self._filename = filename
        #: Particle list name
        self._listname = listname
        #: List of variables (duplicates removed)
        self._variables = list(set(variables))

        file_type = self._filename.split(".")[-1]
        if file_type in ["csv"]:
            self._format = "csv"
        elif file_type in ["parquet", "pq"]:
            self._format = "parquet"
        elif file_type in ["h5", "hdf", "hdf5"]:
            self._format = "hdf5"
        elif file_type in ["feather", "arrow"]:
            self._format = "feather"
        else:
            raise ValueError(
                f"Unknown file type ending .{file_type}, supported types are 'csv', "
                "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
            )

        #: Table name in the hdf5 file
        self._table_name = (
            hdf_table_name if hdf_table_name is not None else self._listname
        )
        #: Event buffer size
        self._event_buffer_size = event_buffer_size
        #: Event buffer counter
        self._event_buffer_counter = 0
        self._writer_kwargs = writer_kwargs

    def initialize(self):
        """
        Setup variable lists, pointers, buffers and file writers
        """
        # Always avoid the top-level 'import ROOT'.
        import ROOT  # noqa

        #: Resolved variable names (collections expanded)
        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]
        #: std.vector of variable names
        self._std_varnames = variables.std_vector(*self._varnames)

        #: Event metadata
        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        self._evtmeta.isRequired()

        #: Pointer to the particle list
        self._plist = ROOT.Belle2.PyStoreObj(self._listname)
        self._plist.isRequired()

        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            # only float variables for now
            dtypes.append((name, np.float64))

        #: The data types of the buffer columns
        self._dtypes = dtypes

        #: event variables buffer (will be automatically grown if necessary)
        self._buffer = np.empty(self._event_buffer_size * 10, dtype=self._dtypes)

        #: current start index in the event variables buffer
        self._buffer_index = 0

        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()

    @property
    def buffer(self):
        """
        The buffer slice across multiple entries
        """
        return self._buffer[:self._buffer_index]

    @property
    def event_buffer(self):
        """
        The buffer slice for the current event
        """
        return self._buffer[self._buffer_index - self._plist.getListSize():self._buffer_index]

    def clear_buffer(self):
        """
        Reset the buffer event counter and index
        """
        self._event_buffer_counter = 0
        self._buffer_index = 0

    def append_buffer(self):
        """
        "Append" a new event to the buffer by moving the buffer index forward by the particle list size

        Automatically replaces the buffer with a larger one if necessary
        """
        plist_size = self._plist.getListSize()
        if (plist_size + self._buffer_index) > len(self._buffer):
            new_buffer = np.empty(
                # grow by a factor of 1.5, or at least as large as necessary
                max(int(len(self._buffer) * 1.5), self._buffer_index + plist_size),
                dtype=self._dtypes,
            )
            new_buffer[:self._buffer_index] = self.buffer
            self._buffer = new_buffer
        self._buffer_index += plist_size
        self._event_buffer_counter += 1

    def initialize_feather_writer(self):
        """
        Initialize the feather writer using pyarrow
        """
        #: A list of tuples and pyarrow DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a feather file
        self._feather_writer = ipc.RecordBatchFileWriter(
            sink=self._filename,
            schema=pa.schema(self._schema),
            **self._writer_kwargs,
        )

    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        #: A list of tuples and pyarrow DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a parquet file
        self._parquet_writer = ParquetWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        #: A list of tuples and pyarrow DataTypes to define the pyarrow schema
        self._schema = [
            (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
        ]
        #: a writer object to write data into a csv file
        self._csv_writer = CSVWriter(
            self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
        )

    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        #: The pytables file object
        self._hdf5_writer = tables.open_file(
            self._filename, mode="w", title="Belle2 Variables to HDF5"
        )
        filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)

        # some variable names are not just A-Za-z0-9 so pytables complains but
        # seems to work. Ignore warning
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            #: The pytables table object
            self._table = self._hdf5_writer.create_table(
                "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters, **self._writer_kwargs
            )

    def fill_event_buffer(self):
        """
        Assign values for all variables for all particles in the particle list to the current event buffer
        """
        buf = self.event_buffer

        # add some extra columns for bookkeeping
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))

        # fill variables into buffer
        vector = variables.variables.evaluateVariables(self._std_varnames, self._plist)
        values = np.array(vector.data()).reshape(-1, len(self._varnames))
        for name, col in zip(self._varnames, values.T):
            buf[name] = col

    @property
    def buffer_full(self):
        """
        check if the buffer is full
        """
        return self._event_buffer_counter == self._event_buffer_size

    def write_buffer(self):
        """
        write the buffer to the output file
        """
        if self._format == "hdf5":
            # append one row per particle in the buffer to the hdf5 table
            self._table.append(self.buffer)
        else:
            table = {name: self.buffer[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "csv":
                self._csv_writer.write(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)

    def event(self):
        """
        Event processing function

        executes the fill_event_buffer function and writes the data to the output file
        in chunks of event_buffer_size
        """
        self.append_buffer()
        self.fill_event_buffer()
        if self.buffer_full:
            self.write_buffer()
            self.clear_buffer()

    def terminate(self):
        """save and close the output"""
        import ROOT  # noqa
        if len(self.buffer) > 0:
            self.write_buffer()

        if self._format == "hdf5":
            self._table.flush()
            self._hdf5_writer.close()
        elif self._format == "parquet":
            self._parquet_writer.close()
        elif self._format == "csv":
            self._csv_writer.close()
        elif self._format == "feather":
            self._feather_writer.close()
        ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
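
# A minimal usage sketch (the particle list "pi+:good" and the variable names
# are placeholders, not part of this module):
#
#   import basf2
#   from b2pandas_utils import VariablesToTable
#
#   path = basf2.Path()
#   # ... input and reconstruction modules that fill "pi+:good" ...
#   path.add_module(VariablesToTable("pi+:good", ["p", "E"], "pions.parquet"))
#   basf2.process(path)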


class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code
    """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )
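
# The legacy interface is a thin wrapper; a sketch of equivalent calls
# (list and variable names are placeholders):
#
#   VariablesToHDF5("pi+:good", ["p", "E"], "pions.h5")
#   VariablesToTable("pi+:good", ["p", "E"], "pions.h5")  # same output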


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNtuple` and convert it to a readable set
    of columns of the form ``{column}_{name}`` where column is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to perform logical operations on it
    mcErrors = dataframe[column].astype(int)

    # and loop over all the c_ constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        try:
            value = int(getattr(ROOT.Belle2.MCMatching, flag))
        except ValueError:
            # probably the extraInfo column name, ignore
            continue

        # and set the column
        name = column + flag[1:]
        if value == 0:
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value
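
# Example usage (a sketch, assuming an ntuple written with an "mcErrors"
# column and loaded with pandas, e.g. via pandas.read_parquet):
#
#   df = pandas.read_parquet("ntuple.parquet")
#   make_mcerrors_readable(df)
#   signal = df[df["mcErrors_Correct"]]
#
# Each new column follows the {column}_{name} pattern, e.g. "mcErrors_Correct"
# comes from the Belle2.MCMatching.c_Correct flag.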