Belle II Software release-09-00-02
example_complex.py
1
8
9"""A complicated example calibration that takes several input data lists from raw data and performs
10multiple calibrations. Only the second calibration will have its payloads placed into the final
11outputdb directory by b2caf-prompt-run.
12
13We make it so that this calibration depends on the result of a completely
14different one 'example_simple'. Even though that calibration will not be run in this process, the automated
15system can discover this dependency and use it when submitting tasks."""
16
17from prompt import CalibrationSettings, INPUT_DATA_FILTERS
18
19
26
27# We decide to only run this script once the simple one has run. This only affects the automated system when scheduling
28# tasks. This script can always be run standalone.
29from prompt.calibrations.example_simple import settings as example_simple
30
31
# Settings object describing this calibration: its name and expert contact, the input data
# it requests (names, format and per-name filters), the calibration it depends on, and the
# default expert configuration values.
settings = CalibrationSettings(
    name="Example Complex",
    expert_username="ddossett",
    subsystem="example",
    # The module docstring doubles as the description.
    description=__doc__,
    input_data_formats=["raw"],
    # Three named input data lists; each name has its own filter list below.
    input_data_names=["physics", "cosmics", "Bcosmics"],
    input_data_filters={
        "physics": [
            f"NOT {INPUT_DATA_FILTERS['Magnet']['On']}",
            INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
            INPUT_DATA_FILTERS["Beam Energy"]["4S"],
            INPUT_DATA_FILTERS["Run Type"]["physics"],
        ],
        "cosmics": [
            INPUT_DATA_FILTERS["Magnet"]["Off"],
            INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Bad For Alignment"],
            INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
            f"NOT {INPUT_DATA_FILTERS['Run Type']['physics']}",
        ],
        "Bcosmics": [
            INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
            INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
            INPUT_DATA_FILTERS["Beam Energy"]["4S"],
        ],
    },
    # Dependency on the separately defined 'example_simple' calibration settings.
    depends_on=[example_simple],
    # Default values; get_calibrations() reads all four keys from kwargs["expert_config"].
    expert_config={
        "physics_prescale": 0.2,
        "max_events_per_file": 100,
        "max_files_per_run": 2,
        "payload_boundaries": [],
    },
    produced_payloads=[],
)
59
60# The values in expert_config above are the DEFAULT for this script. They will be overwritten by values in caf_config.json
61
62# Note that you are forced to import the relevant script that you depend on, even though you never use it.
63# This is to make sure that this script won't run unless the dependent one exists, as well as automatically
64# checking for circular dependency via Python's import statements.
65
66
67
68
76
77
def get_calibrations(input_data, **kwargs):
    """
    Build the two chained example calibrations that should be run by the CAF process.

    The first calibration ("TestCalibration_cosmics") collects from the "cosmics" and
    "Bcosmics" input data and is flagged so that its payloads are not saved. The second
    ("TestCalibration_physics") collects from the "physics" input data, uses the
    SequentialBoundaries strategy and depends on the first.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov

      **kwargs: Configuration options to be sent in. kwargs are used so that changes to the set of
        options are less likely to cause backwards compatibility problems.

        Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used.

        "requested_iov" is the IoV range of the bucket and your payloads should correspond to this
        range. However your highest payload IoV should be open ended e.g. IoV(3,4,-1,-1)

        "expert_config" is the input configuration. It takes default values from your
        `CalibrationSettings` but these are overwritten by values from the 'expert_config' key in
        your input `caf_config.json` file when running ``b2caf-prompt-run``.

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
    """
    import basf2
    from basf2 import register_module, create_path
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # ------------------------------------------------------------------
    # Input data: three separate sources, each given its own pre-collector path below.
    # ------------------------------------------------------------------
    physics_file_to_iov = input_data["physics"]
    cosmics_file_to_iov = input_data["cosmics"]
    bcosmics_file_to_iov = input_data["Bcosmics"]

    # The requested range may contain far more files than this calibration needs,
    # so the number of files taken from each run is capped by the expert configuration.
    expert_config = kwargs.get("expert_config")
    files_per_run_limit = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {files_per_run_limit} files per run.")

    # The input data files are already sorted alphabetically by b2caf-prompt-run,
    # and the filtering procedure respects that ordering.
    from prompt.utils import filter_by_max_files_per_run

    def select_input_files(file_to_iov):
        """Apply the per-run file limit and return the remaining file paths as a list."""
        kept = filter_by_max_files_per_run(file_to_iov, files_per_run_limit)
        return list(kept.keys())

    physics_files = select_input_files(physics_file_to_iov)
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(physics_files)}")

    cosmics_files = select_input_files(cosmics_file_to_iov)
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(cosmics_files)}")

    bcosmics_files = select_input_files(bcosmics_file_to_iov)
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(bcosmics_files)}")

    # ------------------------------------------------------------------
    # Output IoV: starts where the requested IoV starts, but is left open ended (-1, -1).
    # ------------------------------------------------------------------
    requested_iov = kwargs.get("requested_iov")
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)

    # ------------------------------------------------------------------
    # Collector modules, one per input source.
    # ------------------------------------------------------------------
    # The physics collector has to use 'run' granularity, otherwise the SequentialBoundaries
    # strategy can't work. It could be made optional based on the expert_config.
    physics_collector = register_module("CaTest", granularity="run", spread=4)
    bcosmics_collector = register_module("CaTest", granularity="all", spread=1)
    cosmics_collector = register_module("CaTest", granularity="all", spread=10)

    # ------------------------------------------------------------------
    # Pre-collector paths.
    # ------------------------------------------------------------------
    # Not every event of a file is used. A single entry in the list is enough: the value is
    # duplicated in collector subjobs when there is more than one input file.
    events_per_file_limit = expert_config["max_events_per_file"]
    root_input = register_module("RootInput", entrySequences=[f"0:{events_per_file_limit}"])

    # A prescale lets only a fraction of events pass onwards, which is most useful for randomly
    # selecting events throughout the input files. Combined with entrySequences, only the events
    # selected by entrySequences reach the Prescale module and only a fraction of those continue
    # to the Collector module.
    prescale_mod = register_module("Prescale", prescale=expert_config["physics_prescale"])
    empty_path = create_path()
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)

    # Only the physics path carries the prescale. Reconstruction modules specific to each
    # type of input data could be appended to any of these paths.
    physics_path = create_path()
    for module in (root_input, prescale_mod):
        physics_path.add_module(module)

    cosmics_path = create_path()
    cosmics_path.add_module(root_input)

    bcosmics_path = create_path()
    bcosmics_path.add_module(root_input)

    # ------------------------------------------------------------------
    # Algorithms and payload boundaries.
    # ------------------------------------------------------------------
    calibration_algorithm = TestCalibrationAlgorithm()
    boundary_algorithm = TestBoundarySettingAlgorithm()

    # A boundary is the STARTING run of a new payload; all data from runs between it and the
    # next boundary is used. SequentialBoundaries always needs one boundary to start from, so
    # the start of the requested IoV is always inserted first. The expert config then only has
    # to list extra boundaries; with an empty list this effectively runs like the SingleIoV strategy.
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    extra_boundaries = expert_config["payload_boundaries"]
    payload_boundaries += [ExpRun(*boundary) for boundary in extra_boundaries]
    basf2.B2INFO(f"Expert set payload boundaries are: {extra_boundaries}")
    boundary_algorithm.setBoundaries(vector_from_runs(payload_boundaries))

    # ------------------------------------------------------------------
    # Collections: collector + input files + pre-collector path for each source.
    # ------------------------------------------------------------------
    physics_collection = Collection(
        collector=physics_collector,
        input_files=physics_files,
        pre_collector_path=physics_path,
        max_collector_jobs=4,
    )
    cosmics_collection = Collection(
        collector=cosmics_collector,
        input_files=cosmics_files,
        pre_collector_path=cosmics_path,
        max_collector_jobs=2,
    )
    bcosmics_collection = Collection(
        collector=bcosmics_collector,
        input_files=bcosmics_files,
        pre_collector_path=bcosmics_path,
        max_collector_jobs=2,
    )

    # ------------------------------------------------------------------
    # Calibrations. The first one produces payloads that are only needed as input to the
    # second one, so they are excluded from the b2caf-prompt-run copying of the outputdb contents.
    # ------------------------------------------------------------------
    cosmics_calibration = Calibration("TestCalibration_cosmics")
    # Each collection is added under a unique name.
    for collection_name, collection in (("cosmics", cosmics_collection),
                                        ("Bcosmics", bcosmics_collection)):
        cosmics_calibration.add_collection(name=collection_name, collection=collection)
    cosmics_calibration.algorithms = [calibration_algorithm]
    # For the default AlgorithmStrategy this forces the output payload IoV.
    cosmics_calibration.algorithms[0].params = {"apply_iov": output_iov}
    # Payloads of this calibration should not be copied at the end.
    cosmics_calibration.save_payloads = False

    physics_calibration = Calibration("TestCalibration_physics")
    physics_calibration.add_collection(name="physics", collection=physics_collection)
    physics_calibration.algorithms = [boundary_algorithm]
    # A different strategy, which splits the data into chunks based on the boundaries set above.
    physics_calibration.strategies = SequentialBoundaries
    # Forces the output payload IoV; note that the parameter name differs from the one used above.
    physics_calibration.algorithms[0].params["iov_coverage"] = output_iov

    physics_calibration.depends_on(cosmics_calibration)

    # Every calibration that should run in the prompt process must be returned.
    return [cosmics_calibration, physics_calibration]
275
276