Source code for smartBKG.b2modules.NN_trainer_module

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################
import os
import shutil
import pandas as pd
import awkward as ak

import basf2 as b2
from ROOT import Belle2
import modularAnalysis as ma
from skim.WGs.fei import feiHadronicB0
from b2pandas_utils import VariablesToHDF5

from smartBKG.utils.preprocess import load_particle_list, preprocessed


class SaveFlag(b2.Module):
    """
    Save event numbers to a Parquet file.

    Arguments:
        out_file (str): Output file path for saving the event numbers.

    Returns:
        None

    Note:
        This module should be added after the skimming process.
    """

    def __init__(self, out_file=None):
        """
        Initialize the SaveFlag module.

        :param out_file: Output file path for saving the event numbers.
        """
        super().__init__()
        #: Output file path for saving the event numbers
        self.out_file = out_file

    def initialize(self):
        """
        Initialize the data store and the list of event numbers before processing events.
        """
        #: Initialise event metadata from data store
        self.eventInfo = Belle2.PyStoreObj('EventMetaData')
        #: List to save event numbers of passing events
        self.pass_list = []

    def event(self):
        """
        Process each event and append its event number to the pass list.
        """
        self.pass_list.append(self.eventInfo.getEvent())

    def terminate(self):
        """
        Finalize the module and save the pass list to a Parquet file.
        """
        ak.to_parquet(self.pass_list, self.out_file)
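
A minimal usage sketch for SaveFlag in a basf2 steering script, assuming a skim already filters the path upstream; the file names here are hypothetical:

import basf2 as b2
import modularAnalysis as ma
from smartBKG.b2modules.NN_trainer_module import SaveFlag

path = ma.create_path()
ma.inputMdst(environmentType='default', filename='input_mdst.root', path=path)  # hypothetical input file
# ... skim modules that reject unwanted events go here ...
path.add_module(SaveFlag(out_file='flag.parquet'))  # only events surviving the skim reach this module
b2.process(path)
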
class TrainDataSaver(b2.Module):
    """
    Save MCParticles to Pandas Dataframe.

    Arguments:
        output_file (str): Filename to save training data.
            Ending with ``parquet`` indicates fast mode, which will generate the final parquet file for training.
            Ending with ``h5`` indicates advanced mode, which will produce a temporary h5 file for further preprocessing.
        flag_file (str): Filename of the flag file indicating passing events.

    Returns:
        None
    """

    def __init__(
        self,
        output_file,
        flag_file,
    ):
        """
        Initialize the TrainDataSaver module.

        :param output_file: Filename to save training data to.
        :param flag_file: Filename of the flag file indicating passing events.
        """
        super().__init__()
        #: Filename to save training data to
        self.output_file = output_file
        #: Event numbers of passing events, loaded from the flag file
        self.flag_list = ak.from_parquet(flag_file)
        #: Whether to use fast mode or advanced mode
        self.fast_mode = output_file.endswith(".parquet")

        # delete output file if it already exists, since we will append later
        if os.path.exists(output_file):
            os.remove(output_file)

    def initialize(self):
        """
        Initialize the data store and the dataframe to save particle features before processing events.
        """
        #: Initialise event metadata from data store
        self.eventInfo = Belle2.PyStoreObj('EventMetaData')
        #: Initialise event extra info from data store
        self.eventExtraInfo = Belle2.PyStoreObj('EventExtraInfo')
        #: Pandas dataframe to save particle features
        self.df_dict = pd.DataFrame()

    def event(self):
        """
        Process each event and append its particle-level information to the dataframe.
        """
        evtNum = self.eventInfo.getEvent()
        self.df_dict = pd.concat([
            self.df_dict,
            load_particle_list(
                mcplist=Belle2.PyStoreArray("MCParticles"),
                evtNum=evtNum,
                label=(evtNum in self.flag_list),
            )
        ])

    def terminate(self):
        """
        Append events on disk in either of two ways and free memory.

        In fast mode, the dataframe containing particle-level information and skim labels
        is preprocessed and saved as a parquet file which is ready for NN training.
        In advanced mode, the dataframe is saved as an h5 file and waits for combination
        with event-level information before preprocessing.
        """
        if self.fast_mode:
            ak.to_parquet(preprocessed(self.df_dict), self.output_file)
        else:
            self.df_dict.to_hdf(self.output_file, key='mc_information', mode='a', format='table', append=True)
        self.df_dict = pd.DataFrame()
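
The extension of ``output_file`` selects the mode, so the same module serves both workflows. A short sketch, assuming a flag file written earlier by SaveFlag; file names are placeholders:

import modularAnalysis as ma
from smartBKG.b2modules.NN_trainer_module import TrainDataSaver

path = ma.create_path()
# ... input and event-shape configuration as in the steering code below ...

# Fast mode: ".parquet" makes terminate() preprocess and write training-ready data.
path.add_module(TrainDataSaver(output_file='train_data.parquet', flag_file='flag.parquet'))

# Advanced mode: ".h5" defers preprocessing until event-level variables are merged in.
# path.add_module(TrainDataSaver(output_file='mc.h5', flag_file='flag.parquet'))
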
class data_production():
    """
    Process data for training and save to Parquet file.

    Two modes are provided:

    * Fast mode: ``save_vars`` set to None, produces the dataset with only the
      necessary information for the training.
    * Advanced mode: ``save_vars`` set to a dictionary of event-level variables,
      runs through hard-coded b2 steering code in ``self.process_b2script`` to produce
      the required particle lists and save the required variables. This mode can be
      used for event-level cuts or evaluations of the NN performance.

    Arguments:
        in_dir (str): Input directory.
        out_dir (str): Output directory.
        job_id (int): Job ID for batch processing.
        save_vars (dict): Event-level variables to save for different particles.
            By default None for fast mode. In the example script the dictionary has
            Y4S and B keys for the corresponding particle lists.

    Returns:
        None
    """

    def __init__(self, in_dir, out_dir, job_id, save_vars=None):
        """
        Initialize the data_production object.

        :param in_dir: Input directory.
        :param out_dir: Output directory.
        :param job_id: Job ID for batch processing.
        :param save_vars: Event-level variables to save for different particles.
            By default None for fast mode. In the example script the dictionary has
            Y4S and B keys for the corresponding particle lists.
        """
        dataName = '_submdst'
        flagName = '_flag'
        #: Input root file generated before skimming
        self.data = f'{in_dir}{dataName}{job_id}.root'
        #: Filename of the flag file indicating passing events
        self.flag = f'{in_dir}{flagName}{job_id}.parquet'
        if save_vars is not None:
            #: Temporary directory to keep intermediate files for advanced mode
            self.out_temp = f'{out_dir}_temp{job_id}/'
            os.makedirs(out_dir, exist_ok=True)
            os.makedirs(self.out_temp, exist_ok=True)
            #: Intermediate files
            self.temp_file = {
                'MC': f'{self.out_temp}mc.h5',
                'Y4S': f'{self.out_temp}y4s.h5',
                'B': f'{self.out_temp}b.h5'
            }
        #: Final output Parquet file
        self.out_file = f'{out_dir}preprocessed{job_id}.parquet'
        #: Variables to save for different event levels
        self.save_vars = save_vars

    def process(self):
        """
        Process the b2 steering file and the data generation.
        """
        self.process_b2script()
        if self.save_vars is not None:
            self.merge_files()

    def process_b2script(self, num_events=2500):
        """
        Skimming process with TrainDataSaver module.

        :param num_events: Maximum number of events to process.
        """
        path = ma.create_path()

        ma.inputMdst(environmentType='default', filename=self.data, path=path)
        ma.buildEventShape(path=path)
        ma.buildEventKinematics(path=path)

        # process with advanced mode
        if self.save_vars is not None:
            TrainDataSaver_module = TrainDataSaver(
                output_file=self.temp_file['MC'],
                flag_file=self.flag,
            )
            path.add_module(TrainDataSaver_module)
            ma.fillParticleListFromMC('Upsilon(4S):mc', '', path=path)
            v2hdf5_y4s = VariablesToHDF5(
                'Upsilon(4S):mc',
                self.save_vars['Y4S'],
                filename=self.temp_file['Y4S'],
            )
            path.add_module(v2hdf5_y4s)

            fei_skim = feiHadronicB0(udstOutput=False, analysisGlobaltag=ma.getAnalysisGlobaltag())
            fei_skim(path=path)
            fei_skim.postskim_path.add_module(
                "BestCandidateSelection",
                particleList="B0:generic",
                variable="extraInfo(SignalProbability)",
                outputVariable="rank_signalprob",
                numBest=1,
            )
            # Key of the saved table is the name of the particle list
            v2hdf5_b = VariablesToHDF5(
                'B0:generic',
                self.save_vars['B'],
                filename=self.temp_file['B'],
            )
            fei_skim.postskim_path.add_module(v2hdf5_b)
        # process with fast mode
        else:
            TrainDataSaver_module = TrainDataSaver(
                output_file=self.out_file,
                flag_file=self.flag,
            )
            path.add_module(TrainDataSaver_module)
        b2.process(path, max_event=num_events)
        print(b2.statistics)

    def merge_files(self):
        """
        Merge the file of particle-level information (MC) with those of event-level
        information (Y4S, B), then preprocess and save to disk as a Parquet file
        in the form of an Awkward Array.
        """
        df = pd.read_hdf(self.temp_file['MC'], key='mc_information')
        df_y4s = pd.read_hdf(self.temp_file['Y4S'], key='Upsilon(4S):mc')
        df_b = pd.read_hdf(self.temp_file['B'], key='B0:generic')
        df_merged = df_y4s.merge(df_b.drop(axis=1, labels=['icand', 'ncand']), how="left")
        decorr_df = df_merged.rename({'evt': 'evtNum'}, axis=1)
        ak.to_parquet(preprocessed(df, decorr_df), self.out_file)

    def clean_up(self):
        """
        Clean up temporary files.
        """
        # uncomment if needed for batch job
        # os.remove(self.data)
        os.remove(self.flag)
        if self.save_vars is not None:
            shutil.rmtree(self.out_temp)