Belle II Software  release-08-01-10
run_collector_path.py
1 #!/usr/bin/env python3
2 
3 
10 
11 import basf2
12 from basf2 import conditions as b2conditions
13 from basf2 import create_path, process
14 from basf2 import B2FATAL, B2INFO, B2WARNING
15 
16 # The backend provides the input data files to the job and you can get the list of
17 # files from this function
18 from caf.backends import get_input_data
19 
20 
def run_collectors():
    """
    Runs the CAF collector in a general way by passing serialised basf2 paths, input file
    locations, and local databases from previous calibrations.

    Everything is read from the current working directory:

    * ``collector_config.json`` -- must contain a ``"database_chain"`` entry, iterated as
      ``(db_type, database)`` pairs.
    * ``pre_collector.path`` -- optional pickled path that is run before the collector.
    * any other ``*.path`` file -- pickled collector path(s). At least one is required,
      otherwise `B2FATAL` is called.

    The input files come from `caf.backends.get_input_data`. The assembled path is printed,
    processed, and the basf2 statistics are printed afterwards.
    """
    from basf2.pickle_path import get_path_from_file
    import glob
    import os
    import json

    # We pass in basf2 and CAF options via the config.json file
    with open("collector_config.json") as config_file:
        config = json.load(config_file)

    # Create the database chain to use the necessary central DB global tags and local DBs if they are requested
    # We deliberately override the normal database ordering because we don't want input files GTs to affect
    # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
    b2conditions.reset()
    b2conditions.override_globaltags()

    for db_type, database in config["database_chain"]:
        if db_type == "local":
            # For local databases only the first element of the entry is used as the payload file.
            B2INFO(f"Adding Local Database {database[0]} to head of chain of local databases.")
            b2conditions.prepend_testing_payloads(database[0])
        else:
            B2INFO(f"Using Global Tag {database}.")
            b2conditions.prepend_globaltag(database)

    # create a path with all modules needed before calibration path is run.
    collector_path = create_path()

    # Grab all the pickled path files. Should at least be one for the collector. And optionally
    # one for the modules to run before the collector.
    pre_collector_path_name = "pre_collector.path"

    # Remove the pre-collector path from the overall list.
    # This must be a real list rather than a lazy filter() object: a filter object is always
    # truthy, which would make the "no collector path found" check below impossible to trigger.
    pickle_paths = [
        path for path in glob.glob("./*.path")
        if pre_collector_path_name not in path
    ]

    # Check for and add a path to run before the collector
    if os.path.exists(pre_collector_path_name):
        collector_path.add_path(get_path_from_file(pre_collector_path_name))
    else:
        B2INFO("Couldn't find any pickle files for pre-collector setup")

    # Check for any other paths to run (should only be collectors) and append them
    if pickle_paths:
        for pickle_path in pickle_paths:
            collector_path.add_path(get_path_from_file(pickle_path))
    else:
        B2FATAL("Couldn't find any pickle files for collector path!")

    # Grab the input data. Can't continue without some
    input_data = get_input_data()

    # Now we need to create a path that definitely has RootInput as a module.
    main = create_path()
    # Use this utility wrapper to check for RootInput or SeqRootInput and change params if necessary
    from caf.utils import PathExtras
    pe = PathExtras(collector_path)
    if "RootInput" in pe:
        root_input_mod = collector_path.modules()[pe.index("RootInput")]
        root_input_mod.param("inputFileNames", input_data)
        input_entry_sequences = get_entry_sequences(root_input_mod)
        # If entrySequences is set, then we should make sure that it has the same length as the number
        # of input files (if possible). Other parameters can be taken from the input directly, but this
        # one is sensitive to the input file number.
        if len(input_entry_sequences) == 1 and len(input_data) > 1:
            # Repeat the single configured sequence once per input file.
            root_input_mod.param("entrySequences", len(input_data)*input_entry_sequences)
    elif "SeqRootInput" in pe:
        root_input_mod = collector_path.modules()[pe.index("SeqRootInput")]
        root_input_mod.param("inputFileNames", input_data)
    else:
        # No input module in the pickled paths, so add one in front of the collector path.
        main.add_module("RootInput", inputFileNames=input_data)

    if "HistoManager" not in pe:
        main.add_module("HistoManager", histoFileName="CollectorOutput.root")

    main.add_path(collector_path)
    basf2.print_path(main)
    process(main)
    print(basf2.statistics)
102 
103 
def get_entry_sequences(root_input_module):
    """
    Looks up the current value of the ``entrySequences`` parameter on a RootInput module.

    The module's ``available_params()`` are scanned in order and the ``values`` of the first
    parameter named ``entrySequences`` is returned. If there is no such parameter a warning
    is logged and an empty list is returned instead.
    """
    for param_info in root_input_module.available_params():
        if param_info.name != "entrySequences":
            continue
        # First match wins, nothing else to look at.
        return param_info.values

    # Only reached when the loop finished without finding the parameter.
    B2WARNING("entrySequences ModuleParamInfo couldn't be found! "
              "Setting to empty list but this indicates that this script is out of date!")
    return []
117 
118 
# Script entry point: only run the collectors when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_collectors()
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
Filters events to remove runs shorter than cut; the removed runs are stored in runsRemoved.
Definition: Splitter.cc:38