Belle II Software  release-05-01-25
run_collector_path.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 import basf2
5 from basf2 import conditions as b2conditions
6 from basf2 import create_path, process
7 from basf2 import B2FATAL, B2INFO, B2WARNING
8 
9 # The backend provides the input data files to the job and you can get the list of
10 # files from this function
11 from caf.backends import get_input_data
12 
13 
14 def run_collectors():
15  """
16  Runs the CAF collector in a general way by passing serialised basf2 paths, input file
17  locations, and local databases from previous calibrations.
18  """
19  from basf2.pickle_path import get_path_from_file
20  import glob
21  import os
22  import json
23 
24  # We pass in basf2 and CAF options via the config.json file
25  with open("collector_config.json", "r") as config_file:
26  config = json.load(config_file)
27 
28  # Create the database chain to use the necessary central DB global tags and local DBs if they are requested
29  # We deliberately override the normal database ordering because we don't want input files GTs to affect
30  # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
31  b2conditions.reset()
32  b2conditions.override_globaltags()
33 
34  for db_type, database in config["database_chain"]:
35  if db_type == "local":
36  B2INFO(f"Adding Local Database {database[0]} to head of chain of local databases.")
37  b2conditions.prepend_testing_payloads(database[0])
38  else:
39  B2INFO(f"Using Global Tag {database}.")
40  b2conditions.prepend_globaltag(database)
41 
42  # create a path with all modules needed before calibration path is run.
43  collector_path = create_path()
44 
45  # Grab all the pickled path files. Should at least be one for the collector. And optionally
46  # one for the modules to run before the collector.
47  pre_collector_path_name = "pre_collector.path"
48  pickle_paths = glob.glob("./*.path")
49 
50  # Remove the pre-collector path from the overall list
51  pickle_paths = filter(lambda x: pre_collector_path_name not in x, pickle_paths)
52 
53  # Check for and add a path to run before the collector
54  if os.path.exists(pre_collector_path_name):
55  collector_path.add_path(get_path_from_file(pre_collector_path_name))
56  else:
57  B2INFO("Couldn't find any pickle files for pre-collector setup")
58 
59  # Check for any other paths to run (should only be collectors) and append them
60  if pickle_paths:
61  for pickle_path in pickle_paths:
62  collector_path.add_path(get_path_from_file(pickle_path))
63  else:
64  B2FATAL("Couldn't find any pickle files for collector path!")
65 
66  # Grab the input data. Can't continue without some
67  input_data = get_input_data()
68 
69  # Now we need to create a path that definitely has RootInput as a module.
70  main = create_path()
71  # Use this utility wrapper to check for RootInput or SeqRootInput and change params if necessary
72  from caf.utils import PathExtras
73  pe = PathExtras(collector_path)
74  if "RootInput" in pe:
75  root_input_mod = collector_path.modules()[pe.index("RootInput")]
76  root_input_mod.param("inputFileNames", input_data)
77  input_entry_sequences = get_entry_sequences(root_input_mod)
78  # If entrySquences is set, then we should make sure that it has the same length as the number of input files (if possible)
79  # Other parameters can be taken from the input directly, but this one is sensitive to the input file number.
80  if len(input_entry_sequences) == 1 and len(input_data) > 1:
81  root_input_mod.param("entrySequences", len(input_data)*input_entry_sequences)
82  elif "SeqRootInput" in pe:
83  root_input_mod = collector_path.modules()[pe.index("SeqRootInput")]
84  root_input_mod.param("inputFileNames", input_data)
85  else:
86  main.add_module("RootInput", inputFileNames=input_data)
87 
88  if "HistoManager" not in pe:
89  main.add_module("HistoManager", histoFileName="CollectorOutput.root")
90 
91  main.add_path(collector_path)
92  basf2.print_path(main)
93  process(main)
94  print(basf2.statistics)
95 
96 
97 def get_entry_sequences(root_input_module):
98  """
99  Finds the value of entrySequences (list) from the RootInput module.
100  """
101  for module_param_info in root_input_module.available_params():
102  if module_param_info.name == "entrySequences":
103  entry_sequences = module_param_info.values
104  break
105  else:
106  B2WARNING(("entrySequences ModuleParamInfo couldn't be found! "
107  "Setting to empty list but this indicates that this script is out of date!"))
108  entry_sequences = []
109  return entry_sequences
110 
111 
112 if __name__ == "__main__":
113  run_collectors()
Belle2::filter
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:43
basf2.pickle_path
Definition: pickle_path.py:1