Belle II Software  release-05-01-25
example_complex.py
1 # -*- coding: utf-8 -*-
2 
3 """A complicated example calibration that takes several input data lists from raw data and performs
4 multiple calibrations. Only the second calibration will have its payloads placed into the final
5 outputdb directory by b2caf-prompt-run.
6 
7 We make it so that this calibration depends on the result of a completely
8 different one 'example_simple'. Even though that calibration will not be run in this process, the automated
9 system can discover this dependency and use it when submitting tasks."""
10 
11 from prompt import CalibrationSettings
12 
13 
20 
21 # We decide to only run this script once the simple one has run. This only affects the automated system when scheduling
22 # tasks. This script can always be run standalone.
23 from prompt.calibrations.example_simple import settings as example_simple
24 
25 
settings = CalibrationSettings(
    name="Example Complex",
    expert_username="ddossett",
    # The module docstring doubles as the human readable description.
    description=__doc__,
    input_data_formats=["raw"],
    # These are the keys that get_calibrations() looks up in its 'input_data' argument.
    input_data_names=["physics", "cosmics", "Bcosmics"],
    # Settings object of the other script that must have run first. This only matters to the
    # automated system when it schedules tasks.
    depends_on=[example_simple],
    # Default values for the options that get_calibrations() reads from kwargs["expert_config"].
    expert_config={
        "physics_prescale": 0.2,
        "max_events_per_file": 100,
        "max_files_per_run": 2,
        "payload_boundaries": [],
    },
)
38 
39 # The values in expert_config above are the DEFAULT for this script. They will be overwritten by values in caf_config.json
40 
# Note that you are forced to import the settings of the script that you depend on, even though that calibration
# is never run by this script. This makes sure that this script won't run unless the dependent one exists, as well
# as automatically checking for circular dependencies via Python's import statements.
44 
45 
46 
47 
55 
56 
def get_calibrations(input_data, **kwargs):
    """
    Create the two linked example calibrations that make up this prompt script.

    The "cosmics" and "Bcosmics" input data feed a first calibration whose payloads are flagged as not
    to be saved. The "physics" input data feeds a second calibration that depends on the first one and
    uses the SequentialBoundaries strategy.

    Parameters:
      input_data (dict): Needs a key for every name in the 'input_data_names' variable. Every value is
        itself a dictionary of the form {"/path/to/file_e1_r5.root": IoV(1,5,1,5), ...}, which is handy
        for assigning to calibration.files_to_iov

      **kwargs: Configuration options handed over by the caller. They arrive as kwargs because the set of
        options may change, which helps to avoid backwards compatibility problems. You could instead name
        the exact arguments that b2caf-prompt-run passes in this release if you prefer.

        Only kwargs["requested_iov"] and kwargs["expert_config"] are read at the moment.

        "requested_iov" is the IoV range of the bucket and your payloads should correspond to this range,
        except that the highest payload IoV should be open ended e.g. IoV(3,4,-1,-1)

        "expert_config" is the input configuration. Its defaults come from your `CalibrationSettings` and
        are overwritten by the values under the 'expert_config' key of the input `caf_config.json` file
        when running ``b2caf-prompt-run``.

    Returns:
      list(caf.framework.Calibration): Every calibration object that should be assigned to the CAF process
    """
    import basf2
    from ROOT.Belle2 import TestBoundarySettingAlgorithm, TestCalibrationAlgorithm
    from caf.framework import Calibration, Collection
    from caf.strategies import SequentialBoundaries
    from caf.utils import ExpRun, IoV, vector_from_runs

    # Three different sources of input data are used in this script. Each one gets its own
    # pre-collector path so that it could be reconstructed differently before the Collector runs.
    physics_files_to_iov = input_data["physics"]
    cosmics_files_to_iov = input_data["cosmics"]
    bcosmics_files_to_iov = input_data["Bcosmics"]

    # The requested range may contain an enormous amount of data, with far more files than runs.
    # This calibration does not need that much, so the number of files per run is capped.
    expert_config = kwargs.get("expert_config")
    max_files_per_run = expert_config["max_files_per_run"]
    basf2.B2INFO(f"Reducing to a maximum of {max_files_per_run} files per run.")

    # Anything beyond max_files_per_run files in a run is dropped. b2caf-prompt-run has already sorted the
    # input data files alphabetically and this procedure respects that ordering.
    from prompt.utils import filter_by_max_files_per_run

    input_files_physics = list(filter_by_max_files_per_run(physics_files_to_iov, max_files_per_run).keys())
    basf2.B2INFO(f"Total number of physics files actually used as input = {len(input_files_physics)}")

    input_files_cosmics = list(filter_by_max_files_per_run(cosmics_files_to_iov, max_files_per_run).keys())
    basf2.B2INFO(f"Total number of cosmics files actually used as input = {len(input_files_cosmics)}")

    input_files_Bcosmics = list(filter_by_max_files_per_run(bcosmics_files_to_iov, max_files_per_run).keys())
    basf2.B2INFO(f"Total number of Bcosmics files actually used as input = {len(input_files_Bcosmics)}")

    # The overall IoV that was requested, end values included.
    requested_iov = kwargs.get("requested_iov")

    # The IoV the output payloads should really have: same start as the request but left open ended.
    output_iov = IoV(requested_iov.exp_low, requested_iov.run_low, -1, -1)

    # ---- Collector setup ----
    physics_collector = basf2.register_module("CaTest")
    # 'run' granularity is mandatory here, the SequentialBoundaries strategy cannot work without it.
    # It could be made optional, driven by the contents of the expert_config.
    physics_collector.param("granularity", "run")
    physics_collector.param("spread", 4)

    bcosmics_collector = basf2.register_module("CaTest")
    bcosmics_collector.param("granularity", "all")
    bcosmics_collector.param("spread", 1)

    cosmics_collector = basf2.register_module("CaTest")
    cosmics_collector.param("granularity", "all")
    cosmics_collector.param("spread", 10)

    # ---- Reconstruction path setup ----
    # Only part of each file is read for every Collection. If you use entrySequences give it a single
    # element; the value will be duplicated in collector subjobs when there is more than one input file.
    max_events = expert_config["max_events_per_file"]
    entry_sequences = [f"0:{max_events}"]
    root_input = basf2.register_module("RootInput", entrySequences=entry_sequences)

    # And/or a prescale lets only a fraction of events pass onwards, which is most useful for picking
    # events at random throughout the input files.
    # Setting entrySequences AS WELL as a prescale combines the two: only a few events reach the
    # Prescale module and only a fraction of those continue on to the Collector module.
    physics_prescale = expert_config["physics_prescale"]
    prescale_module = basf2.register_module("Prescale", prescale=physics_prescale)
    rejected_events_path = basf2.create_path()
    prescale_module.if_false(rejected_events_path, basf2.AfterConditionPath.END)

    physics_path = basf2.create_path()
    physics_path.add_module(root_input)
    physics_path.add_module(prescale_module)
    # reconstruction modules specific to this type of input data could be added here

    cosmics_path = basf2.create_path()
    cosmics_path.add_module(root_input)
    # reconstruction modules specific to this type of input data could be added here

    bcosmics_path = basf2.create_path()
    bcosmics_path.add_module(root_input)
    # reconstruction modules specific to this type of input data could be added here

    # ---- Algorithm setup ----
    cosmics_algorithm = TestCalibrationAlgorithm()
    boundary_algorithm = TestBoundarySettingAlgorithm()

    # The algorithm class and the SequentialBoundaries strategy are given a list of boundaries.
    # A boundary is the STARTING run number of a new payload; all data from runs between this run and
    # the next boundary will be used.
    #
    # The first payload always begins at the start of the requested IoV. This is a quirk of the
    # SequentialBoundaries strategy, which must always have one boundary to START from. Setting it by
    # hand every time would be possible but seems error prone.
    first_boundary = ExpRun(output_iov.exp_low, output_iov.run_low)
    # The boundaries in the expert config are therefore extra ones. With an empty list there this
    # effectively runs like the SingleIoV strategy.
    extra_boundaries = [ExpRun(*boundary) for boundary in expert_config["payload_boundaries"]]
    basf2.B2INFO(f"Expert set payload boundaries are: {expert_config['payload_boundaries']}")
    # Hand the complete list over to the algorithm
    boundary_algorithm.setBoundaries(vector_from_runs([first_boundary] + extra_boundaries))

    # ---- Collection setup ----
    collection_physics = Collection(
        collector=physics_collector,
        input_files=input_files_physics,
        pre_collector_path=physics_path,
        max_collector_jobs=4,
    )

    collection_cosmics = Collection(
        collector=cosmics_collector,
        input_files=input_files_cosmics,
        pre_collector_path=cosmics_path,
        max_collector_jobs=2,
    )

    collection_Bcosmics = Collection(
        collector=bcosmics_collector,
        input_files=input_files_Bcosmics,
        pre_collector_path=bcosmics_path,
        max_collector_jobs=2,
    )

    # ---- Calibration setup ----
    # Two Calibrations are created, the second depending on the first.
    # The payloads of the first one are only wanted as input to the second one. They should be ignored
    # when b2caf-prompt-run copies the outputdb contents, so that they are not saved for upload.
    cosmics_calibration = Calibration("TestCalibration_cosmics")
    # Every collection is added under a unique name
    cosmics_calibration.add_collection(name="cosmics", collection=collection_cosmics)
    cosmics_calibration.add_collection(name="Bcosmics", collection=collection_Bcosmics)
    cosmics_calibration.algorithms = [cosmics_algorithm]
    # This is how the output payload IoV is forced for the default AlgorithmStrategy
    cosmics_calibration.algorithms[0].params = {"apply_iov": output_iov}
    # Flag this calibration's payloads as ones that should not be copied at the end.
    cosmics_calibration.save_payloads = False

    physics_calibration = Calibration("TestCalibration_physics")
    # Every collection is added under a unique name
    physics_calibration.add_collection(name="physics", collection=collection_physics)
    physics_calibration.algorithms = [boundary_algorithm]
    # A different strategy is applied here, it splits the data we run over into chunks based on the
    # boundaries set above
    physics_calibration.strategies = SequentialBoundaries
    # This forces the output payload IoV. Note that the parameter name differs from the one above!
    physics_calibration.algorithms[0].params["iov_coverage"] = output_iov

    physics_calibration.depends_on(cosmics_calibration)

    # Every calibration that should run in the prompt process has to be returned
    return [cosmics_calibration, physics_calibration]
254 
255 
prompt.utils
Definition: utils.py:1
Collection
Definition: Collection.py:1
Calibration
Definition: Calibration.py:1