# Belle II Software light-2411-aldebaran
# b2pandas_utils.py
from typing import List, Optional
import basf2
import variables
import tables
import numpy as np
import warnings
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
from pyarrow import ipc
import pyarrow as pa


"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.bool_: pa.bool_(),
    np.object_: pa.string(),
    np.str_: pa.string(),
}

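# Minimal sketch (not part of this module) of how the map above is used:
# VariablesToTable builds a list of (column name, numpy dtype) pairs and
# converts it to a pyarrow schema. The column names here are made up:
#
#     dtypes = [("__event__", np.uint32), ("p", np.float64)]
#     schema = pa.schema([(name, numpy_to_pyarrow_type_map[dt]) for name, dt in dtypes])

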
class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-root format of your choosing
    """

    def __init__(
        self,
        listname: str,
        variables: List[str],
        filename: str,
        hdf_table_name: Optional[str] = None,
        event_buffer_size: int = 100,
        **writer_kwargs,
    ):
52 """Constructor to initialize the internal state
53
54 Arguments:
55 listname(str): name of the particle list
56 variables(list(str)): list of variables to save for each particle
57 filename(str): name of the output file to be created.
58 Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
59 `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
60 hdf_table_name(str): name of the table in the hdf5 file.
61 If not provided, it will be the same as the listname
62 event_buffer_size(int): number of events to buffer before writing to disk,
63 higher values will use more memory but write faster and result in smaller files
64 **writer_kwargs: additional keyword arguments to pass to the writer.
65 For details, see the documentation of the writer in the apache arrow documentation.
66 Only use, if you know what you are doing!
67 """
        super().__init__()

        #: Output filename
        self._filename = filename

        #: Particle list name
        self._listname = listname

        #: List of variables
        self._variables = list(set(variables))

        file_type = self._filename.split(".")[-1]
        if file_type in ["csv"]:
            self._format = "csv"
        elif file_type in ["parquet", "pq"]:
            self._format = "parquet"
        elif file_type in ["h5", "hdf", "hdf5"]:
            self._format = "hdf5"
        elif file_type in ["feather", "arrow"]:
            self._format = "feather"
        else:
            raise ValueError(
                f"Unknown file type ending .{file_type}, supported types are 'csv', "
                "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
            )

        #: Table name in the hdf5 file
        self._table_name = (
            hdf_table_name if hdf_table_name is not None else self._listname
        )

        #: Event buffer size
        self._event_buffer_size = event_buffer_size

        #: Event buffer
        self._buffer = None

        #: Event buffer counter
        self._event_buffer_counter = 0

        #: writer kwargs
        self._writer_kwargs = writer_kwargs

    def initialize(self):
        """Create the output file and list of variable objects to be used
        during event processing."""
        # Always avoid the top-level 'import ROOT'.
        import ROOT  # noqa

        #: variable names, with collections resolved
        self._varnames = [
            str(varname)
            for varname in variables.variables.resolveCollections(
                variables.std_vector(*self._variables)
            )
        ]

        #: variable objects for each variable
        self._var_objects = [variables.variables.getVariable(n) for n in self._varnames]

        #: Event metadata
        self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
        self._evtmeta.isRequired()

        #: Pointer to the particle list
        self._plist = ROOT.Belle2.PyStoreObj(self._listname)
        self._plist.isRequired()

        dtypes = [
            ("__experiment__", np.int32),
            ("__run__", np.int32),
            ("__event__", np.uint32),
            ("__production__", np.uint32),
            ("__candidate__", np.uint32),
            ("__ncandidates__", np.uint32),
        ]
        for name in self._varnames:
            # only float variables for now
            dtypes.append((name, np.float64))

        self._dtypes = dtypes

        if self._format == "hdf5":
            self.initialize_hdf5_writer()
        elif self._format == "parquet":
            self.initialize_parquet_writer()
        elif self._format == "csv":
            self.initialize_csv_writer()
        elif self._format == "feather":
            self.initialize_feather_writer()

151 """
152 Initialize the feather writer using pyarrow
153 """
154
155 self._schema = [
156 (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
157 ]
158
159 self._feather_writer = ipc.RecordBatchFileWriter(
160 sink=self._filename,
161 schema=pa.schema(self._schema),
162 **self._writer_kwargs
163 )
164
166 """
167 Initialize the parquet writer using pyarrow
168 """
169
170 self._schema = [
171 (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
172 ]
173
174 self._parquet_writer = ParquetWriter(
175 self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
176 )
177
179 """
180 Initialize the csv writer using pyarrow
181 """
182
183 self._schema = [
184 (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
185 ]
186
187 self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema), **self._writer_kwargs)
188
190 """
191 Initialize the hdf5 writer using pytables
192 """
193
194 self._hdf5_writer = tables.open_file(
195 self._filename, mode="w", title="Belle2 Variables to HDF5"
196 )
197 filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)
198
199 # some variable names are not just A-Za-z0-9 so pytables complains but
200 # seems to work. Ignore warning
201 with warnings.catch_warnings():
202 warnings.simplefilter("ignore")
203
204 self._table = self._hdf5_writer.create_table(
205 "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters, **self._writer_kwargs
206 )
207
209 """
210 collect all variables for the particle in a numpy array
211 """
212
213 # create a numpy array with the data
214 buf = np.empty(self._plist.getListSize(), dtype=self._dtypes)
215 # add some extra columns for bookkeeping
216 buf["__experiment__"] = self._evtmeta.getExperiment()
217 buf["__run__"] = self._evtmeta.getRun()
218 buf["__event__"] = self._evtmeta.getEvent()
219 buf["__production__"] = self._evtmeta.getProduction()
220 buf["__ncandidates__"] = len(buf)
221 buf["__candidate__"] = np.arange(len(buf))
222
223 for row, p in zip(buf, self._plist):
224 for name, v in zip(self._varnames, self._var_objects):
225 # pyroot proxy not working with callables, we should fix this.
226 # For now we need to go back by name and call it.
227 # should be `row[v.name] = v.func(p)`
228 row[name] = variables.variables.evaluate(v.name, p)
229 return buf
230
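    # Illustrative sketch (not part of the class) of the structured-array
    # layout fill_event_buffer produces: one row per candidate, one named
    # field per column. The field names below are made up:
    #
    #     buf = np.empty(2, dtype=[("__event__", np.uint32), ("p", np.float64)])
    #     buf["__event__"] = 42          # scalar broadcast to both rows
    #     buf["p"] = [1.2, 3.4]          # one value per candidate
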
    def fill_buffer(self):
        """
        fill a buffer over multiple events and return it once
        ``self._event_buffer_size`` events have been accumulated
        """
        if self._event_buffer_counter == 0:
            self._buffer = self.fill_event_buffer()
        else:
            self._buffer = np.concatenate((self._buffer, self.fill_event_buffer()))

        self._event_buffer_counter += 1
        if self._event_buffer_counter == self._event_buffer_size:
            self._event_buffer_counter = 0
            return self._buffer
        return None

    def write_buffer(self, buf):
        """
        write the buffer to the output file
        """
        if self._format == "hdf5":
            # append the buffered rows to the hdf5 table, one per particle
            self._table.append(buf)
        else:
            table = {name: buf[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            if self._format == "parquet":
                self._parquet_writer.write_table(pa_table)
            elif self._format == "csv":
                self._csv_writer.write(pa_table)
            elif self._format == "feather":
                self._feather_writer.write_table(pa_table)

    def event(self):
        """
        Event processing function:
        executes the fill_buffer function and writes the data to the output file
        """
        buf = self.fill_buffer()
        if buf is None:
            return
        self.write_buffer(buf)

    def terminate(self):
        """save and close the output"""
        import ROOT  # noqa
        if self._event_buffer_counter > 0:
            self.write_buffer(self._buffer)

        if self._format == "hdf5":
            self._table.flush()
            self._hdf5_writer.close()
        elif self._format == "parquet":
            self._parquet_writer.close()
        elif self._format == "csv":
            self._csv_writer.close()
        elif self._format == "feather":
            self._feather_writer.close()
        ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)

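# Usage sketch (not part of this module): add the module to a basf2 path and
# read the result back with pandas. The particle list, variables, and steering
# around it are placeholders:
#
#     import pandas as pd
#
#     path = basf2.Path()
#     # ... input, reconstruction and particle-list filling go here ...
#     path.add_module(VariablesToTable("pi+:all", ["p", "E"], "pions.parquet"))
#     basf2.process(path)
#     df = pd.read_parquet("pions.parquet")
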

class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code
    """

    def __init__(self, listname, variables, filename, hdf_table_name: Optional[str] = None):
        super().__init__(listname, variables, filename, hdf_table_name)
        assert self._filename.split(".")[-1] in ["h5", "hdf", "hdf5"], (
            "Filename must end with .h5, .hdf or .hdf5 for HDF5 output. "
            f"Got {self._filename}"
        )
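

# Read-back sketch (not part of this module): the hdf5 output is a plain
# pytables table, so it can be loaded into pandas like this; the filename
# and table name below are placeholders:
#
#     import tables
#     import pandas as pd
#     with tables.open_file("ntuple.h5") as f:
#         df = pd.DataFrame(f.get_node("/", "B0:sig").read())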


def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
    of columns of the form ``{column}_{name}`` where column is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to logically operate on it
    mcErrors = dataframe[column].astype(int)

    # and loop over all the c_ constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        try:
            value = int(getattr(ROOT.Belle2.MCMatching, flag))
        except ValueError:
            # probably the extraInfo column name, ignore
            continue

        # and set the column
        name = column + flag[1:]
        if value == 0:
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value
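

# Usage sketch (filename is a placeholder): expand the mcErrors bitmask of a
# loaded ntuple into one boolean column per MC matching flag:
#
#     import pandas as pd
#     df = pd.read_parquet("ntuple.parquet")
#     make_mcerrors_readable(df)
#     # e.g. df["mcErrors_MisID"] is True where the MisID bit is set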