Belle II Software development
SequentialRunByRun Class Reference
Inheritance diagram for SequentialRunByRun: derives from AlgorithmStrategy.

Public Member Functions

 __init__ (self, algorithm)
 
 apply_experiment_settings (self, algorithm, experiment)
 
 run (self, iov, iteration, queue)
 
 execute_over_run_list (self, iteration, run_list, lowest_exprun, highest_exprun)
 
 setup_from_dict (self, params)
 
 is_valid (self)
 
 find_iov_gaps (self)
 
 any_failed_iov (self)
 
 send_result (self, result)
 
 send_final_state (self, state)
 

Public Attributes

 machine = AlgorithmMachine(self.algorithm)
 caf.state_machines.AlgorithmMachine used to help set up and execute the CalibrationAlgorithm. It gets set up properly in run().
 
bool first_execution = True
 boolean storing whether this is the first time the algorithm is executed
 
 algorithm = algorithm
 Algorithm() class that we're running.
 
list input_files = []
 Collector output files, will contain all files returned by the output patterns.
 
str output_dir = ""
 The algorithm output directory which is mostly used to store the stdout file.
 
str output_database_dir = ""
 The output database directory for the localdb that the algorithm will commit to.
 
list database_chain = []
 User-defined database chain, i.e. the default global tag, or any localdbs/tags you have for custom alignment etc.
 
list dependent_databases = []
 CAF created local databases from previous calibrations that this calibration/algorithm depends on.
 
list ignored_runs = []
 Runs that will not be included in ANY execution of the algorithm.
 
list results = []
 The list of results objects which will be sent out before the end.
 
 queue = None
 The multiprocessing Queue we use to pass back results one at a time.
 

Static Public Attributes

dict usable_params
 The params that you could set on the Algorithm object which this Strategy would use.
 
list required_attrs
 Required attributes that must exist before the strategy can run properly.
 
list required_true_attrs
 Attributes that must have a value that returns True when tested by is_valid().
 
list allowed_granularities = ["run", "all"]
 Granularity of collector that can be run by this algorithm properly.
 
str FINISHED_RESULTS = "DONE"
 Signal value that is put into the Queue when there are no more results left.
 
str COMPLETED = "COMPLETED"
 Completed state.
 
str FAILED = "FAILED"
 Failed state.
 

Detailed Description

Algorithm strategy to do run-by-run calibration of collected data.
Runs the algorithm over the input data contained within the requested IoV, starting with the first run's data only.
If the algorithm returns 'not enough data' on the current run set, it won't commit the payloads, but instead adds
the next run's data and tries again.

Once an execution on a set of runs returns 'iterate' or 'ok', we move on to the next runs (if any are left)
and start the same procedure again. Committing of payloads to the outputdb only happens once we're sure that there
is enough data in the remaining runs to get a full execution. If there isn't enough data remaining, the last runs
are merged with the previous successful execution's runs and a final execution is performed on all remaining runs.

Additionally this strategy will automatically make sure that IoV gaps in your input data are covered by a payload.
This means that there shouldn't be any IoVs that don't get a new payload by the end of running an iteration.

This uses a `caf.state_machines.AlgorithmMachine` to actually execute the various steps rather than operating on
a CalibrationAlgorithm C++ class directly.

Definition at line 260 of file strategies.py.
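The merging behaviour described above can be illustrated with a small stand-alone sketch. It is not the CAF implementation: it assumes, purely for illustration, that "enough data" means reaching a fixed event count, and the names `plan_executions`, `run_sizes` and `min_events` are hypothetical.

```python
def plan_executions(run_sizes, min_events):
    """Group consecutive runs so that each group holds at least `min_events` events.

    run_sizes: list of (run_number, n_events) tuples in run order (hypothetical input).
    Returns one list of run numbers per successful execution.
    """
    groups = []         # runs of every execution that had enough data
    current = []        # runs accumulated for the execution in progress
    current_events = 0
    for run, n_events in run_sizes:
        current.append(run)
        current_events += n_events
        if current_events >= min_events:
            # Enough data: close this group and start again from the next run
            groups.append(current)
            current, current_events = [], 0
    if current:
        if not groups:
            # No previous success to merge with: the whole input is insufficient
            raise ValueError("not enough data in the full input")
        # Dangling runs are merged with the previous successful group
        groups[-1].extend(current)
    return groups
```

With thresholds of 100 events, runs holding 50, 60, 120 and 30 events would be grouped as runs 1-2 and runs 3-4: the last run cannot stand alone, so it joins the previous group.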

Constructor & Destructor Documentation

◆ __init__()

__init__(self, algorithm)

Definition at line 289 of file strategies.py.

    def __init__(self, algorithm):
        """
        """
        super().__init__(algorithm)

        self.machine = AlgorithmMachine(self.algorithm)
        if "step_size" not in self.algorithm.params:
            self.algorithm.params["step_size"] = 1

        self.first_execution = True
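The constructor defaults `step_size` to 1, and that value later controls how many runs are added per execution attempt. The following sketch shows the chunking idea in isolation; `chunks` is a hypothetical helper written for this example and is only assumed to behave like the `grouper` utility used by the strategy.

```python
from itertools import islice


def chunks(step_size, runs):
    """Yield successive tuples of at most `step_size` items from `runs`."""
    iterator = iter(runs)
    while True:
        chunk = tuple(islice(iterator, step_size))
        if not chunk:
            return
        yield chunk


# Same defaulting pattern as the constructor: only set step_size if the user did not
params = {}
if "step_size" not in params:
    params["step_size"] = 1
```

A larger `step_size` means fewer execution attempts, at the cost of coarser payload IoVs.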

Member Function Documentation

◆ any_failed_iov()

any_failed_iov(self)
inherited
Returns:
    bool: True if any result in the current results list has a failure or not-enough-data algorithm code.

Definition at line 152 of file strategies.py.

    def any_failed_iov(self):
        """
        Returns:
            bool: If any result in the current results list has a failed algorithm code we return True
        """
        failed_results = []
        for result in self.results:
            if result.result == AlgResult.failure.value or result.result == AlgResult.not_enough_data.value:
                failed_results.append(result)
        if failed_results:
            B2WARNING("Failed results found.")
            for result in failed_results:
                if result.result == AlgResult.failure.value:
                    B2ERROR(f"c_Failure returned for {result.iov}.")
                elif result.result == AlgResult.not_enough_data.value:
                    B2WARNING(f"c_NotEnoughData returned for {result.iov}.")
            return True
        else:
            return False

◆ apply_experiment_settings()

apply_experiment_settings(self, algorithm, experiment)
Apply experiment-dependent settings.
This is the default version, which does not do anything.
If necessary, it should be reimplemented by derived classes.

Definition at line 301 of file strategies.py.

    def apply_experiment_settings(self, algorithm, experiment):
        """
        Apply experiment-dependent settings.
        This is the default version, which does not do anything.
        If necessary, it should be reimplemented by derived classes.
        """
        return
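A derived class would typically override this hook and enable it through the `has_experiment_settings` parameter. The sketch below uses stand-in classes so that it runs without the framework: `StrategyStub`, `FakeAlgorithm`, `ExperimentAwareStrategy`, the `threshold` attribute and the experiment cut at 10 are all hypothetical.

```python
class StrategyStub:
    """Hypothetical stand-in for the strategy; only the hook mechanism is modelled."""

    def __init__(self, params):
        self.params = params

    def apply_experiment_settings(self, algorithm, experiment):
        # Default version: does nothing
        return

    def process(self, algorithm, experiment):
        # The hook is only called when the flag is present and true
        if self.params.get("has_experiment_settings"):
            self.apply_experiment_settings(algorithm, experiment)


class FakeAlgorithm:
    """Hypothetical algorithm object with one tunable setting."""
    threshold = 1.0


class ExperimentAwareStrategy(StrategyStub):
    def apply_experiment_settings(self, algorithm, experiment):
        # Illustrative rule: later experiments use a different threshold
        algorithm.threshold = 2.0 if experiment >= 10 else 1.0
```

Without the flag in the parameters, the override is never called and the algorithm keeps its default settings.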

◆ execute_over_run_list()

execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun)
Execute runs given in list

Definition at line 393 of file strategies.py.

    def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
        """Execute runs given in list"""
        # The runs (data) we have left to execute from this run list
        remaining_runs = run_list[:]
        # The previous execution's runs
        previous_runs = []
        # The current runs we are executing
        current_runs = []
        # The last successful payload and result
        last_successful_payloads = None
        last_successful_result = None

        # Iterate over ExpRuns within an experiment in chunks of 'step_size'
        for expruns in grouper(self.algorithm.params["step_size"], run_list):
            # Already set up earlier the first time, so we shouldn't do it again
            if not self.first_execution:
                self.machine.setup_algorithm()
            else:
                self.first_execution = False

            # Add on the next step of runs
            current_runs.extend(expruns)
            # Remove them from our remaining runs
            remaining_runs = [run for run in remaining_runs if run not in current_runs]

            # Is this the first payload of the experiment
            if not last_successful_result:
                B2INFO("Detected that this will be the first payload of this experiment.")
                # If this is the first payload but we have other data, we need the IoV to cover from the
                # lowest IoV extent requested up to the ExpRun right before the next run in the remaining runs list.
                if remaining_runs:
                    apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
                # If this is the first payload but there isn't more data, we set the IoV to cover the full range
                else:
                    B2INFO("Detected that this will be the only payload of the experiment.")
                    apply_iov = IoV(*lowest_exprun, *highest_exprun)
            # If there were previous successes
            else:
                if not remaining_runs:
                    B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], *highest_exprun)
                # Otherwise, it's just a normal IoV in the middle.
                else:
                    B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                    apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

            B2INFO(f"Executing and applying {apply_iov} to the payloads.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")

            # Does this count as a successful execution?
            if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                # If we've succeeded but we have a previous success we can commit the previous payloads
                # since we have new ones ready
                if last_successful_payloads and last_successful_result:
                    B2INFO("Saving this execution's payloads to be committed later.")
                    # Save the payloads and result
                    new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    new_successful_result = self.machine.result
                    B2INFO("We just succeeded in execution of the Algorithm."
                           f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(last_successful_payloads)
                    self.results.append(last_successful_result)
                    self.send_result(last_successful_result)
                    # If there are remaining runs we need to have the current payloads ready to commit after the next execution
                    if remaining_runs:
                        last_successful_payloads = new_successful_payloads
                        last_successful_result = new_successful_result
                    # If there's not more runs to process we should also commit the new ones
                    else:
                        B2INFO("We have no more runs to process. "
                               f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                        self.machine.algorithm.algorithm.commit(new_successful_payloads)
                        self.results.append(new_successful_result)
                        self.send_result(new_successful_result)
                        break
                # if there's no previous success this must be the first run executed
                else:
                    # Need to save payloads for later if we have a success but runs remain
                    if remaining_runs:
                        B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                        # Save the payloads and result
                        last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                        last_successful_result = self.machine.result
                    # Need to commit and exit if we have a success and no remaining data
                    else:
                        B2INFO("We just succeeded in execution of the Algorithm."
                               " No runs left to be processed, so we are committing results of this execution.")
                        self.machine.algorithm.algorithm.commit()
                        self.results.append(self.machine.result)
                        self.send_result(self.machine.result)
                        break

                previous_runs = current_runs[:]
                current_runs = []
            # If it wasn't successful, was it due to lack of data in the runs?
            elif (self.machine.result.result == AlgResult.not_enough_data.value):
                B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
                if remaining_runs:
                    B2INFO("Some runs remain to be processed. "
                           f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
                elif not remaining_runs and not last_successful_result:
                    B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                            " There wasn't enough data in the full input data requested.")
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    self.machine.fail()
                    break
                elif not remaining_runs and last_successful_result:
                    B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                           ", so we'll merge with the previous IoV.")
                    final_runs = current_runs[:]
                    current_runs = previous_runs
                    current_runs.extend(final_runs)
                self.machine.fail()
            elif self.machine.result.result == AlgResult.failure.value:
                B2ERROR(f"{self.algorithm.name} returned failure exit code.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
        else:
            # Check if we need to run a final execution on the previous execution + dangling set of runs
            if current_runs:
                self.machine.setup_algorithm()
                apply_iov = IoV(last_successful_result.iov.exp_low,
                                last_successful_result.iov.run_low,
                                *highest_exprun)
                B2INFO(f"Executing on {apply_iov}.")
                self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
                B2INFO(f"Finished execution with result code {self.machine.result.result}.")
                if (self.machine.result.result == AlgResult.ok.value) or (
                        self.machine.result.result == AlgResult.iterate.value):
                    self.machine.complete()
                    # Commit all the payloads and send out the results
                    self.machine.algorithm.algorithm.commit()
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                else:
                    # Save the result
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    # But failed
                    self.machine.fail()
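The choice of `apply_iov` in the loop above reduces to two questions: is this the first payload of the experiment, and do any runs remain? The sketch below isolates that decision. The `ExpRun` and `IoV` namedtuples are simplified stand-ins for the CAF types, and `choose_apply_iov` is a hypothetical helper, not part of the framework.

```python
from collections import namedtuple

# Simplified stand-ins for the CAF types (assumed field names)
ExpRun = namedtuple("ExpRun", ["exp", "run"])
IoV = namedtuple("IoV", ["exp_low", "run_low", "exp_high", "run_high"])


def choose_apply_iov(current_runs, remaining_runs, lowest_exprun, highest_exprun, first_payload):
    """Return the IoV a payload should cover for the current execution."""
    # The first payload starts at the requested lower extent, later ones at their first run
    start = lowest_exprun if first_payload else current_runs[0]
    if remaining_runs:
        # Stop right before the next run that still has to be processed
        next_run = remaining_runs[0]
        end = ExpRun(next_run.exp, next_run.run - 1)
    else:
        # Nothing left: extend to the requested upper extent
        end = highest_exprun
    return IoV(*start, *end)
```

Ending each payload one run before the next unprocessed run is what closes the gaps between runs that contain data.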

◆ find_iov_gaps()

find_iov_gaps(self)
inherited
Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
within the same experiment are found.

Returns:
    iov_gaps(list[IoV])

Definition at line 132 of file strategies.py.

    def find_iov_gaps(self):
        """
        Finds and prints the current gaps between the IoVs of the strategy results. Basically these are the IoVs
        not covered by any payload. It CANNOT find gaps if they exist across an experiment boundary. Only gaps
        within the same experiment are found.

        Returns:
            iov_gaps(list[IoV])
        """
        iov_gaps = find_gaps_in_iov_list(sorted([result.iov for result in self.results]))
        if iov_gaps:
            gap_msg = ["Found gaps between IoVs of algorithm results (regardless of result)."]
            gap_msg.append("You may have requested these gaps deliberately by not passing in data containing these runs.")
            gap_msg.append("This may not be a problem, but you will not have payloads defined for these IoVs")
            gap_msg.append("unless you edit the final database.txt yourself.")
            B2INFO_MULTILINE(gap_msg)
            for iov in iov_gaps:
                B2INFO(f"{iov} not covered by any execution of the algorithm.")
        return iov_gaps
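To make the "same experiment only" restriction concrete, here is a rough stand-alone sketch of gap finding. It is not the `find_gaps_in_iov_list` implementation; `gaps_within_experiments` is a hypothetical function, `IoV` is a simplified namedtuple, and a `run_high` of -1 is assumed to mean "open-ended within the experiment".

```python
from collections import namedtuple

IoV = namedtuple("IoV", ["exp_low", "run_low", "exp_high", "run_high"])


def gaps_within_experiments(iovs):
    """Return the IoVs left uncovered between neighbouring IoVs of the same experiment."""
    gaps = []
    ordered = sorted(iovs)
    for previous, following in zip(ordered, ordered[1:]):
        # Gaps across an experiment boundary are deliberately not reported
        if previous.exp_high != following.exp_low:
            continue
        # An open-ended IoV (run_high == -1, assumed convention) leaves no gap behind it
        if previous.run_high != -1 and following.run_low > previous.run_high + 1:
            gaps.append(IoV(previous.exp_high, previous.run_high + 1,
                            following.exp_low, following.run_low - 1))
    return gaps
```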

◆ is_valid()

is_valid(self)
inherited
Returns:
    bool: Whether or not this strategy has been set up correctly with all its necessary attributes.

Definition at line 114 of file strategies.py.

    def is_valid(self):
        """
        Returns:
            bool: Whether or not this strategy has been set up correctly with all its necessary attributes.
        """
        B2INFO("Checking validity of current AlgorithmStrategy setup.")
        # Check if we're somehow missing a required attribute (should be impossible since they get initialised in init)
        for attribute_name in self.required_attrs:
            if not hasattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} doesn't exist.")
                return False
        # Check if any attributes that need actual values haven't been set or were empty
        for attribute_name in self.required_true_attrs:
            if not getattr(self, attribute_name):
                B2ERROR(f"AlgorithmStrategy attribute {attribute_name} returned False.")
                return False
        return True
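The two checks differ in strictness: one only requires an attribute to exist, the other requires it to be truthy. A reduced sketch, using a hypothetical `StrategySketch` class with a shortened attribute list, shows the difference.

```python
class StrategySketch:
    """Hypothetical, reduced model of the two-level validity check."""

    required_attrs = ["algorithm", "database_chain", "input_files"]
    required_true_attrs = ["algorithm", "input_files"]

    def __init__(self, algorithm):
        self.algorithm = algorithm
        self.database_chain = []  # allowed to stay empty
        self.input_files = []     # must be filled before running

    def is_valid(self):
        # Every required attribute must exist, whatever its value
        if any(not hasattr(self, name) for name in self.required_attrs):
            return False
        # A subset must additionally hold a truthy value
        return all(getattr(self, name) for name in self.required_true_attrs)
```

An empty `database_chain` is accepted, whereas an empty `input_files` list makes the strategy invalid.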

◆ run()

run(self, iov, iteration, queue)
Runs the algorithm machine over the collected data and fills the results.

Reimplemented from AlgorithmStrategy.

Definition at line 309 of file strategies.py.

    def run(self, iov, iteration, queue):
        """
        Runs the algorithm machine over the collected data and fills the results.
        """
        if not self.is_valid():
            raise StrategyError("This AlgorithmStrategy was not set up correctly!")
        self.queue = queue
        B2INFO(f"Setting up {self.__class__.__name__} strategy for {self.algorithm.name}.")
        # Now add all the necessary parameters for a strategy to run
        machine_params = {}
        machine_params["database_chain"] = self.database_chain
        machine_params["dependent_databases"] = self.dependent_databases
        machine_params["output_dir"] = self.output_dir
        machine_params["output_database_dir"] = self.output_database_dir
        machine_params["input_files"] = self.input_files
        machine_params["ignored_runs"] = self.ignored_runs
        self.machine.setup_from_dict(machine_params)
        # Start moving through machine states
        self.machine.setup_algorithm(iteration=iteration)
        # After this point, the logging is in the stdout of the algorithm
        B2INFO(f"Beginning execution of {self.algorithm.name} using strategy {self.__class__.__name__}.")
        runs_to_execute = []
        all_runs_collected = runs_from_vector(self.algorithm.algorithm.getRunListFromAllData())
        # If we were given a specific IoV to calibrate we just execute over runs in that IoV
        if iov:
            runs_to_execute = runs_overlapping_iov(iov, all_runs_collected)
        else:
            runs_to_execute = all_runs_collected[:]

        # Remove the ignored runs from our run list to execute
        if self.ignored_runs:
            B2INFO(f"Removing the ignored_runs from the runs to execute for {self.algorithm.name}.")
            runs_to_execute.difference_update(set(self.ignored_runs))
        # Sets aren't ordered so let's go back to lists and sort
        runs_to_execute = sorted(runs_to_execute)

        # We don't want to cross the boundary of Experiments accidentally. So we will split our run list
        # into separate lists, one for each experiment number contained. That way we can evaluate each experiment
        # separately and prevent IoVs from crossing the boundary.
        runs_to_execute = split_runs_by_exp(runs_to_execute)

        # Now iterate through the experiments, executing runs in blocks of 'step_size'. We DO NOT allow a payload IoV to
        # extend over multiple experiments, only multiple runs
        iov_coverage = None
        if "iov_coverage" in self.algorithm.params:
            B2INFO(f"Detected that you have set iov_coverage to {self.algorithm.params['iov_coverage']}.")
            iov_coverage = self.algorithm.params["iov_coverage"]

        number_of_experiments = len(runs_to_execute)
        # Iterate over experiment run lists
        for i_exp, run_list in enumerate(runs_to_execute, start=1):

            # Apply experiment-dependent settings.
            if "has_experiment_settings" in self.algorithm.params:
                if self.algorithm.params["has_experiment_settings"]:
                    self.apply_experiment_settings(self.machine.algorithm.algorithm, run_list[0].exp)

            # If 'iov_coverage' was set in the algorithm.params and it is larger (at both ends) than the
            # input data runs IoV, then we also have to set the first payload IoV to encompass the missing beginning
            # of the iov_coverage, and the last payload IoV must cover up to the end of iov_coverage.
            # This is only true for the lowest and highest experiments in our input data.
            # If we have multiple experiments the iov must be adjusted to avoid gaps at the iov boundaries
            lowest_exprun = ExpRun(run_list[0].exp, 0)
            highest_exprun = ExpRun(run_list[-1].exp, -1)

            if i_exp == 1:
                lowest_exprun = ExpRun(iov_coverage.exp_low, iov_coverage.run_low) if iov_coverage else run_list[0]
            if i_exp == number_of_experiments:
                highest_exprun = ExpRun(iov_coverage.exp_high, iov_coverage.run_high) if iov_coverage else run_list[-1]

            self.execute_over_run_list(iteration, run_list, lowest_exprun, highest_exprun)

        # Print any knowable gaps between result IoVs, if any are found there is a problem.
        gaps = self.find_iov_gaps()
        # Dump them to a file for logging
        with open(f"{self.algorithm.name}_iov_gaps.json", "w") as f:
            json.dump(gaps, f)

        # If any results weren't successes we fail
        if self.any_failed_iov():
            self.send_final_state(self.FAILED)
        else:
            self.send_final_state(self.COMPLETED)
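The per-experiment extents computed in `run` can be reproduced in a few lines. This sketch assumes a simplified `ExpRun` namedtuple and passes `iov_coverage` as a plain 4-tuple; `experiment_extents` is a hypothetical helper, and the grouping by experiment is done here with `itertools.groupby` rather than the CAF's `split_runs_by_exp`.

```python
from collections import namedtuple
from itertools import groupby

ExpRun = namedtuple("ExpRun", ["exp", "run"])  # simplified stand-in


def experiment_extents(runs, iov_coverage=None):
    """Return one (lowest_exprun, highest_exprun) pair per experiment.

    iov_coverage: optional (exp_low, run_low, exp_high, run_high) tuple.
    """
    run_lists = [list(group) for _, group in groupby(sorted(runs), key=lambda r: r.exp)]
    extents = []
    for i_exp, run_list in enumerate(run_lists, start=1):
        # Inner experiments are covered completely: run 0 up to open-ended (-1)
        lowest = ExpRun(run_list[0].exp, 0)
        highest = ExpRun(run_list[-1].exp, -1)
        if i_exp == 1:
            lowest = ExpRun(*iov_coverage[:2]) if iov_coverage else run_list[0]
        if i_exp == len(run_lists):
            highest = ExpRun(*iov_coverage[2:]) if iov_coverage else run_list[-1]
        extents.append((lowest, highest))
    return extents
```

Only the first and last experiments are trimmed to the data (or stretched to `iov_coverage`); everything in between spans the whole experiment, so no payload IoV crosses an experiment boundary.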

◆ send_final_state()

send_final_state(self, state)
inherited
send final state

Definition at line 176 of file strategies.py.

    def send_final_state(self, state):
        """send final state"""
        self.queue.put({"type": "final_state", "value": state})

◆ send_result()

send_result(self, result)
inherited
send result

Definition at line 172 of file strategies.py.

    def send_result(self, result):
        """send result"""
        self.queue.put({"type": "result", "value": result})
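Both senders put small dictionaries with a `"type"` and a `"value"` key on the queue. A receiving side could consume them roughly as follows. This is a sketch only: `drain` is a hypothetical function, and a standard-library `queue.Queue` is used in place of the multiprocessing queue to keep the example single-process.

```python
import queue


def drain(message_queue):
    """Collect results until a final_state message arrives.

    Returns (results, final_state).
    """
    results = []
    while True:
        message = message_queue.get()
        if message["type"] == "result":
            results.append(message["value"])
        elif message["type"] == "final_state":
            return results, message["value"]


# Simulate what a strategy would send during a successful run
q = queue.Queue()
q.put({"type": "result", "value": "result-1"})
q.put({"type": "result", "value": "result-2"})
q.put({"type": "final_state", "value": "COMPLETED"})
```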

◆ setup_from_dict()

setup_from_dict(self, params)
inherited
Parameters:
    params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.

Definition at line 106 of file strategies.py.

    def setup_from_dict(self, params):
        """
        Parameters:
            params (dict): Dictionary containing values to be assigned to the strategy attributes of the same name.
        """
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)
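Because the method assigns with `setattr` and performs no name checking, a misspelled key silently creates a new attribute instead of raising an error. The stand-alone sketch below, with a hypothetical `Holder` class, demonstrates this.

```python
class Holder:
    """Hypothetical object with two of the strategy's attributes."""

    def __init__(self):
        self.output_dir = ""
        self.input_files = []

    def setup_from_dict(self, params):
        # Same pattern as the strategy: assign every key as an attribute
        for attribute_name, value in params.items():
            setattr(self, attribute_name, value)


holder = Holder()
holder.setup_from_dict({"output_dir": "algorithm_output", "input_fiels": ["a.root"]})
```

After this call `output_dir` is updated, `input_files` is still empty, and a stray `input_fiels` attribute exists, so a subsequent validity check is worthwhile.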

Member Data Documentation

◆ algorithm

algorithm = algorithm
inherited

Algorithm() class that we're running.

Definition at line 80 of file strategies.py.

◆ allowed_granularities

list allowed_granularities = ["run", "all"]
static, inherited

Granularity of collector that can be run by this algorithm properly.

Definition at line 65 of file strategies.py.

◆ COMPLETED

str COMPLETED = "COMPLETED"
static, inherited

Completed state.

Definition at line 71 of file strategies.py.

◆ database_chain

list database_chain = []
inherited

User-defined database chain, i.e. the default global tag, or any localdbs/tags you have for custom alignment etc.

Definition at line 88 of file strategies.py.

◆ dependent_databases

list dependent_databases = []
inherited

CAF created local databases from previous calibrations that this calibration/algorithm depends on.

Definition at line 90 of file strategies.py.

◆ FAILED

str FAILED = "FAILED"
static, inherited

Failed state.

Definition at line 74 of file strategies.py.

◆ FINISHED_RESULTS

str FINISHED_RESULTS = "DONE"
static, inherited

Signal value that is put into the Queue when there are no more results left.

Definition at line 68 of file strategies.py.

◆ first_execution

bool first_execution = True

boolean storing whether this is the first time the algorithm is executed

Definition at line 299 of file strategies.py.

◆ ignored_runs

list ignored_runs = []
inherited

Runs that will not be included in ANY execution of the algorithm.

Usually set by Calibration.ignored_runs. The different strategies may handle the resulting run gaps differently.

Definition at line 93 of file strategies.py.

◆ input_files

list input_files = []
inherited

Collector output files, will contain all files returned by the output patterns.

Definition at line 82 of file strategies.py.

◆ machine

machine = AlgorithmMachine(self.algorithm)

caf.state_machines.AlgorithmMachine used to help set up and execute the CalibrationAlgorithm. It gets set up properly in run().

Definition at line 295 of file strategies.py.

◆ output_database_dir

str output_database_dir = ""
inherited

The output database directory for the localdb that the algorithm will commit to.

Definition at line 86 of file strategies.py.

◆ output_dir

str output_dir = ""
inherited

The algorithm output directory which is mostly used to store the stdout file.

Definition at line 84 of file strategies.py.

◆ queue

queue = None
inherited

The multiprocessing Queue we use to pass back results one at a time.

Definition at line 97 of file strategies.py.

◆ required_attrs

list required_attrs
static, inherited
Initial value:
= ["algorithm",
"database_chain",
"dependent_databases",
"output_dir",
"output_database_dir",
"input_files",
"ignored_runs"
]

Required attributes that must exist before the strategy can run properly.

Some are allowed to be values that return False when tested, e.g. "" or [].

Definition at line 48 of file strategies.py.

◆ required_true_attrs

list required_true_attrs
static, inherited
Initial value:
= ["algorithm",
"output_dir",
"output_database_dir",
"input_files"
]

Attributes that must have a value that returns True when tested by is_valid().

Definition at line 58 of file strategies.py.

◆ results

list results = []
inherited

The list of results objects which will be sent out before the end.

Definition at line 95 of file strategies.py.

◆ usable_params

dict usable_params
static
Initial value:
= {
"has_experiment_settings": bool,
"iov_coverage": IoV,
"step_size": int
}

The params that you could set on the Algorithm object which this Strategy would use.

Just here for documentation reasons.

Definition at line 280 of file strategies.py.
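Since `usable_params` is documentation only, nothing in the strategy enforces it. If a check is wanted before submitting a calibration, something along these lines could be used. It is a sketch under assumptions: `unexpected_params` is a hypothetical helper, and `IoV` is a simplified namedtuple standing in for the CAF type.

```python
from collections import namedtuple

IoV = namedtuple("IoV", ["exp_low", "run_low", "exp_high", "run_high"])  # stand-in

# Mirrors the documented mapping of parameter name to expected type
USABLE_PARAMS = {
    "has_experiment_settings": bool,
    "iov_coverage": IoV,
    "step_size": int,
}


def unexpected_params(params):
    """Return the names of parameters that are unknown or have the wrong type."""
    problems = []
    for name, value in params.items():
        expected_type = USABLE_PARAMS.get(name)
        if expected_type is None or not isinstance(value, expected_type):
            problems.append(name)
    return problems
```

For example, `{"step_size": 2, "iov_coverage": IoV(3, 0, 3, -1)}` passes, while a string-valued `step_size` or a misspelled `stepsize` key is reported.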


The documentation for this class was generated from the following file: