Belle II Software  release-08-01-10
example_complex.py
"""A complicated example calibration that takes several input data lists from raw data and performs
multiple calibrations. Only the second calibration will have its payloads placed into the final
outputdb directory by b2caf-prompt-run.

This calibration depends on the result of a completely different one, 'example_simple'. Even though
that calibration will not be run in this process, the automated system can discover the dependency
and use it when submitting tasks."""

from prompt import CalibrationSettings, INPUT_DATA_FILTERS


# We decide to only run this script once the simple one has run. This only affects the automated system when scheduling
# tasks. This script can always be run standalone.
from prompt.calibrations.example_simple import settings as example_simple


settings = CalibrationSettings(name="Example Complex",
                               expert_username="ddossett",
                               description=__doc__,
                               input_data_formats=["raw"],
                               input_data_names=["physics", "cosmics", "Bcosmics"],
                               input_data_filters={"physics": [f"NOT {INPUT_DATA_FILTERS['Magnet']['On']}",
                                                               INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
                                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                               INPUT_DATA_FILTERS["Beam Energy"]["4S"],
                                                               INPUT_DATA_FILTERS["Run Type"]["physics"]],
                                                   "cosmics": [INPUT_DATA_FILTERS['Magnet']['Off'],
                                                               INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Bad For Alignment"],
                                                               INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
                                                               f"NOT {INPUT_DATA_FILTERS['Run Type']['physics']}"],
                                                   "Bcosmics": [INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                                INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                                INPUT_DATA_FILTERS["Beam Energy"]["4S"]]},
                               depends_on=[example_simple],
                               expert_config={
                                   "physics_prescale": 0.2,
                                   "max_events_per_file": 100,
                                   "max_files_per_run": 2,
                                   "payload_boundaries": []
                               })

# The values in expert_config above are the DEFAULTS for this script. They will be overwritten by values in caf_config.json.
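# As a hedged illustration (an assumption about your setup, not part of the original script), an override in
# caf_config.json could look something like the snippet below; the listed keys overwrite the defaults above,
# and the numbers are purely illustrative:
#
#   "expert_config": {
#       "physics_prescale": 0.5,
#       "max_files_per_run": 5,
#       "payload_boundaries": [[12, 1], [12, 1500]]
#   }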

# Note that you are forced to import the relevant script that you depend on, even though you never use it.
# This makes sure that this script won't run unless the script it depends on exists, and it automatically
# checks for circular dependencies via Python's import statements.


def get_calibrations(input_data, **kwargs):
    """
    Parameters:
        input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
            Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
            assigning to calibration.files_to_iov

        **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
            backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
            release explicitly if you want to.

            Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used.

            "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range.
            However, your highest payload IoV should be open ended, e.g. IoV(3,4,-1,-1).

            "expert_config" is the input configuration. It takes default values from your `CalibrationSettings` but these are
            overwritten by values from the 'expert_config' key in your input `caf_config.json` file when running ``b2caf-prompt-run``.

    Returns:
        list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
    """
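    # For orientation only (the paths and IoVs below are illustrative, not real data), input_data at this point
    # looks roughly like:
    #   {"physics":  {"/path/to/physics_e7_r1.root": IoV(7, 1, 7, 1), ...},
    #    "cosmics":  {"/path/to/cosmics_e7_r5.root": IoV(7, 5, 7, 5), ...},
    #    "Bcosmics": {"/path/to/Bcosmics_e7_r9.root": IoV(7, 9, 7, 9), ...}}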
    # Set up config options
    import basf2
    from basf2 import register_module, create_path
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # In this script we want to use three different sources of input data and reconstruct them
    # differently before the Collector module runs.

    # Get the input files from the input_data variable
    file_to_iov_physics = input_data["physics"]
    file_to_iov_cosmics = input_data["cosmics"]
    file_to_iov_Bcosmics = input_data["Bcosmics"]

    # We might have requested an enormous amount of data across the requested range.
    # There are a LOT more files than runs!
    # Let's set some limits because this calibration doesn't need that much data to run.
    expert_config = kwargs.get("expert_config")
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # We filter out anything beyond max_files_per_run files per run. The input data files are already sorted
    # alphabetically by b2caf-prompt-run, and this procedure respects that ordering.
    from prompt.utils import filter_by_max_files_per_run

    reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics, max_files_per_run)
    input_files_physics = list(reduced_file_to_iov_physics.keys())
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(input_files_physics)}")

    reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics, max_files_per_run)
    input_files_cosmics = list(reduced_file_to_iov_cosmics.keys())
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(input_files_cosmics)}")

    reduced_file_to_iov_Bcosmics = filter_by_max_files_per_run(file_to_iov_Bcosmics, max_files_per_run)
    input_files_Bcosmics = list(reduced_file_to_iov_Bcosmics.keys())
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(input_files_Bcosmics)}")

    # Get the overall requested IoV we want to cover, including the end values. But we will probably want to replace
    # the end values with -1, -1 when setting the output payload IoVs.
    requested_iov = kwargs.get("requested_iov", None)

    # The actual value our output IoV payload should have. Notice that we've set it open ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)

    col_test_physics = register_module("CaTest")
    # This has to be 'run' otherwise our SequentialBoundaries strategy can't work.
    # We could make it optional, based on the contents of the expert_config.
    col_test_physics.param("granularity", "run")
    col_test_physics.param("spread", 4)

    col_test_Bcosmics = register_module("CaTest")
    col_test_Bcosmics.param("granularity", "all")
    col_test_Bcosmics.param("spread", 1)

    col_test_cosmics = register_module("CaTest")
    col_test_cosmics.param("granularity", "all")
    col_test_cosmics.param("spread", 10)

    # Let's specify that not all events will be used per file for every Collection.
    # Just set this with one element in the list if you use it. The value will be duplicated in collector subjobs if the number
    # of input files is larger than 1.
    max_events = expert_config["max_events_per_file"]
    root_input = register_module("RootInput",
                                 entrySequences=[f"0:{max_events}"]
                                 )
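    # With the default expert_config this evaluates to entrySequences=["0:100"], i.e. only the first entries of each
    # input file are read (and, as noted above, the same sequence is applied to every file in a collector subjob).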

    # And/or we could set a prescale so that only a fraction of events pass onwards.
    # This is most useful for randomly selecting events throughout input files.
    # Note that if you set entrySequences AS WELL as a prescale then the two are combined:
    # only a few events are passed into the Prescale module, and then only a fraction of those will continue to the
    # Collector module.
    prescale = expert_config["physics_prescale"]
    prescale_mod = register_module("Prescale", prescale=prescale)
    empty_path = create_path()
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)

    rec_path_physics = create_path()
    rec_path_physics.add_module(root_input)
    rec_path_physics.add_module(prescale_mod)
    # could now add reconstruction modules dependent on the type of input data
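    # A hedged sketch of what "add reconstruction modules" could look like here: add_reconstruction is a standard
    # basf2 helper, but its exact arguments vary between releases, so treat the lines below as an assumption to
    # adapt rather than as part of this example's required flow.
    #
    #   from reconstruction import add_reconstruction
    #   add_reconstruction(rec_path_physics)  # append the standard reconstruction chain before the Collector runs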

    rec_path_cosmics = create_path()
    rec_path_cosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_Bcosmics = create_path()
    rec_path_Bcosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    alg_test1 = TestCalibrationAlgorithm()
    alg_test2 = TestBoundarySettingAlgorithm()

    # Send in a list of boundaries for our algorithm class and SequentialBoundaries strategy to use.
    # A boundary is the STARTING run number for a new payload, and all data from runs between this run and the next
    # boundary will be used.
    # In our algorithm the first run in our data is always a starting boundary, so we can pass an empty list here
    # safely and still have it work.

    # We make sure that the first payload begins at the start of the requested IoV.
    # This is a quirk of the SequentialBoundaries strategy, as there must always be one boundary to START from.
    # You could elect to always set this yourself manually, but that seems error prone.
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    # Now we can add the boundaries that exist in the expert config. They are extra boundaries, so that we don't have
    # to set the initial one every time. If this is an empty list then we effectively run like the SingleIoV strategy.
    payload_boundaries.extend([ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]])
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
    # Now set them all
    alg_test2.setBoundaries(vector_from_runs(payload_boundaries))  # This takes the boundaries built from the expert_config
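    # For example (illustrative numbers only): with "payload_boundaries": [[12, 1], [12, 1500]] in the expert_config,
    # the list above becomes [ExpRun(<requested exp>, <requested run>), ExpRun(12, 1), ExpRun(12, 1500)], and the
    # SequentialBoundaries strategy produces one payload per boundary-to-boundary chunk of runs.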

    collection_physics = Collection(collector=col_test_physics,
                                    input_files=input_files_physics,
                                    pre_collector_path=rec_path_physics,
                                    max_collector_jobs=4
                                    )

    collection_cosmics = Collection(collector=col_test_cosmics,
                                    input_files=input_files_cosmics,
                                    pre_collector_path=rec_path_cosmics,
                                    max_collector_jobs=2
                                    )

    collection_Bcosmics = Collection(collector=col_test_Bcosmics,
                                     input_files=input_files_Bcosmics,
                                     pre_collector_path=rec_path_Bcosmics,
                                     max_collector_jobs=2
                                     )

    # We will set up two Calibrations, one of which depends on the other.
    # However, the first Calibration will generate payloads that we don't want to save in our output database for upload.
    # Basically, we want b2caf-prompt-run to ignore its payloads when copying the outputdb contents,
    # but we still use them as input to the next calibration.

    cal_test1 = Calibration("TestCalibration_cosmics")
    # Add collections in with unique names
    cal_test1.add_collection(name="cosmics", collection=collection_cosmics)
    cal_test1.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cal_test1.algorithms = [alg_test1]
    # Do this for the default AlgorithmStrategy to force the output payload IoV
    cal_test1.algorithms[0].params = {"apply_iov": output_iov}
    # Mark this calibration as one whose payloads should not be copied at the end.
    cal_test1.save_payloads = False

    cal_test2 = Calibration("TestCalibration_physics")
    # Add collections in with unique names
    cal_test2.add_collection(name="physics", collection=collection_physics)
    cal_test2.algorithms = [alg_test2]
    # We apply a different strategy that will allow us to split the data we run over into chunks based on the boundaries above
    cal_test2.strategies = SequentialBoundaries
    # Do this to force the output payload IoV. Note the different parameter name compared to above!
    cal_test2.algorithms[0].params["iov_coverage"] = output_iov

    cal_test2.depends_on(cal_test1)

    # You must return all calibrations you want to run in the prompt process
    return [cal_test1, cal_test2]
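# For orientation (outside the scope of this script itself): b2caf-prompt-run picks this file up through its
# caf_config.json, which supplies the 'expert_config' overrides mentioned above, alongside a JSON file describing
# the input data. The exact command-line invocation depends on your release, so check ``b2caf-prompt-run --help``
# rather than relying on this note.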