Belle II Software development
example_complex.py
9"""A complicated example calibration that takes several input data lists from raw data and performs
10multiple calibrations. Only the second calibration will have its payloads placed into the final
11outputdb directory by b2caf-prompt-run.
12
13We make it so that this calibration depends on the result of a completely
14different one 'example_simple'. Even though that calibration will not be run in this process, the automated
15system can discover this dependency and use it when submitting tasks."""

from prompt import CalibrationSettings, INPUT_DATA_FILTERS


# We decide to only run this script once the simple one has run. This only affects the automated system
# when scheduling tasks. This script can always be run standalone.
from prompt.calibrations.example_simple import settings as example_simple

settings = CalibrationSettings(name="Example Complex",
                               expert_username="ddossett",
                               description=__doc__,
                               input_data_formats=["raw"],
                               input_data_names=["physics", "cosmics", "Bcosmics"],
                               input_data_filters={"physics": [f"NOT {INPUT_DATA_FILTERS['Magnet']['On']}",
                                                               INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
                                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                               INPUT_DATA_FILTERS["Beam Energy"]["4S"],
                                                               INPUT_DATA_FILTERS["Run Type"]["physics"]],
                                                   "cosmics": [INPUT_DATA_FILTERS['Magnet']['Off'],
                                                               INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Bad For Alignment"],
                                                               INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
                                                               f"NOT {INPUT_DATA_FILTERS['Run Type']['physics']}"],
                                                   "Bcosmics": [INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                                INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                                INPUT_DATA_FILTERS["Beam Energy"]["4S"]]},
                               depends_on=[example_simple],
                               expert_config={
                                   "physics_prescale": 0.2,
                                   "max_events_per_file": 100,
                                   "max_files_per_run": 2,
                                   "payload_boundaries": []
                               })

# The values in expert_config above are the DEFAULTS for this script. They will be overwritten by the values
# under the 'expert_config' key in the caf_config.json used when running b2caf-prompt-run.
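# As a purely illustrative sketch (the values below are invented), the 'expert_config' key of a
# caf_config.json passed to b2caf-prompt-run might look like:
#
#   "expert_config": {
#       "physics_prescale": 0.5,
#       "max_events_per_file": 500,
#       "max_files_per_run": 4,
#       "payload_boundaries": [[12, 1], [12, 1500]]
#   }
#
# Keys set there replace the defaults above; keys you omit should keep their default values.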

# Note that you are forced to import the script that you depend on, even if you never use it beyond the
# depends_on list. This makes sure that this script won't run unless the script it depends on exists, and
# it automatically checks for circular dependencies via Python's import statements.


def get_calibrations(input_data, **kwargs):
    """
    Parameters:
        input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
            Each value is a dictionary of the form {"/path/to/file_e1_r5.root": IoV(1, 5, 1, 5), ...}. Useful for
            assigning to calibration.files_to_iov.

        **kwargs: Configuration options to be sent in. Since these may change, we use kwargs to help prevent
            backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
            release explicitly if you want to.

            Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used.

            "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range.
            However, your highest payload IoV should be open ended, e.g. IoV(3, 4, -1, -1).

            "expert_config" is the input configuration. It takes default values from your `CalibrationSettings`,
            but these are overwritten by values from the 'expert_config' key in your input `caf_config.json` file
            when running ``b2caf-prompt-run``.

    Returns:
        list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process.
    """
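    # As a rough sketch (file names and IoV values here are made up), input_data might look like:
    #   {"physics":  {"/some/path/physics_e12_r1234.root": IoV(12, 1234, 12, 1234), ...},
    #    "cosmics":  {"/some/path/cosmics_e12_r1230.root": IoV(12, 1230, 12, 1230), ...},
    #    "Bcosmics": {...}}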
    # Set up config options
    import basf2
    from basf2 import register_module, create_path
    from ROOT import Belle2  # noqa: make the Belle2 namespace available
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # In this script we want to use three different sources of input data, and reconstruct them
    # differently before the Collector module runs.

    # Get the input files from the input_data variable
    file_to_iov_physics = input_data["physics"]
    file_to_iov_cosmics = input_data["cosmics"]
    file_to_iov_Bcosmics = input_data["Bcosmics"]

    # We might have requested an enormous amount of data across the requested range.
    # There are a LOT more files than runs!
    # Let's set some limits because this calibration doesn't need that much data to run.
    expert_config = kwargs.get("expert_config")
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # We filter out anything beyond max_files_per_run files per run. The input data files are already sorted
    # alphabetically by b2caf-prompt-run, and this procedure respects that ordering.
    from prompt.utils import filter_by_max_files_per_run
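    # Purely as an illustration: if a single run contained ["A_r1.root", "B_r1.root", "C_r1.root"] and
    # max_files_per_run were 2, only the first two files (in the existing sorted order) would be kept.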

    reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics, max_files_per_run)
    input_files_physics = list(reduced_file_to_iov_physics.keys())
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(input_files_physics)}")

    reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics, max_files_per_run)
    input_files_cosmics = list(reduced_file_to_iov_cosmics.keys())
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(input_files_cosmics)}")

    reduced_file_to_iov_Bcosmics = filter_by_max_files_per_run(file_to_iov_Bcosmics, max_files_per_run)
    input_files_Bcosmics = list(reduced_file_to_iov_Bcosmics.keys())
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(input_files_Bcosmics)}")

    # Get the overall requested IoV we want to cover, including the end values. We will probably want to replace
    # the end values with -1, -1 when setting the output payload IoVs.
    requested_iov = kwargs.get("requested_iov", None)

    # The actual value our output payload IoV should have. Notice that we've made it open ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
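    # For example (numbers invented for illustration), a requested IoV of IoV(12, 100, 12, 500) would give
    # an output_iov of IoV(12, 100, -1, -1).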


    col_test_physics = register_module("CaTest")
    # The granularity has to be 'run', otherwise our SequentialBoundaries strategy can't work.
    # We could make this optional, based on the contents of the expert_config.
    col_test_physics.param("granularity", "run")
    col_test_physics.param("spread", 4)
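    # A minimal sketch of making the granularity configurable (this assumes a hypothetical "granularity"
    # key that is NOT part of the expert_config defaults above):
    # col_test_physics.param("granularity", expert_config.get("granularity", "run"))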

    col_test_Bcosmics = register_module("CaTest")
    col_test_Bcosmics.param("granularity", "all")
    col_test_Bcosmics.param("spread", 1)

    col_test_cosmics = register_module("CaTest")
    col_test_cosmics.param("granularity", "all")
    col_test_cosmics.param("spread", 10)


    # Let's specify that not all events will be used per file for every Collection.
    # Just set this with a single element in the list if you use it; the value will be duplicated across
    # collector subjobs if there is more than one input file.
    max_events = expert_config["max_events_per_file"]
    root_input = register_module("RootInput",
                                 entrySequences=[f"0:{max_events}"]
                                 )

    # And/or we could set a prescale so that only a fraction of events pass onwards.
    # This is most useful for randomly selecting events throughout the input files.
    # Note that if you set entrySequences AS WELL as a prescale, the two are combined: only the events selected
    # by entrySequences are passed into the Prescale module, and then only a fraction of those continue on to
    # the Collector module.
    prescale = expert_config["physics_prescale"]
    prescale_mod = register_module("Prescale", prescale=prescale)
    empty_path = create_path()
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)
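    # Rough arithmetic with the defaults above: max_events_per_file = 100 and physics_prescale = 0.2 means
    # that, on average, only about 20 events per physics file should reach the Collector module.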

    rec_path_physics = create_path()
    rec_path_physics.add_module(root_input)
    rec_path_physics.add_module(prescale_mod)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_cosmics = create_path()
    rec_path_cosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_Bcosmics = create_path()
    rec_path_Bcosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data


    alg_test1 = TestCalibrationAlgorithm()
    alg_test2 = TestBoundarySettingAlgorithm()

    # Send in a list of boundaries for our algorithm class and the SequentialBoundaries strategy to use.
    # A boundary is the STARTING run number for a new payload, and all data from the runs between this boundary
    # and the next one will be used.
    # In our algorithm the first run in our data is always a starting boundary, so we could pass an empty list
    # here safely and still have it work.

    # We make sure that the first payload begins at the start of the requested IoV.
    # This is a quirk of the SequentialBoundaries strategy, as there must always be one boundary to START from.
    # You could elect to always set this yourself manually, but that seems error prone.
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    # Now we can add the boundaries that exist in the expert config. They are extra boundaries, so we don't have
    # to set the initial one every time. If this is an empty list then we effectively run like the SingleIOV strategy.
    payload_boundaries.extend([ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]])
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
    # Now set them all
    alg_test2.setBoundaries(vector_from_runs(payload_boundaries))  # the initial boundary plus any extras from the expert_config
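    # A hypothetical example of how this plays out: with a requested IoV starting at (12, 1) and
    # "payload_boundaries": [[12, 100]] in the expert_config, SequentialBoundaries should produce two payloads,
    # one covering roughly (12, 1) to (12, 99) and one from (12, 100) onwards (the last one left open ended).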


    collection_physics = Collection(collector=col_test_physics,
                                    input_files=input_files_physics,
                                    pre_collector_path=rec_path_physics,
                                    max_collector_jobs=4
                                    )

    collection_cosmics = Collection(collector=col_test_cosmics,
                                    input_files=input_files_cosmics,
                                    pre_collector_path=rec_path_cosmics,
                                    max_collector_jobs=2
                                    )

    collection_Bcosmics = Collection(collector=col_test_Bcosmics,
                                     input_files=input_files_Bcosmics,
                                     pre_collector_path=rec_path_Bcosmics,
                                     max_collector_jobs=2
                                     )


    # We will set up two Calibrations, one of which depends on the other.
    # However, the first Calibration will generate payloads that we don't want to save in our output database for upload.
    # Basically, we want to ignore its payloads when b2caf-prompt-run copies the outputdb contents,
    # but we still use them as input to the next calibration.

    cal_test1 = Calibration("TestCalibration_cosmics")
    # Add the collections in with unique names
    cal_test1.add_collection(name="cosmics", collection=collection_cosmics)
    cal_test1.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cal_test1.algorithms = [alg_test1]
    # Do this for the default AlgorithmStrategy to force the output payload IoV
    cal_test1.algorithms[0].params = {"apply_iov": output_iov}
    # Mark this calibration as one whose payloads should not be copied at the end.
    cal_test1.save_payloads = False

    cal_test2 = Calibration("TestCalibration_physics")
    # Add the collection in with a unique name
    cal_test2.add_collection(name="physics", collection=collection_physics)
    cal_test2.algorithms = [alg_test2]
    # We apply a different strategy that allows us to split the data we run over into chunks, based on the boundaries above
    cal_test2.strategies = SequentialBoundaries
    # Do this to force the output payload IoV. Note the different parameter name to above!
    cal_test2.algorithms[0].params["iov_coverage"] = output_iov
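    # (Roughly speaking: "apply_iov" sets the IoV of the single payload made by the default SingleIOV strategy,
    #  while "iov_coverage" tells SequentialBoundaries the total range its sequence of payloads should span,
    #  with the final payload left open ended.)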

    cal_test2.depends_on(cal_test1)

    # You must return all calibrations you want to run in the prompt process
    return [cal_test1, cal_test2]