Belle II Software development
example_complex.py
9"""A complicated example calibration that takes several input data lists from raw data and performs
10multiple calibrations. Only the second calibration will have its payloads placed into the final
11outputdb directory by b2caf-prompt-run.
12
13We make it so that this calibration depends on the result of a completely
14different one 'example_simple'. Even though that calibration will not be run in this process, the automated
15system can discover this dependency and use it when submitting tasks."""
16
17from prompt import CalibrationSettings, INPUT_DATA_FILTERS
18
19
26
# We decide to only run this script once the simple one has run. This only affects the automated system when scheduling
# tasks. This script can always be run standalone.
from prompt.calibrations.example_simple import settings as example_simple

settings = CalibrationSettings(name="Example Complex",
                               expert_username="ddossett",
                               subsystem="example",
                               description=__doc__,
                               input_data_formats=["raw"],
                               input_data_names=["physics", "cosmics", "Bcosmics"],
                               input_data_filters={
                                   "physics": [f"NOT {INPUT_DATA_FILTERS['Magnet']['On']}",
                                               INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                               INPUT_DATA_FILTERS["Beam Energy"]["4S"],
                                               INPUT_DATA_FILTERS["Run Type"]["physics"]],
                                   "cosmics": [INPUT_DATA_FILTERS['Magnet']['Off'],
                                               INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Bad For Alignment"],
                                               INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
                                               f"NOT {INPUT_DATA_FILTERS['Run Type']['physics']}"],
                                   "Bcosmics": [INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                INPUT_DATA_FILTERS["Beam Energy"]["4S"]]},
                               depends_on=[example_simple],
                               expert_config={
                                   "physics_prescale": 0.2,
                                   "max_events_per_file": 100,
                                   "max_files_per_run": 2,
                                   "payload_boundaries": []
                               },
                               produced_payloads=[])
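
# A quick note on input_data_filters (this comment is a rough sketch, not the authoritative definition):
# each input data name gets a list of filter strings that the automated system uses when deciding which raw
# data runs/files to hand to this script. Prefixing a filter with "NOT " inverts it, so the "physics" list
# above asks for hadron_calib, good-quality, 4S, physics-run data that was NOT taken with the magnet on.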

# The values in expert_config above are the DEFAULTS for this script. They will be overwritten by values in caf_config.json.
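# For illustration only (the values here are made up; the keys match the defaults above), a caf_config.json
# might contain:
#
#   "expert_config": {
#       "max_files_per_run": 5,
#       "payload_boundaries": [[12, 1], [12, 410]]
#   }
#
# and the values given there would then take precedence over the defaults defined above.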

# Note that you are forced to import the relevant script that you depend on, even though you may never use it directly.
# This is to make sure that this script won't run unless the dependent one exists, as well as automatically
# checking for circular dependencies via Python's import statements.


def get_calibrations(input_data, **kwargs):
    """
    Parameters:
        input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
            Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
            assigning to calibration.files_to_iov.

        **kwargs: Configuration options to be sent in. Since this may change, we use kwargs to help prevent
            backwards-compatibility problems. But you could explicitly use the correct arguments for this
            release of b2caf-prompt-run if you want to.

            Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used.

            "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range.
            However, your highest payload IoV should be open-ended, e.g. IoV(3,4,-1,-1).

            "expert_config" is the input configuration. It takes default values from your `CalibrationSettings`, but these are
            overwritten by values from the 'expert_config' key in your input `caf_config.json` file when running ``b2caf-prompt-run``.

    Returns:
        list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process.
    """
    # Set up config options
    import basf2
    from basf2 import register_module, create_path
    from ROOT import Belle2  # noqa: make the Belle2 namespace available
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # In this script we want to use three different sources of input data, and reconstruct them
    # differently before the Collector module runs.

    # Get the input files from the input_data variable
    file_to_iov_physics = input_data["physics"]
    file_to_iov_cosmics = input_data["cosmics"]
    file_to_iov_Bcosmics = input_data["Bcosmics"]
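    # Each of these is a dict of the form {"/path/to/file_e1_r5.root": IoV(1, 5, 1, 5), ...},
    # as described in the docstring above.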

    # We might have requested an enormous amount of data across the requested range.
    # There are a LOT more files than runs!
    # Let's set some limits, because this calibration doesn't need that much to run.
    expert_config = kwargs.get("expert_config")
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # We filter out anything beyond max_files_per_run files per run. The input data files are sorted alphabetically
    # by b2caf-prompt-run already. This procedure respects that ordering.
    from prompt.utils import filter_by_max_files_per_run

    reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics, max_files_per_run)
    input_files_physics = list(reduced_file_to_iov_physics.keys())
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(input_files_physics)}")

    reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics, max_files_per_run)
    input_files_cosmics = list(reduced_file_to_iov_cosmics.keys())
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(input_files_cosmics)}")

    reduced_file_to_iov_Bcosmics = filter_by_max_files_per_run(file_to_iov_Bcosmics, max_files_per_run)
    input_files_Bcosmics = list(reduced_file_to_iov_Bcosmics.keys())
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(input_files_Bcosmics)}")

    # Get the overall requested IoV we want to cover, including the end values. But we will probably want to replace
    # the end values with -1, -1 when setting the output payload IoVs.
    requested_iov = kwargs.get("requested_iov", None)

    # The actual value our output IoV payload should have. Notice that we've set it open-ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
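    # For example, a requested_iov of IoV(3, 4, 3, 80) becomes an output_iov of IoV(3, 4, -1, -1) here,
    # i.e. valid from experiment 3, run 4 onwards with no upper limit (the numbers are purely illustrative).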

    col_test_physics = register_module("CaTest")
    # This has to be 'run', otherwise our SequentialBoundaries strategy can't work.
    # We could make it optional, based on the contents of the expert_config.
    col_test_physics.param("granularity", "run")
    col_test_physics.param("spread", 4)

    col_test_Bcosmics = register_module("CaTest")
    col_test_Bcosmics.param("granularity", "all")
    col_test_Bcosmics.param("spread", 1)

    col_test_cosmics = register_module("CaTest")
    col_test_cosmics.param("granularity", "all")
    col_test_cosmics.param("spread", 10)
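    # Reminder (rough description): granularity "run" makes the collector store its output separately for each
    # run, which the SequentialBoundaries strategy needs so it can merge runs between boundaries, while "all"
    # merges the data from every run into a single object.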

    # Let's specify that not all events will be used per file for every Collection.
    # Just set this with one element in the list if you use it. The value will be duplicated in the collector subjobs
    # if the number of input files is larger than 1.
    max_events = expert_config["max_events_per_file"]
    root_input = register_module("RootInput",
                                 entrySequences=[f"0:{max_events}"]
                                 )

    # And/or we could set a prescale so that only a fraction of events pass onwards.
    # This is most useful for randomly selecting events throughout the input files.
    # Note that if you set the entrySequences AS WELL as a prescale, then the two are combined: only a few events
    # are passed into the Prescale module, and then only a fraction of those will continue to the Collector module.
    prescale = expert_config["physics_prescale"]
    prescale_mod = register_module("Prescale", prescale=prescale)
    empty_path = create_path()
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)
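    # Events that fail the prescale are sent down the empty path and, because of AfterConditionPath.END,
    # their processing stops there, so only the surviving fraction reaches the modules added afterwards.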

    rec_path_physics = create_path()
    rec_path_physics.add_module(root_input)
    rec_path_physics.add_module(prescale_mod)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_cosmics = create_path()
    rec_path_cosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_Bcosmics = create_path()
    rec_path_Bcosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    alg_test1 = TestCalibrationAlgorithm()
    alg_test2 = TestBoundarySettingAlgorithm()

    # Send in a list of boundaries for our algorithm class and SequentialBoundaries strategy to use.
    # A boundary is the STARTING run number for a new payload, and all data from runs between this run and the next
    # boundary will be used.
    # In our algorithm the first run in our data is always a starting boundary, so we can pass an empty list here
    # safely and still have it work.

    # We make sure that the first payload begins at the start of the requested IoV.
    # This is a quirk of the SequentialBoundaries strategy, as there must always be one boundary to START from.
    # You could elect to always set this yourself manually, but that seems error-prone.
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    # Now we can add the boundaries that exist in the expert config. They are extra boundaries, so that we don't have
    # to set the initial one every time. If this is an empty list then we effectively run like the SingleIoV strategy.
    payload_boundaries.extend([ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]])
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
    # Now set them all
    alg_test2.setBoundaries(vector_from_runs(payload_boundaries))  # This takes boundaries from the expert_config
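    # As an illustration, "payload_boundaries": [[12, 1], [12, 410]] in the expert_config would add
    # ExpRun(12, 1) and ExpRun(12, 410) here as extra starting points for new payloads
    # (the experiment/run numbers are made up for this example).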

    collection_physics = Collection(collector=col_test_physics,
                                    input_files=input_files_physics,
                                    pre_collector_path=rec_path_physics,
                                    max_collector_jobs=4
                                    )

    collection_cosmics = Collection(collector=col_test_cosmics,
                                    input_files=input_files_cosmics,
                                    pre_collector_path=rec_path_cosmics,
                                    max_collector_jobs=2
                                    )

    collection_Bcosmics = Collection(collector=col_test_Bcosmics,
                                     input_files=input_files_Bcosmics,
                                     pre_collector_path=rec_path_Bcosmics,
                                     max_collector_jobs=2
                                     )
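    # Roughly speaking, max_collector_jobs limits how many collector subjobs the input files of each
    # Collection are split across when the jobs are submitted.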

    # We will set up two Calibrations, one of which depends on the other.
    # However, the first Calibration will generate payloads that we don't want to save in our output database for upload.
    # Basically we want to ignore those payloads during the b2caf-prompt-run copying of the outputdb contents.
    # But we still use them as input to the next calibration.

    cal_test1 = Calibration("TestCalibration_cosmics")
    # Add collections in with unique names
    cal_test1.add_collection(name="cosmics", collection=collection_cosmics)
    cal_test1.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cal_test1.algorithms = [alg_test1]
    # Do this for the default AlgorithmStrategy to force the output payload IoV.
    cal_test1.algorithms[0].params = {"apply_iov": output_iov}
    # Mark this calibration as one whose payloads should not be copied at the end.
    cal_test1.save_payloads = False

    cal_test2 = Calibration("TestCalibration_physics")
    # Add collections in with unique names
    cal_test2.add_collection(name="physics", collection=collection_physics)
    cal_test2.algorithms = [alg_test2]
    # We apply a different strategy that will allow us to split the data we run over into chunks based on the boundaries above.
    cal_test2.strategies = SequentialBoundaries
    # Do this to force the output payload IoV. Note the different parameter name to the one above!
    cal_test2.algorithms[0].params["iov_coverage"] = output_iov
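    # Roughly speaking: "apply_iov" (used for cal_test1 above) fixes the IoV of the single payload produced by the
    # default strategy, while "iov_coverage" tells SequentialBoundaries the overall range its sequence of payloads
    # should cover, leaving the final payload open-ended.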

    cal_test2.depends_on(cal_test1)

    # You must return all calibrations you want to run in the prompt process.
    return [cal_test1, cal_test2]