Belle II Software development
run_collector_path.py
1#!/usr/bin/env python3
2
3
10
11import basf2
12from basf2 import conditions as b2conditions
13from basf2 import create_path, process
14from basf2 import B2FATAL, B2INFO, B2WARNING
15
16# The backend provides the input data files to the job and you can get the list of
17# files from this function
18from caf.backends import get_input_data
19
20
def run_collectors():
    """
    Runs the CAF collector in a general way by passing serialised basf2 paths, input file
    locations, and local databases from previous calibrations.

    The steps performed are:

    1. Read ``collector_config.json`` from the current directory and set up the conditions
       database from its ``"database_chain"`` entry.
    2. Build the collector path from the optional ``pre_collector.path`` file followed by every
       other ``*.path`` file found in the current directory.
    3. Make sure an input module receives the job's input files (an existing ``RootInput`` or
       ``SeqRootInput`` is re-configured, otherwise a ``RootInput`` is added) and that a
       ``HistoManager`` is present.
    4. Print the final path, process it and print the basf2 statistics.

    A fatal error is reported via ``B2FATAL`` if no collector path file is found.
    """
    from basf2.pickle_path import get_path_from_file
    import glob
    import os
    import json

    # We pass in basf2 and CAF options via the config.json file
    with open("collector_config.json") as config_file:
        config = json.load(config_file)

    # Create the database chain to use the necessary central DB global tags and local DBs if they are requested
    # We deliberately override the normal database ordering because we don't want input files GTs to affect
    # the processing. Only explicit GTs and intermediate local DBs made by the CAF should be added here.
    b2conditions.reset()
    b2conditions.override_globaltags()

    for db_type, database in config["database_chain"]:
        if db_type == "local":
            B2INFO(f"Adding Local Database {database[0]} to head of chain of local databases.")
            b2conditions.prepend_testing_payloads(database[0])
        else:
            B2INFO(f"Using Global Tag {database}.")
            b2conditions.prepend_globaltag(database)

    # create a path with all modules needed before calibration path is run.
    collector_path = create_path()

    # Grab all the pickled path files. Should at least be one for the collector. And optionally
    # one for the modules to run before the collector.
    pre_collector_path_name = "pre_collector.path"

    # Remove the pre-collector path from the overall list.
    # This must be a real list: a lazy filter() object is always truthy, which would make the
    # emptiness check further down ineffective and hide a missing collector path.
    pickle_paths = [
        path_file for path_file in glob.glob("./*.path")
        if pre_collector_path_name not in path_file
    ]

    # Check for and add a path to run before the collector
    if os.path.exists(pre_collector_path_name):
        collector_path.add_path(get_path_from_file(pre_collector_path_name))
    else:
        B2INFO("Couldn't find any pickle files for pre-collector setup")

    # Check for any other paths to run (should only be collectors) and append them
    if pickle_paths:
        for pickle_path in pickle_paths:
            collector_path.add_path(get_path_from_file(pickle_path))
    else:
        B2FATAL("Couldn't find any pickle files for collector path!")

    # Grab the input data. Can't continue without some
    input_data = get_input_data()

    # Now we need to create a path that definitely has RootInput as a module.
    main = create_path()
    # Use this utility wrapper to check for RootInput or SeqRootInput and change params if necessary
    from caf.utils import PathExtras
    pe = PathExtras(collector_path)
    if "RootInput" in pe:
        root_input_mod = collector_path.modules()[pe.index("RootInput")]
        root_input_mod.param("inputFileNames", input_data)
        input_entry_sequences = get_entry_sequences(root_input_mod)
        # If entrySequences is set, then we should make sure that it has the same length as the number of
        # input files (if possible). Other parameters can be taken from the input directly, but this one is
        # sensitive to the input file number.
        if len(input_entry_sequences) == 1 and len(input_data) > 1:
            root_input_mod.param("entrySequences", len(input_data)*input_entry_sequences)
    elif "SeqRootInput" in pe:
        root_input_mod = collector_path.modules()[pe.index("SeqRootInput")]
        root_input_mod.param("inputFileNames", input_data)
    else:
        main.add_module("RootInput", inputFileNames=input_data)

    if "HistoManager" not in pe:
        main.add_module("HistoManager", histoFileName="CollectorOutput.root")

    # The input/histogram modules added to ``main`` above must come before the collector path.
    main.add_path(collector_path)
    basf2.print_path(main)
    process(main)
    print(basf2.statistics)
102
103
def get_entry_sequences(root_input_module):
    """
    Looks up the ``entrySequences`` parameter of the given RootInput module and returns its
    ``values`` (a list).

    If the module does not expose a parameter with that name a warning is emitted and an
    empty list is returned instead.
    """
    # Stop at the first parameter whose name matches; later duplicates are never inspected.
    matching_info = next(
        (info for info in root_input_module.available_params() if info.name == "entrySequences"),
        None,
    )
    if matching_info is not None:
        return matching_info.values

    B2WARNING("entrySequences ModuleParamInfo couldn't be found! "
              "Setting to empty list but this indicates that this script is out of date!")
    return []
117
118
# Only run the collectors when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    run_collectors()