Belle II Software release-06-02-00
example_complex.py
# -*- coding: utf-8 -*-

11 """A complicated example calibration that takes several input data lists from raw data and performs
12 multiple calibrations. Only the second calibration will have its payloads placed into the final
13 outputdb directory by b2caf-prompt-run.
14 
15 We make it so that this calibration depends on the result of a completely
16 different one 'example_simple'. Even though that calibration will not be run in this process, the automated
17 system can discover this dependency and use it when submitting tasks."""
18 
from prompt import CalibrationSettings, INPUT_DATA_FILTERS

# We decide to only run this script once the simple one has run. This only affects the automated system when
# scheduling tasks. This script can always be run standalone.
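# As a rough illustration of running it standalone (the exact arguments depend on your release and site setup,
# so treat this as a sketch rather than the definitive command):
#
#   b2caf-prompt-run Local caf_config.json input_data.json
#
# where caf_config.json selects this calibration and may override its expert_config, and input_data.json lists
# the input files for each of the input_data_names.
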
from prompt.calibrations.example_simple import settings as example_simple

settings = CalibrationSettings(name="Example Complex",
                               expert_username="ddossett",
                               description=__doc__,
                               input_data_formats=["raw"],
                               input_data_names=["physics", "cosmics", "Bcosmics"],
                               input_data_filters={"physics": [f"NOT {INPUT_DATA_FILTERS['Magnet']['On']}",
                                                               INPUT_DATA_FILTERS["Data Tag"]["hadron_calib"],
                                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                               INPUT_DATA_FILTERS["Beam Energy"]["4S"],
                                                               INPUT_DATA_FILTERS["Run Type"]["physics"]],
                                                   "cosmics": [INPUT_DATA_FILTERS['Magnet']['Off'],
                                                               INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                               INPUT_DATA_FILTERS["Data Quality Tag"]["Bad For Alignment"],
                                                               INPUT_DATA_FILTERS["Beam Energy"]["Continuum"],
                                                               f"NOT {INPUT_DATA_FILTERS['Run Type']['physics']}"],
                                                   "Bcosmics": [INPUT_DATA_FILTERS["Data Tag"]["cosmic_calib"],
                                                                INPUT_DATA_FILTERS["Data Quality Tag"]["Good"],
                                                                INPUT_DATA_FILTERS["Beam Energy"]["4S"]]},
                               depends_on=[example_simple],
                               expert_config={
                                   "physics_prescale": 0.2,
                                   "max_events_per_file": 100,
                                   "max_files_per_run": 2,
                                   "payload_boundaries": []
                               })

# The values in expert_config above are the DEFAULTS for this script. They will be overwritten by values in caf_config.json.
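# For illustration only, the corresponding block of a caf_config.json might look like this (the numbers and
# boundaries below are made up, not recommendations):
#
#   "expert_config": {
#       "physics_prescale": 0.1,
#       "max_events_per_file": 500,
#       "max_files_per_run": 5,
#       "payload_boundaries": [[12, 1], [12, 100]]
#   }
#
# Each inner [experiment, run] pair in "payload_boundaries" is unpacked into an ExpRun in get_calibrations() below.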

# Note that you are forced to import the relevant script that you depend on, even though you never use it.
# This is to make sure that this script won't run unless the dependent one exists, as well as automatically
# checking for circular dependencies via Python's import statements.


def get_calibrations(input_data, **kwargs):
    """
    Parameters:
        input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
            Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
            assigning to calibration.files_to_iov

        **kwargs: Configuration options to be sent in. Since these may change, we use kwargs to help prevent
            backwards-compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
            release explicitly if you want to.

            Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used.

            "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range.
            However, your highest payload IoV should be open-ended, e.g. IoV(3,4,-1,-1).

            "expert_config" is the input configuration. It takes default values from your `CalibrationSettings` but these
            are overwritten by values from the 'expert_config' key in your input `caf_config.json` file when running
            ``b2caf-prompt-run``.

    Returns:
        list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process
    """
    # Set up config options
    import basf2
    from basf2 import register_module, create_path
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # In this script we want to use three different sources of input data, and reconstruct them
    # differently before the Collector module runs.

    # Get the input files from the input_data variable
    file_to_iov_physics = input_data["physics"]
    file_to_iov_cosmics = input_data["cosmics"]
    file_to_iov_Bcosmics = input_data["Bcosmics"]

    # We might have requested an enormous amount of data across the requested range.
    # There are a LOT more files than runs!
    # Let's set some limits because this calibration doesn't need that much to run.
    expert_config = kwargs.get("expert_config")
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # We filter out anything beyond max_files_per_run files per run. The input data files are sorted alphabetically
    # by b2caf-prompt-run already. This procedure respects that ordering.
    from prompt.utils import filter_by_max_files_per_run

    reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics, max_files_per_run)
    input_files_physics = list(reduced_file_to_iov_physics.keys())
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(input_files_physics)}")

    reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics, max_files_per_run)
    input_files_cosmics = list(reduced_file_to_iov_cosmics.keys())
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(input_files_cosmics)}")

    reduced_file_to_iov_Bcosmics = filter_by_max_files_per_run(file_to_iov_Bcosmics, max_files_per_run)
    input_files_Bcosmics = list(reduced_file_to_iov_Bcosmics.keys())
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(input_files_Bcosmics)}")

    # Get the overall requested IoV we want to cover, including the end values. But we will probably want to replace
    # the end values with -1, -1 when setting the output payload IoVs.
    requested_iov = kwargs.get("requested_iov", None)

    # The actual value our output payload IoV should have. Notice that we've set it open-ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)
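    # For example, a requested_iov of IoV(12, 1, 12, 500) would give an output_iov of IoV(12, 1, -1, -1)
    # (experiment/run numbers purely illustrative).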

    col_test_physics = register_module("CaTest")
    # This has to be 'run' otherwise our SequentialBoundaries strategy can't work.
    # We could make it optional, based on the contents of the expert_config.
    col_test_physics.param("granularity", "run")
    col_test_physics.param("spread", 4)

    col_test_Bcosmics = register_module("CaTest")
    col_test_Bcosmics.param("granularity", "all")
    col_test_Bcosmics.param("spread", 1)

    col_test_cosmics = register_module("CaTest")
    col_test_cosmics.param("granularity", "all")
    col_test_cosmics.param("spread", 10)

    # Let's specify that not all events will be used per file for every Collection.
    # Just set this with one element in the list if you use it. The value will be duplicated in collector subjobs
    # if the number of input files is larger than 1.
    max_events = expert_config["max_events_per_file"]
    root_input = register_module("RootInput",
                                 entrySequences=[f"0:{max_events}"]
                                 )
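    # With the default max_events_per_file of 100 this becomes entrySequences=["0:100"], i.e. RootInput only reads
    # roughly the first hundred entries of each input file (the sequence is duplicated so it applies to every file
    # in a collector job, as noted above).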

    # And/or we could set a prescale so that only a fraction of events pass onwards.
    # This is most useful for randomly selecting events throughout input files.
    # Note that if you set the entrySequences AS WELL as a prescale then you will be combining the two:
    # only a few events are passed into the Prescale module, and then only a fraction of those will continue
    # to the Collector module.
    prescale = expert_config["physics_prescale"]
    prescale_mod = register_module("Prescale", prescale=prescale)
    empty_path = create_path()
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)

    rec_path_physics = create_path()
    rec_path_physics.add_module(root_input)
    rec_path_physics.add_module(prescale_mod)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_cosmics = create_path()
    rec_path_cosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_Bcosmics = create_path()
    rec_path_Bcosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    alg_test1 = TestCalibrationAlgorithm()
    alg_test2 = TestBoundarySettingAlgorithm()

    # Send in a list of boundaries for our algorithm class and SequentialBoundaries strategy to use.
    # A boundary is the STARTING run number for a new payload and all data from runs between this run and the next
    # boundary will be used.
    # In our algorithm the first run in our data is always a starting boundary, so we can pass an empty list here
    # safely and still have it work.

    # We make sure that the first payload begins at the start of the requested IoV.
    # This is a quirk of the SequentialBoundaries strategy as there must always be one boundary to START from.
    # You could elect to always set this yourself manually, but that seems error-prone.
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    # Now we can add the boundaries that exist in the expert config. They are extra boundaries, so that we don't have
    # to set the initial one every time. If this is an empty list then we effectively run like the SingleIoV strategy.
    payload_boundaries.extend([ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]])
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
    # Now set them all
    alg_test2.setBoundaries(vector_from_runs(payload_boundaries))  # This takes boundaries from the expert_config
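    # To make this concrete (illustrative numbers only): with a requested IoV starting at ExpRun(12, 1) and an
    # expert-set boundary of [[12, 100]], SequentialBoundaries would produce one payload covering runs 1-99 of
    # experiment 12 and a second payload starting at run 100, left open-ended via the iov_coverage set below.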

    collection_physics = Collection(collector=col_test_physics,
                                    input_files=input_files_physics,
                                    pre_collector_path=rec_path_physics,
                                    max_collector_jobs=4
                                    )

    collection_cosmics = Collection(collector=col_test_cosmics,
                                    input_files=input_files_cosmics,
                                    pre_collector_path=rec_path_cosmics,
                                    max_collector_jobs=2
                                    )

    collection_Bcosmics = Collection(collector=col_test_Bcosmics,
                                     input_files=input_files_Bcosmics,
                                     pre_collector_path=rec_path_Bcosmics,
                                     max_collector_jobs=2
                                     )

    # We will set up two Calibrations, one of which depends on the other.
    # However, the first Calibration will generate payloads that we don't want to save in our output database for upload.
    # Basically we want to ignore those payloads during the b2caf-prompt-run copying of the outputdb contents.
    # But we still use them as input to the next calibration.

    cal_test1 = Calibration("TestCalibration_cosmics")
    # Add collections in with unique names
    cal_test1.add_collection(name="cosmics", collection=collection_cosmics)
    cal_test1.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cal_test1.algorithms = [alg_test1]
    # Do this for the default AlgorithmStrategy to force the output payload IoV
    cal_test1.algorithms[0].params = {"apply_iov": output_iov}
    # Mark this calibration as one whose payloads should not be copied at the end.
    cal_test1.save_payloads = False

    cal_test2 = Calibration("TestCalibration_physics")
    # Add collections in with unique names
    cal_test2.add_collection(name="physics", collection=collection_physics)
    cal_test2.algorithms = [alg_test2]
    # We apply a different strategy that will allow us to split the data we run over into chunks based on the boundaries above
    cal_test2.strategies = SequentialBoundaries
    # Do this to force the output payload IoV. Note the different parameter name to above!
    cal_test2.algorithms[0].params["iov_coverage"] = output_iov

    cal_test2.depends_on(cal_test1)

    # You must return all calibrations you want to run in the prompt process
    return [cal_test1, cal_test2]