Belle II Software development
example_complex.py
1
8
9"""A complicated example calibration that takes several input data lists from raw data and performs
10multiple calibrations. Only the second calibration will have its payloads placed into the final
11outputdb directory by b2caf-prompt-run.
12
13We make it so that this calibration depends on the result of a completely
14different one 'example_simple'. Even though that calibration will not be run in this process, the automated
15system can discover this dependency and use it when submitting tasks."""
16
17from prompt import CalibrationSettings, INPUT_DATA_FILTERS
18
19
26
27# We decide to only run this script once the simple one has run. This only affects the automated system when scheduling
28# tasks. This script can always be run standalone.
29from prompt.calibrations.example_simple import settings as example_simple
30
31
settings = CalibrationSettings(
    name="Example Complex",
    expert_username="ddossett",
    description=__doc__,
    input_data_formats=["raw"],
    input_data_names=["physics", "cosmics", "Bcosmics"],
    # One list of filters per entry in input_data_names.
    input_data_filters={
        "physics": [
            f"NOT {INPUT_DATA_FILTERS['Magnet']['On']}",
            INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
            INPUT_DATA_FILTERS["Beam Energy"]["4S"],
            INPUT_DATA_FILTERS["Run Type"]["physics"],
        ],
        "cosmics": [
            INPUT_DATA_FILTERS["Magnet"]["Off"],
            INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Bad For Alignment"],
            INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
            f"NOT {INPUT_DATA_FILTERS['Run Type']['physics']}",
        ],
        "Bcosmics": [
            INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
            INPUT_DATA_FILTERS["Beam Energy"]["4S"],
        ],
    },
    # The imported settings of the calibration this one has to wait for.
    depends_on=[example_simple],
    # Default values only; get_calibrations() reads these keys from kwargs["expert_config"].
    expert_config={
        "physics_prescale": 0.2,
        "max_events_per_file": 100,
        "max_files_per_run": 2,
        "payload_boundaries": [],
    },
)
57
58# The values in expert_config above are the DEFAULT for this script. They will be overwritten by values in caf_config.json
59
60# Note that you are forced to import the relevant script that you depend on, even though you never use it.
61# This is to make sure that this script won't run unless the dependent one exists, as well as automatically
62# checking for circular dependency via Python's import statements.
63
64
65
66
74
75
def get_calibrations(input_data, **kwargs):
    """
    Build the two chained example calibrations that are handed to the CAF process.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key
        ("physics", "cosmics" and "Bcosmics" are read here).
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov

      **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
        backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
        release explicitly if you want to.

        Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used.

        "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range.
        However your highest payload IoV should be open ended e.g. IoV(3,4,-1,-1)

        "expert_config" is the input configuration. It takes default values from your `CalibrationSettings` but these are
        overwritten by values from the 'expert_config' key in your input `caf_config.json` file when running ``b2caf-prompt-run``.
        The keys read here are "max_files_per_run", "max_events_per_file", "physics_prescale" and "payload_boundaries".

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
    """
    # Framework imports are kept local to the function.
    import basf2
    from basf2 import register_module, create_path
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # Three different sources of input data are used, each of which gets its own
    # pre-collector path before the Collector module runs.
    physics_file_to_iov = input_data["physics"]
    cosmics_file_to_iov = input_data["cosmics"]
    bcosmics_file_to_iov = input_data["Bcosmics"]

    # A requested range can contain far more files than runs, and this calibration does not
    # need that much data, so the number of files taken from each run is capped.
    expert_config = kwargs.get("expert_config")
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # The input data files are already sorted alphabetically by b2caf-prompt-run and the
    # filtering below respects that ordering.
    from prompt.utils import filter_by_max_files_per_run

    def _pick_files(label, file_to_iov):
        # Keep at most max_files_per_run files per run and report how many files are left.
        reduced = filter_by_max_files_per_run(file_to_iov, max_files_per_run)
        chosen = list(reduced.keys())
        basf2.B2INFO(f"Total number of {label} files actually used as input = {len(chosen)}")
        return chosen

    input_files_physics = _pick_files("physics", physics_file_to_iov)
    input_files_cosmics = _pick_files("cosmics", cosmics_file_to_iov)
    input_files_Bcosmics = _pick_files("Bcosmics", bcosmics_file_to_iov)

    # The overall IoV we were asked to cover, including its end values.
    requested_iov = kwargs.get("requested_iov")

    # The IoV our output payloads should actually get: same start, but open ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)

    def _make_collector(granularity, spread):
        # Register one CaTest collector module and set its two parameters.
        collector = register_module("CaTest")
        collector.param("granularity", granularity)
        collector.param("spread", spread)
        return collector

    # The physics collector has to use 'run' granularity, otherwise the SequentialBoundaries
    # strategy can't work. It could be made optional, based on the contents of the expert_config.
    col_test_physics = _make_collector("run", 4)
    col_test_Bcosmics = _make_collector("all", 1)
    col_test_cosmics = _make_collector("all", 10)

    # Not all events of a file are used by every Collection.
    # Only a single element is given in the list; the value will be duplicated in collector
    # subjobs if the number of input files is larger than 1.
    # NOTE(review): if RootInput treats "first:last" as an inclusive range this reads
    # max_events + 1 entries per file -- confirm against the RootInput documentation.
    max_events = expert_config["max_events_per_file"]
    root_input = register_module("RootInput", entrySequences=[f"0:{max_events}"])

    # A prescale lets only a fraction of the events pass onwards, which is most useful for
    # randomly selecting events throughout the input files.
    # Setting entrySequences AS WELL as a prescale combines the two: only a few events reach the
    # Prescale module, and only a fraction of those continue to the Collector module.
    prescale_mod = register_module("Prescale", prescale=expert_config["physics_prescale"])
    empty_path = create_path()
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)

    # Reconstruction modules dependent on the type of input data could be appended to each path.
    rec_path_physics = create_path()
    rec_path_physics.add_module(root_input)
    rec_path_physics.add_module(prescale_mod)

    rec_path_cosmics = create_path()
    rec_path_cosmics.add_module(root_input)

    rec_path_Bcosmics = create_path()
    rec_path_Bcosmics.add_module(root_input)

    alg_test1 = TestCalibrationAlgorithm()
    alg_test2 = TestBoundarySettingAlgorithm()

    # A boundary is the STARTING run number for a new payload; all data from runs between this
    # run and the next boundary will be used.
    # The SequentialBoundaries strategy always needs one boundary to START from, so the first
    # one is pinned to the start of the requested IoV rather than set by hand every time.
    first_boundary = ExpRun(output_iov.exp_low, output_iov.run_low)
    # Boundaries from the expert config are extra ones. If that list is empty we effectively run
    # like the SingleIoV strategy.
    expert_boundaries = expert_config["payload_boundaries"]
    payload_boundaries = [first_boundary] + [ExpRun(*boundary) for boundary in expert_boundaries]
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_boundaries}")
    alg_test2.setBoundaries(vector_from_runs(payload_boundaries))

    collection_physics = Collection(
        collector=col_test_physics,
        input_files=input_files_physics,
        pre_collector_path=rec_path_physics,
        max_collector_jobs=4,
    )
    collection_cosmics = Collection(
        collector=col_test_cosmics,
        input_files=input_files_cosmics,
        pre_collector_path=rec_path_cosmics,
        max_collector_jobs=2,
    )
    collection_Bcosmics = Collection(
        collector=col_test_Bcosmics,
        input_files=input_files_Bcosmics,
        pre_collector_path=rec_path_Bcosmics,
        max_collector_jobs=2,
    )

    # Two Calibrations are set up, one of which depends on the other.
    # The payloads of the first one are not wanted in the output database for upload: they are
    # ignored during the b2caf-prompt-run copying of the outputdb contents, but they are still
    # used as input to the second calibration.
    cal_test1 = Calibration("TestCalibration_cosmics")
    # Collections are added under unique names.
    cal_test1.add_collection(name="cosmics", collection=collection_cosmics)
    cal_test1.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cal_test1.algorithms = [alg_test1]
    # For the default AlgorithmStrategy this forces the output payload IoV.
    cal_test1.algorithms[0].params = {"apply_iov": output_iov}
    # Payloads of this calibration should not be copied at the end.
    cal_test1.save_payloads = False

    cal_test2 = Calibration("TestCalibration_physics")
    cal_test2.add_collection(name="physics", collection=collection_physics)
    cal_test2.algorithms = [alg_test2]
    # A different strategy is applied here, which splits the data we run over into chunks based
    # on the boundaries set above.
    cal_test2.strategies = SequentialBoundaries
    # This also forces the output payload IoV. Note the different parameter name to the one above!
    cal_test2.algorithms[0].params["iov_coverage"] = output_iov

    cal_test2.depends_on(cal_test1)

    # Every calibration that should run in the prompt process has to be returned.
    return [cal_test1, cal_test2]
273
274