##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################
import warnings
from typing import List

import numpy as np
import pyarrow as pa
import tables
from pyarrow.csv import CSVWriter
from pyarrow.parquet import ParquetWriter

import basf2
import variables


"""
Python utilities to help create or manage ntuples and work with them in pandas
"""

24numpy_to_pyarrow_type_map = {
25 np.int32: pa.int32(),
26 np.int64: pa.int64(),
27 np.uint32: pa.uint32(),
28 np.uint64: pa.uint64(),
29 np.float32: pa.float32(),
30 np.float64: pa.float64(),
31 np.bool_: pa.bool_(),
32 np.object_: pa.string(),
33 np.str_: pa.string(),
34}
35
36
37class VariablesToTable(basf2.Module):
38 """
39 Base class to dump ntuples into a non root format of your choosing
40 """
41
42 def __init__(self, listname: str, variables: List[str], filename: str, format: str):
43 """Constructor to initialize the internal state
44
45 Arguments:
46 listname(str): name of the particle list
47 variables(list(str)): list of variables to save for each particle
48 filename(str): name of the output file to be created
49 format(str): format of the output file, one of 'hdf5', 'parquet', 'csv'
50 """
51 super().__init__()
52
53 self._filename = filename
54
55 self._listname = listname
56
57 self._variables = variables
58
59 self._format = format
60
61 def initialize(self):
62 """Create the hdf5 file and list of variable objects to be used during
63 event processing."""
64 # Always avoid the top-level 'import ROOT'.
65 import ROOT # noqa
66
67 self._varnames = [
68 str(varname) for varname in variables.variables.resolveCollections(
70 *self._variables))]
71
72 self._var_objects = [variables.variables.getVariable(n) for n in self._varnames]
73
74
75 self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
76 self._evtmeta.isRequired()
77
78 self._plist = ROOT.Belle2.PyStoreObj(self._listname)
79 self._plist.isRequired()
80
81 dtypes = [
82 ("__experiment__", np.int32), ("__run__", np.int32), ("__event__", np.uint32),
83 ("__production__", np.uint32), ("__candidate__", np.uint32), ("__ncandidates__", np.uint32)
84 ]
85 for name in self._varnames:
86 # only float variables for now
87 dtypes.append((name, np.float64))
88
89
90 self._dtypes = dtypes
91
92 if self._format == "hdf5":
94 elif self._format == "parquet":
96 elif self._format == "csv":
98 else:
99 raise ValueError(f"Unknown format {self._format}, supported formats are 'hdf5', 'parquet', 'csv'.")
100
102 """
103 Initialize the parquet writer using pyarrow
104 """
105
106 self._schema = [(name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes]
107
108 self._parquet_writer = ParquetWriter(self._filename, schema=pa.schema(self._schema))
109
111 """
112 Initialize the csv writer using pyarrow
113 """
114
115 self._schema = [(name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes]
116
117 self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema))
118
120 """
121 Initialize the hdf5 writer using pytables
122 """
123
124 self._hdf5_writer = tables.open_file(self._filename, mode="w", title="Belle2 Variables to HDF5")
125 filters = tables.Filters(complevel=1, complib='blosc:lz4', fletcher32=False)
126
127 # some variable names are not just A-Za-z0-9 so pytables complains but
128 # seems to work. Ignore warning
129 with warnings.catch_warnings():
130 warnings.simplefilter("ignore")
131
132 self._table = self._hdf5_writer.create_table("/", self._listname, obj=np.zeros(0, self._dtypes), filters=filters)
133
134 def fill_buffer(self):
135 """
136 collect all variables for the particle in a numpy array
137 """
138
139 # create a numpy array with the data
140 buf = np.empty(self._plist.getListSize(), dtype=self._dtypes)
141 # add some extra columns for bookkeeping
142 buf["__experiment__"] = self._evtmeta.getExperiment()
143 buf["__run__"] = self._evtmeta.getRun()
144 buf["__event__"] = self._evtmeta.getEvent()
145 buf["__production__"] = self._evtmeta.getProduction()
146 buf["__ncandidates__"] = len(buf)
147 buf["__candidate__"] = np.arange(len(buf))
148
149 for row, p in zip(buf, self._plist):
150 for name, v in zip(self._varnames, self._var_objects):
151 # pyroot proxy not working with callables, we should fix this.
152 # For now we need to go back by name and call it.
153 # should be `row[v.name] = v.func(p)`
154 row[name] = variables.variables.evaluate(v.name, p)
155 return buf
156
157 def event(self):
158 """
159 Event processing function
160 executes the fill_buffer function and writes the data to the output file
161 """
162 buf = self.fill_buffer()
163
164 if self._format == "hdf5":
165 """Create a new row in the hdf5 file with for each particle in the list"""
166 self._table.append(buf)
167 elif self._format == "parquet":
168 table = {name: buf[name] for name, _ in self._dtypes}
169 pa_table = pa.table(table, schema=pa.schema(self._schema))
170 self._parquet_writer.write_table(pa_table)
171 elif self._format == "csv":
172 table = {name: buf[name] for name, _ in self._dtypes}
173 pa_table = pa.table(table, schema=pa.schema(self._schema))
174 self._csv_writer.write(pa_table)
175
176 def terminate(self):
177 """save and close the output"""
178 import ROOT # noqa
179 if self._format == "hdf5":
180 self._table.flush()
181 self._hdf5_writer.close()
182 elif self._format == "parquet":
183 self._parquet_writer.close()
184 elif self._format == "csv":
185 self._csv_writer.close()
186 ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
187
188
190 """
191 Legacy class to not break existing code
192 """
193
194 def __init__(self, listname, variables, filename):
195 super().__init__(listname, variables, filename, "hdf5")
196
197
198def make_mcerrors_readable(dataframe, column="mcErrors"):
199 """
200 Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
201 variable from :b2:mod:`VariablesToNTuple` and convert it to a readable set
202 of columns of the form ``{column}_{name}`` where column is the value of the
203 ``column`` argument and ``name`` is one of the :ref:`mcmatching`
204 error flags (without the leading 'c_').
205
206 Arguments:
207 dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
208 with column containing the output of the mcErrors variable
209 column(str): the name containing the values from the mcErrors variable
210 """
211 # Always avoid the top-level 'import ROOT'.
212 import ROOT # noqa
213
214 if column not in dataframe:
215 raise KeyError(f"Cannot find column '{column}'")
216
217 # convert mcErrors to int to be able to logical operate on it
218 mcErrors = dataframe[column].astype(int)
219
220 # and loop over all the c_ constants in the Belle2.MCMatching class
221 for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
222 try:
223 value = int(getattr(ROOT.Belle2.MCMatching, flag))
224 except ValueError:
225 # probably the extraInfo column name, ignore
226 continue
227
228 # and set the column
229 name = column + flag[1:]
230 if value == 0:
231 dataframe[name] = mcErrors == 0
232 else:
233 dataframe[name] = (mcErrors & value) == value