import warnings
from typing import List
import numpy as np
import pyarrow as pa
import tables
import basf2
import variables
import ROOT  # noqa
from pyarrow.parquet import ParquetWriter
from pyarrow.csv import CSVWriter
"""
Python utilities to help create or manage ntuples and work with them in pandas
"""
#: map from numpy dtypes to pyarrow types, used to build the pyarrow schema.
#: np.int32 is required because the integer bookkeeping columns use it;
#: np.int64 is included for completeness.
numpy_to_pyarrow_type_map = {
    np.int32: pa.int32(),
    np.int64: pa.int64(),
    np.uint32: pa.uint32(),
    np.uint64: pa.uint64(),
    np.float32: pa.float32(),
    np.float64: pa.float64(),
    np.object_: pa.string(),
}
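# A minimal sketch of how the map is used (the "dtypes" list here is
# illustrative, not part of this module): it translates the numpy record
# dtype of the per-event buffer into a pyarrow schema, e.g.
#
#   dtypes = [("__event__", np.uint32), ("p", np.float64)]
#   schema = pa.schema([(name, numpy_to_pyarrow_type_map[dt]) for name, dt in dtypes])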
class VariablesToTable(basf2.Module):
    """
    Base class to dump ntuples into a non-ROOT format of your choosing
    """

    def __init__(self, listname: str, variables: List[str], filename: str, format: str):
        """Constructor to initialize the internal state

        Arguments:
            listname(str): name of the particle list
            variables(list(str)): list of variables to save for each particle
            filename(str): name of the output file to be created
            format(str): format of the output file, one of 'hdf5', 'parquet', 'csv'
        """
        super().__init__()
        #: Output filename
        self._filename = filename
        #: Particle list name
        self._listname = listname
        #: List of variables
        self._variables = variables
        #: Output format
        self._format = format
62 """Create the hdf5 file and list of variable objects to be used during
68 str(varname)
for varname
in variables.variables.resolveCollections(
75 self.
_evtmeta = ROOT.Belle2.PyStoreObj(
"EventMetaData")
82 (
"__experiment__", np.int32), (
"__run__", np.int32), (
"__event__", np.uint32),
83 (
"__production__", np.uint32), (
"__candidate__", np.uint32), (
"__ncandidates__", np.uint32)
87 dtypes.append((name, np.float64))
99 raise ValueError(f
"Unknown format {self._format}, supported formats are 'hdf5', 'parquet', 'csv'.")
    def initialize_parquet_writer(self):
        """
        Initialize the parquet writer using pyarrow
        """
        #: A list of tuples of names and pa.DataTypes to define the pyarrow schema
        self._schema = [(name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes]
        #: a writer object to write data into a parquet file
        self._parquet_writer = ParquetWriter(self._filename, schema=pa.schema(self._schema))
    def initialize_csv_writer(self):
        """
        Initialize the csv writer using pyarrow
        """
        #: A list of tuples of names and pa.DataTypes to define the pyarrow schema
        self._schema = [(name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes]
        #: a writer object to write data into a csv file
        self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema))
    def initialize_hdf5_writer(self):
        """
        Initialize the hdf5 writer using pytables
        """
        #: The pytable file
        self._hdf5_writer = tables.open_file(self._filename, mode="w", title="Belle2 Variables to HDF5")
        filters = tables.Filters(complevel=1, complib='blosc:lz4', fletcher32=False)
        # pytables warns about variable names that are not valid python
        # identifiers, but the table works anyway, so silence the warning
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            #: The pytable to append to in each event (the node is named after the particle list)
            self._table = self._hdf5_writer.create_table(
                "/", self._listname, obj=np.zeros(0, self._dtypes), filters=filters)
    def fill_buffer(self):
        """
        collect all variables for the particles in a numpy array
        """
        buf = np.empty(self._plist.getListSize(), dtype=self._dtypes)
        # add some extra columns for bookkeeping
        buf["__experiment__"] = self._evtmeta.getExperiment()
        buf["__run__"] = self._evtmeta.getRun()
        buf["__event__"] = self._evtmeta.getEvent()
        buf["__production__"] = self._evtmeta.getProduction()
        buf["__ncandidates__"] = len(buf)
        buf["__candidate__"] = np.arange(len(buf))

        for row, p in zip(buf, self._plist):
            for name, v in zip(self._varnames, self._var_objects):
                # evaluate each variable for this particle via the variable manager
                row[name] = variables.variables.evaluate(v.name, p)
        return buf
    def event(self):
        """
        Event processing function:
        executes the fill_buffer function and writes the data to the output file
        """
        buf = self.fill_buffer()

        if self._format == "hdf5":
            # append one row per particle in the list to the hdf5 table
            self._table.append(buf)
        elif self._format == "parquet":
            table = {name: buf[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            self._parquet_writer.write_table(pa_table)
        elif self._format == "csv":
            table = {name: buf[name] for name, _ in self._dtypes}
            pa_table = pa.table(table, schema=pa.schema(self._schema))
            self._csv_writer.write(pa_table)
177 """save and close the output"""
182 elif self.
_format ==
"parquet":
186 ROOT.Belle2.MetadataService.Instance().addNtuple(self.
_filename)
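# A usage sketch, assuming this module is importable as b2pandas_utils and a
# steering path that fills the particle list "pi+:good" beforehand (list name,
# variables and output filename here are illustrative only):
#
#   import basf2
#   from b2pandas_utils import VariablesToTable
#
#   path = basf2.Path()
#   # ... input and reconstruction modules that fill "pi+:good" ...
#   path.add_module(VariablesToTable("pi+:good", ["p", "E"], "pions.parquet", "parquet"))
#   basf2.process(path)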
class VariablesToHDF5(VariablesToTable):
    """
    Legacy class to not break existing code
    """

    def __init__(self, listname, variables, filename):
        super().__init__(listname, variables, filename, "hdf5")
def make_mcerrors_readable(dataframe, column="mcErrors"):
    """
    Take a dataframe containing a column with the output of the :b2:var:`mcErrors`
    variable from :b2:mod:`VariablesToNtuple` and convert it to a readable set
    of columns of the form ``{column}_{name}`` where ``column`` is the value of the
    ``column`` argument and ``name`` is one of the :ref:`mcmatching`
    error flags (without the leading 'c_').

    Arguments:
        dataframe(pandas.DataFrame): the pandas dataframe containing an ntuple
            with a column containing the output of the mcErrors variable
        column(str): the name of the column containing the values from the mcErrors variable
    """
    if column not in dataframe:
        raise KeyError(f"Cannot find column '{column}'")

    # convert mcErrors to int to be able to use bitwise operations on it
    mcErrors = dataframe[column].astype(int)

    # loop over all the c_ flag constants in the Belle2.MCMatching class
    for flag in (e for e in dir(ROOT.Belle2.MCMatching) if e.startswith("c_")):
        try:
            value = int(getattr(ROOT.Belle2.MCMatching, flag))
        except ValueError:
            # not an integer constant, skip it
            continue

        # the new column name, e.g. "mcErrors_MissFSR" for "c_MissFSR"
        name = column + flag[1:]
        if value == 0:
            # c_Correct has value 0: true only if no error flag is set at all
            dataframe[name] = mcErrors == 0
        else:
            dataframe[name] = (mcErrors & value) == value
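# A usage sketch (file and column names are illustrative): after loading an
# ntuple that contains the mcErrors variable, the call adds one boolean column
# per MC matching flag:
#
#   import pandas as pd
#   df = pd.read_parquet("pions.parquet")
#   make_mcerrors_readable(df)
#   correct = df[df["mcErrors_Correct"]]  # candidates with no error flags set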