# Belle II Software development
# NN_trainer_module.py
8import os
9import shutil
10import pandas as pd
11import awkward as ak
12
13import basf2 as b2
14from ROOT import Belle2
15import modularAnalysis as ma
16from skim.WGs.fei import feiHadronicB0
17from b2pandas_utils import VariablesToHDF5
18
19from smartBKG.utils.preprocess import load_particle_list, preprocessed
20
21
class SaveFlag(b2.Module):
    """
    Record the event numbers of processed events into a Parquet file.

    Arguments:
        out_file (str): Output file path for saving the event numbers.

    Returns:
        None

    Note:
        This module should be added after the skimming process.
    """

    def __init__(self, out_file=None):
        """
        Initialize the SaveFlag module.

        :param out_file: Output file path for saving the event numbers.
        """
        super().__init__()
        #: Output file path for saving the event numbers.
        self.out_file = out_file

    def initialize(self):
        """
        Set up the data-store access and the container for event numbers
        before any event is processed.
        """
        #: Event metadata from the data store.
        self.eventInfo = Belle2.PyStoreObj('EventMetaData')
        #: Event numbers of the events that reach this module.
        self.pass_list = []

    def event(self):
        """
        Record the number of the current event.
        """
        current_event = self.eventInfo.getEvent()
        self.pass_list.append(current_event)

    def terminate(self):
        """
        Write the collected event numbers to the output Parquet file.
        """
        ak.to_parquet(self.pass_list, self.out_file)
66
67
class TrainDataSaver(b2.Module):
    """
    Save MCParticles to Pandas Dataframe.

    Arguments:
        output_file (str): Filename to save training data.
            Ending with ``parquet`` indicating fast mode, which will generate the final parquet file for training.
            Ending with ``h5`` indicating advanced mode, which will produce a temporary h5 file for further preprocessing.
        flag_file (str): Filename of the flag file indicating passing events.

    Returns:
        None
    """

    def __init__(
        self,
        output_file,
        flag_file,
    ):
        """
        Initialize the TrainDataSaver module.

        :param output_file: Filename to save training data to.
        :param flag_file: Filename of the flag file indicating passing events.
        """
        super().__init__()

        #: Filename to save training data to.
        self.output_file = output_file
        #: Event numbers of events passing the skim, read from the flag file.
        self.flag_list = ak.from_parquet(flag_file)
        #: Whether to use fast mode (``.parquet`` output) or advanced mode (h5 output).
        self.fast_mode = output_file.endswith(".parquet")

        # delete output file if it already exists, since we will append later
        if os.path.exists(output_file):
            os.remove(output_file)

    def initialize(self):
        """
        Initialize the data store and the buffer of per-event dataframes before processing events.
        """
        #: Event metadata from the data store.
        self.eventInfo = Belle2.PyStoreObj('EventMetaData')
        #: Event extra info from the data store.
        self.eventExtraInfo = Belle2.PyStoreObj('EventExtraInfo')
        #: Buffer of per-event dataframes with particle features.
        # Collecting frames in a list and concatenating once in terminate()
        # avoids the O(n^2) cost of pd.concat on a growing dataframe per event.
        self.df_dict = []

    def event(self):
        """
        Process each event and buffer its particle-level information.
        """
        evtNum = self.eventInfo.getEvent()
        self.df_dict.append(
            load_particle_list(mcplist=Belle2.PyStoreArray("MCParticles"), evtNum=evtNum, label=(evtNum in self.flag_list))
        )

    def terminate(self):
        """
        Append events on disk in either of the two different ways and free memory.

        In fast mode, the dataframe containing particle-level information and skim labels is preprocessed
        and saved as a parquet file which is ready for NN training.

        In advanced mode, the dataframe is saved as a h5 file and waits for combination with event-level information
        before preprocessing.
        """
        # Concatenate all buffered per-event frames in a single pass.
        df = pd.concat(self.df_dict) if self.df_dict else pd.DataFrame()
        if self.fast_mode:
            ak.to_parquet(preprocessed(df), self.output_file)
        else:
            df.to_hdf(self.output_file, key='mc_information', mode='a', format='table', append=True)
        # Release the buffered frames to free memory.
        self.df_dict = []
141
142
144 """
145 Process data for training and save to Parquet file. Two modes are provided:
146 Fast mode: save_vars set to None, produce the dataset with only the necessary information for the training.
147 Advanced mode: save_vars set to a dictionary of event-level variables,
148 run through hard-coded b2 steering code in self.process_b2script to produce the required particle lists
149 and save the required variables, can be used for event-level cuts or evaluations of the NN performance.
150
151 Arguments:
152 in_dir (str): Input directory.
153 out_dir (str): Output directory.
154 job_id (int): Job ID for batch processing.
155 save_vars (dict): Event-level variables to save for different particles.
156 By default None for fast mode.
157 In the example script having Y4S and B keys for the corresponding particle list.
158
159 Returns:
160 None
161 """
162
163 def __init__(self, in_dir, out_dir, job_id, save_vars=None):
164 """
165 Initialize the data_production object.
166
167 :param in_dir: Input directory.
168 :param out_dir: Output directory.
169 :param job_id: Job ID for batch processing.
170 :param save_vars: Event-level variables to save for different particles.
171 By default None for fast mode.
172 In the example script having Y4S and B keys for the corresponding particle list.
173 """
174 dataName = '_submdst'
175 flagName = '_flag'
176
177 self.data = f'{in_dir}{dataName}{job_id}.root'
178
179 self.flag = f'{in_dir}{flagName}{job_id}.parquet'
180 if save_vars is not None:
181
182 self.out_temp = f'{out_dir}_temp{job_id}/'
183 os.makedirs(out_dir, exist_ok=True)
184 os.makedirs(self.out_temp, exist_ok=True)
185
186 self.temp_file = {
187 'MC': f'{self.out_temp}mc.h5',
188 'Y4S': f'{self.out_temp}y4s.h5',
189 'B': f'{self.out_temp}b.h5'
190 }
191
192 self.out_file = f'{out_dir}preprocessed{job_id}.parquet'
193
194 self.save_vars = save_vars
195
196 def process(self):
197 """
198 Process the b2 steering file and the data generation.
199 """
200 self.process_b2script()
201 if self.save_vars is not None:
202 self.merge_files()
203
204 def process_b2script(self, num_events=2500):
205 """
206 Skimming process with TrainDataSaver module.
207
208 :param num_events: Maximum number of events to process.
209 """
210 path = ma.create_path()
211
212 ma.inputMdst(environmentType='default', filename=self.data, path=path)
213 ma.buildEventShape(path=path)
214 ma.buildEventKinematics(path=path)
215
216 # process with advance mode
217 if self.save_vars is not None:
218 TrainDataSaver_module = TrainDataSaver(
219 output_file=self.temp_file['MC'],
220 flag_file=self.flag,
221 )
222 path.add_module(TrainDataSaver_module)
223 ma.fillParticleListFromMC('Upsilon(4S):mc', '', path=path)
224 v2hdf5_y4s = VariablesToHDF5(
225 'Upsilon(4S):mc',
226 self.save_vars['Y4S'],
227 filename=self.temp_file['Y4S'],
228 )
229 path.add_module(v2hdf5_y4s)
230
231 fei_skim = feiHadronicB0(udstOutput=False, analysisGlobaltag=ma.getAnalysisGlobaltag())
232 fei_skim(path=path)
233 fei_skim.postskim_path.add_module(
234 "BestCandidateSelection",
235 particleList="B0:generic",
236 variable="extraInfo(SignalProbability)",
237 outputVariable="rank_signalprob",
238 numBest=1,
239 )
240 # Key of saved table is the name of particle list
241 v2hdf5_b = VariablesToHDF5(
242 'B0:generic',
243 self.save_vars['B'],
244 filename=self.temp_file['B'],
245 )
246 fei_skim.postskim_path.add_module(v2hdf5_b)
247 # process with fast mode
248 else:
249 TrainDataSaver_module = TrainDataSaver(
250 output_file=self.out_file,
251 flag_file=self.flag,
252 )
253 path.add_module(TrainDataSaver_module)
254 b2.process(path, max_event=num_events)
255 print(b2.statistics)
256
257 def merge_files(self):
258 """
259 Merge file of particle-level information (MC) with those of event-level information (Y4S, B).
260 Preprocess and save to disk as Parquet file in form of Awkward Array.
261 """
262 df = pd.read_hdf(self.temp_file['MC'], key='mc_information')
263 df_y4s = pd.read_hdf(self.temp_file['Y4S'], key='Upsilon(4S):mc')
264 df_b = pd.read_hdf(self.temp_file['B'], key='B0:generic')
265 df_merged = df_y4s.merge(df_b.drop(axis=1, labels=['icand', 'ncand']), how="left")
266 decorr_df = df_merged.rename({'evt': 'evtNum'}, axis=1)
267 ak.to_parquet(preprocessed(df, decorr_df), self.out_file)
268
269 def clean_up(self):
270 """
271 Clean up temporary files.
272 """
273 # uncomment if needed for batch job
274 # os.remove(self.data)
275 os.remove(self.flag)
276 if self.save_vars is not None:
277 shutil.rmtree(self.out_temp)
# --- Doxygen cross-reference residue (documentation-extraction artifact, not code) ---
# PyStoreArray: a (simplified) python wrapper for StoreArray. Definition: PyStoreArray.h:72
# PyStoreObj: a (simplified) python wrapper for StoreObjPtr. Definition: PyStoreObj.h:67
# SaveFlag.out_file: output file path for saving the event numbers.
# SaveFlag.pass_list: list to save event numbers of pass events.
# SaveFlag.eventInfo: event metadata from the data store.
# SaveFlag.__init__(self, out_file=None)
# TrainDataSaver.fast_mode: whether to use fast mode or advanced mode.
# TrainDataSaver.eventExtraInfo: event extra info from the data store.
# TrainDataSaver.df_dict: pandas dataframe to save particle features.
# TrainDataSaver.flag_list: flags loaded from the flag file indicating passing events.
# TrainDataSaver.output_file: filename to save training data to.
# TrainDataSaver.__init__(self, output_file, flag_file)
# TrainDataSaver.eventInfo: event metadata from the data store.
# data_production.out_file: final output Parquet file.
# data_production.save_vars: variables to save for different event levels.
# data_production.data: input root file generated before skimming.
# data_production.process_b2script(self, num_events=2500)
# data_production.out_temp: temporary directory to keep intermediate files for advanced mode.
# data_production.flag: filename of the flag file indicating passing events.
# data_production.__init__(self, in_dir, out_dir, job_id, save_vars=None)