Belle II Software  release-05-02-19
example_complex.py
1 # -*- coding: utf-8 -*-
2 
3 """A complicated example calibration that takes several input data lists from raw data and performs
4 multiple calibrations. Only the second calibration will have its payloads placed into the final
5 outputdb directory by b2caf-prompt-run.
6 
7 We make it so that this calibration depends on the result of a completely
8 different one 'example_simple'. Even though that calibration will not be run in this process, the automated
9 system can discover this dependency and use it when submitting tasks."""
10 
11 from prompt import CalibrationSettings, input_data_filters
12 
13 
20 
21 # We decide to only run this script once the simple one has run. This only affects the automated system when scheduling
22 # tasks. This script can always be run standalone.
23 from prompt.calibrations.example_simple import settings as example_simple
24 
25 
# Settings object read by the automated prompt calibration system.
# It names three input data sets ("physics", "cosmics", "Bcosmics"), gives each its own
# list of filters, and declares the dependency on the 'example_simple' calibration.
settings = CalibrationSettings(
    name="Example Complex",
    expert_username="ddossett",
    description=__doc__,
    input_data_formats=["raw"],
    input_data_names=["physics", "cosmics", "Bcosmics"],
    input_data_filters={
        # Filters can be negated by wrapping them in a "NOT ..." string.
        "physics": [
            f"NOT {input_data_filters['Magnet']['On']}",
            input_data_filters["Data Tag"]["hadron_calib"],
            input_data_filters["Data Quality Tag"]["Good"],
            input_data_filters["Beam Energy"]["4S"],
            input_data_filters["Run Type"]["physics"],
        ],
        "cosmics": [
            input_data_filters['Magnet']['Off'],
            input_data_filters["Data Tag"]["cosmic_calib"],
            input_data_filters["Data Quality Tag"]["Bad For Alignment"],
            input_data_filters["Beam Energy"]["Continuum"],
            f"NOT {input_data_filters['Run Type']['physics']}",
        ],
        "Bcosmics": [
            input_data_filters["Data Tag"]["cosmic_calib"],
            input_data_filters["Data Quality Tag"]["Good"],
            input_data_filters["Beam Energy"]["4S"],
        ],
    },
    # Settings of the calibration(s) that the automated system should schedule before this one.
    depends_on=[example_simple],
    # Default values only; get_calibrations() receives the final values via kwargs["expert_config"].
    expert_config={
        "physics_prescale": 0.2,
        "max_events_per_file": 100,
        "max_files_per_run": 2,
        "payload_boundaries": [],
    },
)
51 
52 # The values in expert_config above are the DEFAULT for this script. They will be overwritten by values in caf_config.json
53 
54 # Note that you are forced to import the relevant script that you depend on, even though you never use it.
55 # This is to make sure that this script won't run unless the dependent one exists, as well as automatically
56 # checking for circular dependency via Python's import statements.
57 
58 
59 
60 
68 
69 
def get_calibrations(input_data, **kwargs):
    """
    Create the two example calibrations (one from cosmics data, one from physics data) for the CAF.

    Parameters:
      input_data (dict): Should contain every name from the 'input_data_names' variable as a key.
        Each value is a dictionary with {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}. Useful for
        assigning to calibration.files_to_iov

      **kwargs: Configuration options to be sent in. Since this may change we use kwargs as a way to help prevent
        backwards compatibility problems. But you could use the correct arguments in b2caf-prompt-run for this
        release explicitly if you want to.

        Currently only kwargs["requested_iov"] and kwargs["expert_config"] are used. Both are required.

        "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range.
        However your highest payload IoV should be open ended e.g. IoV(3,4,-1,-1)

        "expert_config" is the input configuration. It takes default values from your `CalibrationSettings` but these are
        overwritten by values from the 'expert_config' key in your input `caf_config.json` file when running ``b2caf-prompt-run``.

    Returns:
      list(caf.framework.Calibration): All of the calibration objects we want to assign to the CAF process

    Raises:
      ValueError: If "requested_iov" or "expert_config" is missing from kwargs (or is None).
      KeyError: If input_data lacks one of the expected names, or expert_config lacks one of the keys read below.
    """
    # Check the required configuration first, so that a misconfigured call fails with a clear message
    # (and before the comparatively slow basf2/ROOT imports) instead of an obscure error on None later on.
    expert_config = kwargs.get("expert_config")
    requested_iov = kwargs.get("requested_iov")
    required = (("expert_config", expert_config), ("requested_iov", requested_iov))
    missing = [name for name, value in required if value is None]
    if missing:
        raise ValueError(f"Missing required keyword argument(s): {', '.join(missing)}. "
                         "b2caf-prompt-run normally supplies these.")

    # Imports are kept local so that importing this script (e.g. to read `settings`) stays cheap.
    import basf2
    from basf2 import register_module, create_path
    from ROOT.Belle2 import TestCalibrationAlgorithm, TestBoundarySettingAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import vector_from_runs, ExpRun, IoV

    # In this script we want to use three different sources of input data, and reconstruct them
    # differently before the Collector module runs.

    # Get the input files from the input_data variable
    file_to_iov_physics = input_data["physics"]
    file_to_iov_cosmics = input_data["cosmics"]
    file_to_iov_Bcosmics = input_data["Bcosmics"]

    # We might have requested an enormous amount of data across a requested range.
    # There's a LOT more files than runs!
    # Lets set some limits because this calibration doesn't need that much to run.
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # We filter out any more than `max_files_per_run` files per run. The input data files are sorted
    # alphabetically by b2caf-prompt-run already. This procedure respects that ordering
    from prompt.utils import filter_by_max_files_per_run

    reduced_file_to_iov_physics = filter_by_max_files_per_run(file_to_iov_physics, max_files_per_run)
    input_files_physics = list(reduced_file_to_iov_physics.keys())
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(input_files_physics)}")

    reduced_file_to_iov_cosmics = filter_by_max_files_per_run(file_to_iov_cosmics, max_files_per_run)
    input_files_cosmics = list(reduced_file_to_iov_cosmics.keys())
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(input_files_cosmics)}")

    reduced_file_to_iov_Bcosmics = filter_by_max_files_per_run(file_to_iov_Bcosmics, max_files_per_run)
    input_files_Bcosmics = list(reduced_file_to_iov_Bcosmics.keys())
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(input_files_Bcosmics)}")

    # `requested_iov` is the overall IoV we want to cover, including the end values. But we will probably
    # want to replace the end values with -1, -1 when setting the output payload IoVs.

    # The actual value our output IoV payload should have. Notice that we've set it open ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)

    # --- Collector setup: one collector module per input data set ---

    col_test_physics = register_module("CaTest")
    # This has to be 'run' otherwise our SequentialBoundaries strategy can't work.
    # We could make it optional, based on the contents of the expert_config.
    col_test_physics.param("granularity", "run")
    col_test_physics.param("spread", 4)

    col_test_Bcosmics = register_module("CaTest")
    col_test_Bcosmics.param("granularity", "all")
    col_test_Bcosmics.param("spread", 1)

    col_test_cosmics = register_module("CaTest")
    col_test_cosmics.param("granularity", "all")
    col_test_cosmics.param("spread", 10)

    # --- Reconstruction (pre-collector) path setup ---

    # Let's specify that not all events will be used per file for every Collection
    # Just set this with one element in the list if you use it. The value will be duplicated in collector subjobs if the number
    # of input files is larger than 1.
    max_events = expert_config["max_events_per_file"]
    root_input = register_module("RootInput",
                                 entrySequences=[f"0:{max_events}"]
                                 )

    # And/or we could set a prescale so that only a fraction of events pass onwards.
    # This is most useful for randomly selecting events throughout input files.
    # Note that if you set the entrySequences AS WELL as a prescale then you will be combining the entrySequences and prescale
    # so that only a few events are passed into the Prescale module, and then only a fraction of those will continue to the
    # Collector module.
    prescale = expert_config["physics_prescale"]
    prescale_mod = register_module("Prescale", prescale=prescale)
    empty_path = create_path()
    # Events rejected by the prescale are sent down an empty path and processing of that event ends there.
    prescale_mod.if_false(empty_path, basf2.AfterConditionPath.END)

    rec_path_physics = create_path()
    rec_path_physics.add_module(root_input)
    rec_path_physics.add_module(prescale_mod)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_cosmics = create_path()
    rec_path_cosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    rec_path_Bcosmics = create_path()
    rec_path_Bcosmics.add_module(root_input)
    # could now add reconstruction modules dependent on the type of input data

    # --- Algorithm setup ---

    alg_test1 = TestCalibrationAlgorithm()
    alg_test2 = TestBoundarySettingAlgorithm()

    # Send in a list of boundaries for our algorithm class and SequentialBoundaries strategy to use.
    # A boundary is the STARTING run number for a new payload and all data from runs between this run and the next
    # boundary will be used.
    # In our algorithm the first run in our data is always a starting boundary, so we can pass an empty list here
    # safely and still have it work.

    # We make sure that the first payload begins at the start of the requested IoV.
    # This is a quirk of SequentialBoundaries strategy as there must always be one boundary to START from.
    # You could elect to always set this yourself manually, but that seems error prone.
    payload_boundaries = [ExpRun(output_iov.exp_low, output_iov.run_low)]
    # Now we can add the boundaries that exist in the expert config. They are extra boundaries, so that we don't have
    # to set the initial one every time. If this is an empty list then we effectively run like the SingleIoV strategy.
    payload_boundaries.extend([ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]])
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
    # Now set them all
    alg_test2.setBoundaries(vector_from_runs(payload_boundaries))  # This takes boundaries from the expert_config

    # --- Collection setup: pair each collector with its input files and pre-collector path ---

    collection_physics = Collection(collector=col_test_physics,
                                    input_files=input_files_physics,
                                    pre_collector_path=rec_path_physics,
                                    max_collector_jobs=4
                                    )

    collection_cosmics = Collection(collector=col_test_cosmics,
                                    input_files=input_files_cosmics,
                                    pre_collector_path=rec_path_cosmics,
                                    max_collector_jobs=2
                                    )

    collection_Bcosmics = Collection(collector=col_test_Bcosmics,
                                     input_files=input_files_Bcosmics,
                                     pre_collector_path=rec_path_Bcosmics,
                                     max_collector_jobs=2
                                     )

    # --- Calibration setup ---

    # We will set up two Calibrations. One which depends on the other.
    # However, the first Calibration will generate payloads that we don't want to save in our output database for upload.
    # Basically we want to ignore the payloads during the b2caf-prompt-run copying of the outputdb contents.
    # But we still use them as input to the next calibration.

    cal_test1 = Calibration("TestCalibration_cosmics")
    # Add collections in with unique names
    cal_test1.add_collection(name="cosmics", collection=collection_cosmics)
    cal_test1.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cal_test1.algorithms = [alg_test1]
    # Do this for the default AlgorithmStrategy to force the output payload IoV
    cal_test1.algorithms[0].params = {"apply_iov": output_iov}
    # Mark this calibration as one whose payloads should not be copied at the end.
    cal_test1.save_payloads = False

    cal_test2 = Calibration("TestCalibration_physics")
    # Add collections in with unique names
    cal_test2.add_collection(name="physics", collection=collection_physics)
    cal_test2.algorithms = [alg_test2]
    # We apply a different strategy that will allow us to split the data we run over into chunks based on the boundaries above
    cal_test2.strategies = SequentialBoundaries
    # Do this to force the output payload IoV. Note the different name to above!
    cal_test2.algorithms[0].params["iov_coverage"] = output_iov

    cal_test2.depends_on(cal_test1)

    # You must return all calibrations you want to run in the prompt process
    return [cal_test1, cal_test2]
267 
268 
prompt.utils
Definition: utils.py:1
Collection
Definition: Collection.py:1
Calibration
Definition: Calibration.py:1