Belle II Software development
VariablesToTable Class Reference
Inheritance diagram for VariablesToTable:
VariablesToHDF5

Public Member Functions

def __init__ (self, str listname, List[str] variables, str filename, Optional[str] hdf_table_name=None, int event_buffer_size=100, **writer_kwargs)
 
def initialize (self)
 
def buffer (self)
 
def event_buffer (self)
 
def clear_buffer (self)
 
def append_buffer (self)
 
def initialize_feather_writer (self)
 
def initialize_parquet_writer (self)
 
def initialize_csv_writer (self)
 
def initialize_hdf5_writer (self)
 
def fill_event_buffer (self)
 
def buffer_full (self)
 
def write_buffer (self)
 
def event (self)
 
def terminate (self)
 

Protected Attributes

 _filename
 Output filename.
 
 _listname
 Particle list name.
 
 _variables
 List of variables.
 
 _format
 Output format.
 
 _table_name
 Table name in the hdf5 file.
 
 _event_buffer_size
 Event buffer size.
 
 _event_buffer_counter
 Event buffer counter.
 
 _writer_kwargs
 Additional keyword arguments passed to the writer.
 
 _varnames
 Resolved variable names.
 
 _std_varnames
 std::vector of variable names
 
 _evtmeta
 Event metadata.
 
 _plist
 Pointer to the particle list.
 
 _dtypes
 List of (name, dtype) tuples defining the numpy structured dtype of the buffer.
 
 _buffer
 Event variables buffer (grown automatically if necessary).
 
 _buffer_index
 Current start index in the event variables buffer.
 
 _schema
 A list of (name, pyarrow.DataType) tuples defining the pyarrow schema.
 
 _feather_writer
 A writer object to write data into a feather file.
 
 _parquet_writer
 A writer object to write data into a parquet file.
 
 _csv_writer
 A writer object to write data into a csv file.
 
 _hdf5_writer
 The PyTables file handle.
 
 _table
 The PyTables table.
 

Detailed Description

Base class to dump ntuples into a non-ROOT format of your choosing

Definition at line 38 of file b2pandas_utils.py.

Constructor & Destructor Documentation

◆ __init__()

def __init__ (   self,
str  listname,
List[str]  variables,
str  filename,
Optional[str]   hdf_table_name = None,
int   event_buffer_size = 100,
**  writer_kwargs 
)
Constructor to initialize the internal state

Arguments:
    listname(str): name of the particle list
    variables(list(str)): list of variables to save for each particle
    filename(str): name of the output file to be created.
        Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
        `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
    hdf_table_name(str): name of the table in the hdf5 file.
        If not provided, it will be the same as the listname
    event_buffer_size(int): number of events to buffer before writing to disk,
        higher values will use more memory but write faster and result in smaller files
    **writer_kwargs: additional keyword arguments to pass to the writer.
        For details, see the writer's documentation in the Apache Arrow documentation.
        Only use this if you know what you are doing!

Reimplemented in VariablesToHDF5.

Definition at line 43 of file b2pandas_utils.py.

51 ):
52 """Constructor to initialize the internal state
53
54 Arguments:
55 listname(str): name of the particle list
56 variables(list(str)): list of variables to save for each particle
57 filename(str): name of the output file to be created.
58 Needs to end with `.csv` for csv output, `.parquet` or `.pq` for parquet output,
59 `.h5`, `.hdf` or `.hdf5` for hdf5 output and `.feather` or `.arrow` for feather output
60 hdf_table_name(str): name of the table in the hdf5 file.
61 If not provided, it will be the same as the listname
62 event_buffer_size(int): number of events to buffer before writing to disk,
63 higher values will use more memory but write faster and result in smaller files
64 **writer_kwargs: additional keyword arguments to pass to the writer.
65 For details, see the documentation of the writer in the apache arrow documentation.
66 Only use, if you know what you are doing!
67 """
68 super().__init__()
69
70 self._filename = filename
71
72 self._listname = listname
73
74 self._variables = list(set(variables))
75
76 file_type = self._filename.split(".")[-1]
77 if file_type in ["csv"]:
78 self._format = "csv"
79 elif file_type in ["parquet", "pq"]:
80 self._format = "parquet"
81 elif file_type in ["h5", "hdf", "hdf5"]:
82 self._format = "hdf5"
83 elif file_type in ["feather", "arrow"]:
84 self._format = "feather"
85 else:
86 raise ValueError(
87 f"Unknown file type ending .{file_type}, supported types are 'csv', "
88 "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
89 )
90
91 self._table_name = (
92 hdf_table_name if hdf_table_name is not None else self._listname
93 )
94
95 self._event_buffer_size = event_buffer_size
96
97 self._event_buffer_counter = 0
98
99 self._writer_kwargs = writer_kwargs
100
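The constructor's extension-based format dispatch can be sketched as a standalone helper. This is an illustrative reimplementation, not part of the class; `infer_format` and `FORMATS` are hypothetical names mirroring the mapping in the listing above:

```python
# Hypothetical helper mirroring the constructor's extension-to-format mapping.
FORMATS = {
    "csv": "csv",
    "parquet": "parquet", "pq": "parquet",
    "h5": "hdf5", "hdf": "hdf5", "hdf5": "hdf5",
    "feather": "feather", "arrow": "feather",
}

def infer_format(filename: str) -> str:
    """Return the output format for a filename, as the constructor does."""
    ext = filename.split(".")[-1]
    try:
        return FORMATS[ext]
    except KeyError:
        raise ValueError(
            f"Unknown file type ending .{ext}, supported types are 'csv', "
            "'parquet', 'pq', 'h5', 'hdf', 'hdf5', 'feather' or 'arrow'"
        )
```

Note that only the last dot-separated component matters, so a name like `a.b.feather` still selects the feather format.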

Member Function Documentation

◆ append_buffer()

def append_buffer (   self)
"Append" a new event to the buffer by moving the buffer index forward by the particle-list size

Automatically replaces the buffer by a larger one if necessary

Definition at line 178 of file b2pandas_utils.py.

178 def append_buffer(self):
179 """
180 "Append" a new event to the buffer by moving the buffer index forward by particle list size
181
182 Automatically replaces the buffer by a larger one if necessary
183 """
184 plist_size = self._plist.getListSize()
185 if (plist_size + self._buffer_index) > len(self._buffer):
186 new_buffer = np.empty(
187 # factor 1.5 larger or at least as large as necessary
188 max(int(len(self._buffer) * 1.5), self._buffer_index + plist_size),
189 dtype=self._dtypes,
190 )
191 new_buffer[:self._buffer_index] = self.buffer
192 self._buffer = new_buffer
193 self._buffer_index += plist_size
194 self._event_buffer_counter += 1
195
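The growth strategy above (a factor 1.5, or at least enough to fit the new event) can be exercised in isolation with a plain numpy structured array; `grow` is a hypothetical name for this sketch, not a method of the class:

```python
import numpy as np

def grow(buffer, buffer_index, needed):
    """Return a buffer that can hold `needed` more rows, growing by a
    factor 1.5 (or at least as large as necessary), as append_buffer does."""
    if buffer_index + needed > len(buffer):
        new_buffer = np.empty(
            max(int(len(buffer) * 1.5), buffer_index + needed),
            dtype=buffer.dtype,
        )
        new_buffer[:buffer_index] = buffer[:buffer_index]  # keep filled rows
        return new_buffer
    return buffer

buf = np.empty(10, dtype=[("x", np.float64)])
buf = grow(buf, buffer_index=8, needed=5)  # 13 rows needed, grown to 15
```

Amortized growth like this keeps the number of reallocations logarithmic in the total number of candidates written.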

◆ buffer()

def buffer (   self)
The buffer slice across multiple entries

Definition at line 158 of file b2pandas_utils.py.

158 def buffer(self):
159 """
160 The buffer slice across multiple entries
161 """
162 return self._buffer[:self._buffer_index]
163

◆ buffer_full()

def buffer_full (   self)
Check if the buffer is full

Definition at line 275 of file b2pandas_utils.py.

275 def buffer_full(self):
276 """
277 check if the buffer is full
278 """
279 return self._event_buffer_counter == self._event_buffer_size
280

◆ clear_buffer()

def clear_buffer (   self)
Reset the buffer event counter and index

Definition at line 171 of file b2pandas_utils.py.

171 def clear_buffer(self):
172 """
173 Reset the buffer event counter and index
174 """
175 self._event_buffer_counter = 0
176 self._buffer_index = 0
177

◆ event()

def event (   self)
Event processing function

Executes the fill_event_buffer function and writes the data to the output file
in chunks of event_buffer_size events

Definition at line 298 of file b2pandas_utils.py.

298 def event(self):
299 """
300 Event processing function
301
302 executes the fill_buffer function and writes the data to the output file
303 in chunks of event_buffer_size
304 """
305 self.append_buffer()
306 self.fill_event_buffer()
307 if self.buffer_full:
308 self.write_buffer()
309 self.clear_buffer()
310

◆ event_buffer()

def event_buffer (   self)
The buffer slice for the current event

Definition at line 165 of file b2pandas_utils.py.

165 def event_buffer(self):
166 """
167 The buffer slice for the current event
168 """
169 return self._buffer[self._buffer_index - self._plist.getListSize(): self._buffer_index]
170
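The relationship between the two slices, `buffer` (everything filled so far) and `event_buffer` (only the rows contributed by the latest event), can be illustrated with a toy one-dimensional array standing in for the structured buffer:

```python
import numpy as np

buf = np.arange(10)      # stand-in for the structured event buffer
buffer_index = 7         # rows filled so far
plist_size = 3           # candidates contributed by the latest event

full_slice = buf[:buffer_index]                            # `buffer`
event_slice = buf[buffer_index - plist_size:buffer_index]  # `event_buffer`
```

Because `append_buffer` advances the index before `fill_event_buffer` runs, `event_buffer` always addresses the freshly reserved rows at the end of the filled region.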

◆ fill_event_buffer()

def fill_event_buffer (   self)
Assign values for all variables for all particles in the particle list to the current event buffer

Definition at line 254 of file b2pandas_utils.py.

254 def fill_event_buffer(self):
255 """
256 Assign values for all variables for all particles in the particle list to the current event buffer
257 """
258 buf = self.event_buffer
259
260 # add some extra columns for bookkeeping
261 buf["__experiment__"] = self._evtmeta.getExperiment()
262 buf["__run__"] = self._evtmeta.getRun()
263 buf["__event__"] = self._evtmeta.getEvent()
264 buf["__production__"] = self._evtmeta.getProduction()
265 buf["__ncandidates__"] = len(buf)
266 buf["__candidate__"] = np.arange(len(buf))
267
268 # fill variables into buffer
269 vector = variables.variables.evaluateVariables(self._std_varnames, self._plist)
270 values = np.array(vector.data()).reshape(-1, len(self._varnames))
271 for name, col in zip(self._varnames, values.T):
272 buf[name] = col
273
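The reshape-and-transpose step above, which turns the flat vector returned by `evaluateVariables` into one column per variable, can be reproduced with plain numpy; the variable names and values here are hypothetical toy data in place of the VariableManager call:

```python
import numpy as np

varnames = ["px", "py"]                  # hypothetical variable names
# evaluateVariables returns all values flattened in particle-major order:
flat = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]    # 3 particles x 2 variables

buf = np.empty(3, dtype=[("px", np.float64), ("py", np.float64)])
values = np.array(flat).reshape(-1, len(varnames))  # shape (3, 2)
for name, col in zip(varnames, values.T):           # one column per variable
    buf[name] = col
```

After the loop, each field of the structured buffer holds that variable's value for every candidate in the list.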

◆ initialize()

def initialize (   self)
Setup variable lists, pointers, buffers and file writers

Definition at line 101 of file b2pandas_utils.py.

101 def initialize(self):
102 """
103 Setup variable lists, pointers, buffers and file writers
104 """
105 # Always avoid the top-level 'import ROOT'.
106 import ROOT # noqa
107
108
109 self._varnames = [
110 str(varname)
111 for varname in variables.variables.resolveCollections(
112 variables.std_vector(*self._variables)
113 )
114 ]
115
116
117 self._std_varnames = variables.std_vector(*self._varnames)
118
119
120 self._evtmeta = ROOT.Belle2.PyStoreObj("EventMetaData")
121 self._evtmeta.isRequired()
122
123
124 self._plist = ROOT.Belle2.PyStoreObj(self._listname)
125 self._plist.isRequired()
126
127 dtypes = [
128 ("__experiment__", np.int32),
129 ("__run__", np.int32),
130 ("__event__", np.uint32),
131 ("__production__", np.uint32),
132 ("__candidate__", np.uint32),
133 ("__ncandidates__", np.uint32),
134 ]
135 for name in self._varnames:
136 # only float variables for now
137 dtypes.append((name, np.float64))
138
139
140 self._dtypes = dtypes
141
142
143 self._buffer = np.empty(self._event_buffer_size * 10, dtype=self._dtypes)
144
145
146 self._buffer_index = 0
147
148 if self._format == "hdf5":
149 self.initialize_hdf5_writer()
150 elif self._format == "parquet":
151 self.initialize_parquet_writer()
152 elif self._format == "csv":
153 self.initialize_csv_writer()
154 elif self._format == "feather":
155 self.initialize_feather_writer()
156
def std_vector(*args)
Definition: __init__.py:142
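The structured dtype assembled in the listing above, bookkeeping columns first and one float64 column per variable, can be reproduced standalone; the variable names here are hypothetical:

```python
import numpy as np

varnames = ["M", "p"]  # hypothetical resolved variable names
dtypes = [
    ("__experiment__", np.int32),
    ("__run__", np.int32),
    ("__event__", np.uint32),
    ("__production__", np.uint32),
    ("__candidate__", np.uint32),
    ("__ncandidates__", np.uint32),
]
for name in varnames:
    dtypes.append((name, np.float64))  # only float variables for now

event_buffer_size = 100
# initial capacity: ten times the event buffer size, grown later if needed
buffer = np.empty(event_buffer_size * 10, dtype=dtypes)
```

The ten-fold initial capacity is a heuristic for roughly ten candidates per event; `append_buffer` grows the array if an event batch needs more.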

◆ initialize_csv_writer()

def initialize_csv_writer (   self)
Initialize the csv writer using pyarrow

Definition at line 224 of file b2pandas_utils.py.

224 def initialize_csv_writer(self):
225 """
226 Initialize the csv writer using pyarrow
227 """
228
229 self._schema = [
230 (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
231 ]
232
233 self._csv_writer = CSVWriter(self._filename, schema=pa.schema(self._schema), **self._writer_kwargs)
234

◆ initialize_feather_writer()

def initialize_feather_writer (   self)
Initialize the feather writer using pyarrow

Definition at line 196 of file b2pandas_utils.py.

196 def initialize_feather_writer(self):
197 """
198 Initialize the feather writer using pyarrow
199 """
200
201 self._schema = [
202 (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
203 ]
204
205 self._feather_writer = ipc.RecordBatchFileWriter(
206 sink=self._filename,
207 schema=pa.schema(self._schema),
208 **self._writer_kwargs,
209 )
210

◆ initialize_hdf5_writer()

def initialize_hdf5_writer (   self)
Initialize the hdf5 writer using pytables

Definition at line 235 of file b2pandas_utils.py.

235 def initialize_hdf5_writer(self):
236 """
237 Initialize the hdf5 writer using pytables
238 """
239
240 self._hdf5_writer = tables.open_file(
241 self._filename, mode="w", title="Belle2 Variables to HDF5"
242 )
243 filters = tables.Filters(complevel=1, complib="blosc:lz4", fletcher32=False)
244
245 # some variable names are not just A-Za-z0-9 so pytables complains but
246 # seems to work. Ignore warning
247 with warnings.catch_warnings():
248 warnings.simplefilter("ignore")
249
250 self._table = self._hdf5_writer.create_table(
251 "/", self._table_name, obj=np.zeros(0, self._dtypes), filters=filters, **self._writer_kwargs
252 )
253

◆ initialize_parquet_writer()

def initialize_parquet_writer (   self)
Initialize the parquet writer using pyarrow

Definition at line 211 of file b2pandas_utils.py.

211 def initialize_parquet_writer(self):
212 """
213 Initialize the parquet writer using pyarrow
214 """
215
216 self._schema = [
217 (name, numpy_to_pyarrow_type_map[dt]) for name, dt in self._dtypes
218 ]
219
220 self._parquet_writer = ParquetWriter(
221 self._filename, schema=pa.schema(self._schema), **self._writer_kwargs
222 )
223

◆ terminate()

def terminate (   self)
Save and close the output

Definition at line 311 of file b2pandas_utils.py.

311 def terminate(self):
312 """save and close the output"""
313 import ROOT # noqa
314 if len(self.buffer) > 0:
315 self.write_buffer()
316
317 if self._format == "hdf5":
318 self._table.flush()
319 self._hdf5_writer.close()
320 elif self._format == "parquet":
321 self._parquet_writer.close()
322 elif self._format == "csv":
323 self._csv_writer.close()
324 elif self._format == "feather":
325 self._feather_writer.close()
326 ROOT.Belle2.MetadataService.Instance().addNtuple(self._filename)
327
328

◆ write_buffer()

def write_buffer (   self)
Write the buffer to the output file

Definition at line 281 of file b2pandas_utils.py.

281 def write_buffer(self):
282 """
283 write the buffer to the output file
284 """
285 if self._format == "hdf5":
286 """Create a new row in the hdf5 file for each particle in the list"""
287 self._table.append(self.buffer)
288 else:
289 table = {name: self.buffer[name] for name, _ in self._dtypes}
290 pa_table = pa.table(table, schema=pa.schema(self._schema))
291 if self._format == "parquet":
292 self._parquet_writer.write_table(pa_table)
293 elif self._format == "csv":
294 self._csv_writer.write(pa_table)
295 elif self._format == "feather":
296 self._feather_writer.write_table(pa_table)
297
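For the non-hdf5 branch, the structured buffer is first converted to a mapping of column name to numpy column before being handed to pyarrow; that conversion alone works with plain numpy (the dtypes and values here are hypothetical, and the `pa.table(...)` step is omitted):

```python
import numpy as np

dtypes = [("__event__", np.uint32), ("M", np.float64)]  # hypothetical dtypes
buffer = np.zeros(4, dtype=dtypes)
buffer["__event__"] = [1, 1, 2, 2]
buffer["M"] = [5.28, 5.27, 5.29, 5.28]

# dict of columns, as write_buffer builds before calling pa.table(...)
table = {name: buffer[name] for name, _ in dtypes}
```

pyarrow accepts such a mapping directly, zero-copying each numpy column into the arrow table when the dtypes allow it.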

Member Data Documentation

◆ _buffer

_buffer
protected

Event variables buffer (grown automatically if necessary).

Definition at line 143 of file b2pandas_utils.py.

◆ _buffer_index

_buffer_index
protected

Current start index in the event variables buffer.

Definition at line 146 of file b2pandas_utils.py.

◆ _csv_writer

_csv_writer
protected

A writer object to write data into a csv file.

Definition at line 233 of file b2pandas_utils.py.

◆ _dtypes

_dtypes
protected

List of (name, dtype) tuples defining the numpy structured dtype of the buffer.

Definition at line 140 of file b2pandas_utils.py.

◆ _event_buffer_counter

_event_buffer_counter
protected

Event buffer counter.

Definition at line 97 of file b2pandas_utils.py.

◆ _event_buffer_size

_event_buffer_size
protected

Event buffer size.

Definition at line 95 of file b2pandas_utils.py.

◆ _evtmeta

_evtmeta
protected

Event metadata.

Definition at line 120 of file b2pandas_utils.py.

◆ _feather_writer

_feather_writer
protected

A writer object to write data into a feather file.

Definition at line 205 of file b2pandas_utils.py.

◆ _filename

_filename
protected

Output filename.

Definition at line 70 of file b2pandas_utils.py.

◆ _format

_format
protected

Output format.

Definition at line 78 of file b2pandas_utils.py.

◆ _hdf5_writer

_hdf5_writer
protected

The PyTables file handle.

Definition at line 240 of file b2pandas_utils.py.

◆ _listname

_listname
protected

Particle list name.

Definition at line 72 of file b2pandas_utils.py.

◆ _parquet_writer

_parquet_writer
protected

A writer object to write data into a parquet file.

Definition at line 220 of file b2pandas_utils.py.

◆ _plist

_plist
protected

Pointer to the particle list.

Definition at line 124 of file b2pandas_utils.py.

◆ _schema

_schema
protected

A list of (name, pyarrow.DataType) tuples defining the pyarrow schema.

Definition at line 201 of file b2pandas_utils.py.

◆ _std_varnames

_std_varnames
protected

std::vector of variable names

Definition at line 117 of file b2pandas_utils.py.

◆ _table

_table
protected

The PyTables table.

Definition at line 250 of file b2pandas_utils.py.

◆ _table_name

_table_name
protected

Table name in the hdf5 file.

Definition at line 91 of file b2pandas_utils.py.

◆ _variables

_variables
protected

List of variables.

Definition at line 74 of file b2pandas_utils.py.

◆ _varnames

_varnames
protected

Resolved variable names.

Definition at line 109 of file b2pandas_utils.py.

◆ _writer_kwargs

_writer_kwargs
protected

Additional keyword arguments passed to the writer.

Definition at line 99 of file b2pandas_utils.py.


The documentation for this class was generated from the following file: