Belle II Software  release-06-00-14
run_collector_path.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import basf2
13 from basf2 import conditions as b2conditions
14 from basf2 import create_path, process
15 from basf2 import B2FATAL, B2INFO, B2WARNING
16 
17 # The backend provides the input data files to the job and you can get the list of
18 # files from this function
19 from caf.backends import get_input_data
20 
21 
def run_collectors():
    """
    Runs the CAF collector in a general way by passing serialised basf2 paths, input file
    locations, and local databases from previous calibrations.

    Everything is read from the current working directory:

    * ``collector_config.json`` supplies the ``database_chain`` entries to configure.
    * ``pre_collector.path`` (optional) is a pickled path that is run before the collector(s).
    * Every other ``*.path`` file is treated as a pickled collector path. At least one
      must exist, otherwise ``B2FATAL`` is called.

    The list of input files is obtained from ``get_input_data()`` and is set on the
    RootInput/SeqRootInput module found in the pickled paths, or on a freshly added
    RootInput module if neither is present.
    """
    from basf2.pickle_path import get_path_from_file
    import glob
    import os
    import json

    # We pass in basf2 and CAF options via the config.json file
    with open("collector_config.json", "r") as config_file:
        config = json.load(config_file)

    # Create the database chain to use the necessary central DB global tags and local DBs if they are requested
    # We deliberately override the normal database ordering because we don't want input files GTs to affect
    # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
    b2conditions.reset()
    b2conditions.override_globaltags()

    for db_type, database in config["database_chain"]:
        if db_type == "local":
            B2INFO(f"Adding Local Database {database[0]} to head of chain of local databases.")
            b2conditions.prepend_testing_payloads(database[0])
        else:
            B2INFO(f"Using Global Tag {database}.")
            b2conditions.prepend_globaltag(database)

    # create a path with all modules needed before calibration path is run.
    collector_path = create_path()

    # Grab all the pickled path files. Should at least be one for the collector. And optionally
    # one for the modules to run before the collector.
    pre_collector_path_name = "pre_collector.path"

    # Remove the pre-collector path from the overall list.
    # This must be a real list: a lazy filter() object is always truthy, which would make the
    # "no collector path found" check below impossible to trigger.
    pickle_paths = [
        path for path in glob.glob("./*.path")
        if pre_collector_path_name not in path
    ]

    # Check for and add a path to run before the collector
    if os.path.exists(pre_collector_path_name):
        collector_path.add_path(get_path_from_file(pre_collector_path_name))
    else:
        B2INFO("Couldn't find any pickle files for pre-collector setup")

    # Check for any other paths to run (should only be collectors) and append them
    if pickle_paths:
        for pickle_path in pickle_paths:
            collector_path.add_path(get_path_from_file(pickle_path))
    else:
        B2FATAL("Couldn't find any pickle files for collector path!")

    # Grab the input data. Can't continue without some
    input_data = get_input_data()

    # Now we need to create a path that definitely has RootInput as a module.
    main = create_path()
    # Use this utility wrapper to check for RootInput or SeqRootInput and change params if necessary
    from caf.utils import PathExtras
    pe = PathExtras(collector_path)
    if "RootInput" in pe:
        root_input_mod = collector_path.modules()[pe.index("RootInput")]
        root_input_mod.param("inputFileNames", input_data)
        input_entry_sequences = get_entry_sequences(root_input_mod)
        # If entrySequences is set, then we should make sure that it has the same length as the number
        # of input files (if possible). Other parameters can be taken from the input directly, but this
        # one is sensitive to the input file number.
        if len(input_entry_sequences) == 1 and len(input_data) > 1:
            # Repeat the single configured sequence once per input file.
            root_input_mod.param("entrySequences", len(input_data) * input_entry_sequences)
    elif "SeqRootInput" in pe:
        root_input_mod = collector_path.modules()[pe.index("SeqRootInput")]
        root_input_mod.param("inputFileNames", input_data)
    else:
        main.add_module("RootInput", inputFileNames=input_data)

    # Only add a HistoManager if the pickled paths did not already provide one.
    if "HistoManager" not in pe:
        main.add_module("HistoManager", histoFileName="CollectorOutput.root")

    main.add_path(collector_path)
    basf2.print_path(main)
    process(main)
    print(basf2.statistics)
103 
104 
def get_entry_sequences(root_input_module):
    """
    Look up the current ``entrySequences`` value of a RootInput module.

    Scans ``root_input_module.available_params()`` for the entry whose ``name`` is
    "entrySequences" and returns its ``values`` attribute. If no such entry exists,
    a warning is logged and an empty list is returned instead.
    """
    for param_info in root_input_module.available_params():
        if param_info.name != "entrySequences":
            continue
        # First match wins; no need to look at the remaining parameters.
        return param_info.values

    # Falling out of the loop means the module exposes no such parameter.
    B2WARNING(("entrySequences ModuleParamInfo couldn't be found! "
               "Setting to empty list but this indicates that this script is out of date!"))
    return []
118 
119 
# Script entry point: run the collector job only when this file is executed directly,
# not when it is imported as a module.
120 if __name__ == "__main__":
121  run_collectors()
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
Filters events to remove runs shorter than cut; the removed runs are stored in runsRemoved.
Definition: Splitter.cc:40