def execute_over_run_list(self, iteration, run_list, lowest_exprun, highest_exprun):
    """Execute the algorithm over ``run_list``, accumulating runs until an execution succeeds.

    Runs are taken from ``run_list`` in groups produced by
    ``grouper(self.algorithm.params["step_size"], run_list)`` and appended to the
    set of runs for the next execution. A result of ``AlgResult.ok`` or
    ``AlgResult.iterate`` counts as a success. The payloads of a success are held
    back while runs remain, and are committed only once the next success exists
    or no runs are left. When the trailing runs do not hold enough data and an
    earlier success exists, those runs are merged with the runs of that earlier
    success and executed once more after the loop.

    Every reported result is appended to ``self.results`` and passed to
    ``self.send_result``. The method returns nothing.

    Args:
        iteration: Forwarded unchanged to ``self.machine.execute_runs``.
        run_list: Runs to process. It is sliced, iterated, and its elements are
            star-unpacked into ``IoV`` and read via ``.exp`` / ``.run``.
        lowest_exprun: Star-unpacked as the lower bound of the first payload's IoV.
        highest_exprun: Star-unpacked as the upper bound of the last payload's IoV.
    """
    # Runs of run_list that have not yet been handed to an execution.
    remaining_runs = run_list[:]
    # Runs used by the most recent successful execution (needed for the final merge).
    previous_runs = []
    # Runs accumulated for the next execution.
    current_runs = []
    # Payloads and result of the latest success that have not been committed yet.
    last_successful_payloads = None
    last_successful_result = None

    # presumably grouper yields successive groups of at most step_size runs
    # (cf. the "at most" log message below) -- confirm against its definition.
    for expruns in grouper(self.algorithm.params["step_size"], run_list):
        # setup_algorithm() is skipped only while self.first_execution is set; the
        # flag is cleared here so every later pass calls it.
        # presumably the caller already did the first setup -- TODO confirm.
        if not self.first_execution:
            self.machine.setup_algorithm()
        else:
            self.first_execution = False

        # Add the next group of runs to the execution set ...
        current_runs.extend(expruns)
        # ... and drop them from the runs still waiting to be processed.
        remaining_runs = [run for run in remaining_runs if run not in current_runs]

        # Choose the IoV to apply. No earlier success means this is the first payload.
        if not last_successful_result:
            B2INFO("Detected that this will be the first payload of this experiment.")
            # First payload with more runs to come: from the lowest requested
            # ExpRun up to the run number just before the next remaining run.
            if remaining_runs:
                apply_iov = IoV(*lowest_exprun, remaining_runs[0].exp, remaining_runs[0].run - 1)
            # First payload and nothing left: cover the full requested range.
            else:
                B2INFO("Detected that this will be the only payload of the experiment.")
                apply_iov = IoV(*lowest_exprun, *highest_exprun)
        # There was an earlier success, so the IoV starts at the first current run.
        else:
            # Last execution: extend up to the highest requested ExpRun.
            if not remaining_runs:
                B2INFO("Detected that there are no more runs to execute in this experiment after this next execution.")
                apply_iov = IoV(*current_runs[0], *highest_exprun)
            # Otherwise stop at the run number just before the next remaining run.
            else:
                B2INFO("Detected that there are more runs to execute in this experiment after this next execution.")
                apply_iov = IoV(*current_runs[0], remaining_runs[0].exp, remaining_runs[0].run - 1)

        B2INFO(f"Executing and applying {apply_iov} to the payloads.")
        self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
        B2INFO(f"Finished execution with result code {self.machine.result.result}.")

        # NOTE(review): a result code matching none of the branches below falls
        # through without complete()/fail() and leaves current_runs populated --
        # confirm that no such code can be returned.
        # Both "ok" and "iterate" are treated as a successful execution.
        if (self.machine.result.result == AlgResult.ok.value) or (self.machine.result.result == AlgResult.iterate.value):
            self.machine.complete()
            # A held-back earlier success can now be committed, because this
            # execution produced payloads of its own.
            if last_successful_payloads and last_successful_result:
                B2INFO("Saving this execution's payloads to be committed later.")
                # Grab the new payloads/result before committing the older ones.
                new_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                new_successful_result = self.machine.result
                B2INFO("We just succeeded in execution of the Algorithm."
                       f" Will now commit payloads from the previous success for {last_successful_result.iov}.")
                self.machine.algorithm.algorithm.commit(last_successful_payloads)
                self.results.append(last_successful_result)
                self.send_result(last_successful_result)
                # More runs to come: hold the new payloads back in turn.
                if remaining_runs:
                    last_successful_payloads = new_successful_payloads
                    last_successful_result = new_successful_result
                # Nothing left: commit the new payloads straight away and stop.
                else:
                    B2INFO("We have no more runs to process. "
                           f"Will now commit the most recent payloads for {new_successful_result.iov}.")
                    self.machine.algorithm.algorithm.commit(new_successful_payloads)
                    self.results.append(new_successful_result)
                    self.send_result(new_successful_result)
                    break
            # No held-back success exists yet.
            else:
                # Success with more runs to come: hold these payloads back.
                if remaining_runs:
                    B2INFO(f"Saving the most recent payloads for {self.machine.result.iov} to be committed later.")
                    last_successful_payloads = self.machine.algorithm.algorithm.getPayloadValues()
                    last_successful_result = self.machine.result
                # Success and nothing left: commit immediately and stop.
                # commit() is called without arguments here; presumably it commits
                # the algorithm's current payloads -- TODO confirm.
                else:
                    B2INFO("We just succeeded in execution of the Algorithm."
                           " No runs left to be processed, so we are committing results of this execution.")
                    self.machine.algorithm.algorithm.commit()
                    self.results.append(self.machine.result)
                    self.send_result(self.machine.result)
                    break

            # Remember which runs made up this success and start a fresh set.
            previous_runs = current_runs[:]
            current_runs = []
        # Not a success: was it because there was too little data?
        elif (self.machine.result.result == AlgResult.not_enough_data.value):
            B2INFO(f"There wasn't enough data in {self.machine.result.iov}.")
            # current_runs is kept, so the next group is added on top of it.
            if remaining_runs:
                B2INFO("Some runs remain to be processed. "
                       f"Will try to add at most {self.algorithm.params['step_size']} more runs of data and execute again.")
            # Nothing left to add and nothing to merge with: report the result and give up.
            elif not remaining_runs and not last_successful_result:
                B2ERROR("There aren't any more runs remaining to merge with, and we never had a previous success."
                        " There wasn't enough data in the full input data requested.")
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                self.machine.fail()
                break
            # Nothing left to add, but an earlier success exists: put the runs of
            # that success in front of the dangling runs for the final execution
            # performed after the loop.
            elif not remaining_runs and last_successful_result:
                B2INFO("There aren't any more runs remaining to merge with. But we had a previous success"
                       ", so we'll merge with the previous IoV.")
                final_runs = current_runs[:]
                # current_runs now aliases previous_runs, so extend() mutates both.
                current_runs = previous_runs
                current_runs.extend(final_runs)
            # Reached for the "runs remain" and "merge" cases (the other case
            # breaks above), before the next setup_algorithm() call.
            self.machine.fail()
        # Failure result: report it and stop processing this run list.
        # NOTE(review): payloads held back from an earlier success are not
        # committed on this path -- confirm that dropping them is intended.
        elif self.machine.result.result == AlgResult.failure.value:
            B2ERROR(f"{self.algorithm.name} returned failure exit code.")
            self.results.append(self.machine.result)
            self.send_result(self.machine.result)
            self.machine.fail()
            break
    # Loop else-clause: runs only when the loop finished without a break.
    else:
        # Leftover runs mean the last group ended without a success; execute once
        # more on them (merged with the previous success' runs, see above).
        if current_runs:
            self.machine.setup_algorithm()
            # The IoV spans from the start of the previous success to the highest ExpRun.
            # NOTE(review): assumes last_successful_result is not None here. That
            # holds when this point is reached via the merge branch above, but not
            # if an unhandled result code left current_runs populated -- confirm.
            apply_iov = IoV(last_successful_result.iov.exp_low,
                            last_successful_result.iov.run_low,
                            *highest_exprun)
            B2INFO(f"Executing on {apply_iov}.")
            self.machine.execute_runs(runs=current_runs, iteration=iteration, apply_iov=apply_iov)
            B2INFO(f"Finished execution with result code {self.machine.result.result}.")
            if (self.machine.result.result == AlgResult.ok.value) or (
                    self.machine.result.result == AlgResult.iterate.value):
                self.machine.complete()
                # The held-back payloads/result of the previous success are not
                # committed or reported; only this execution's are.
                self.machine.algorithm.algorithm.commit()
                # Record and send out the merged result.
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
            else:
                # Record and send out the unsuccessful result.
                # NOTE(review): the held-back payloads of the previous success are
                # not committed on this path either -- confirm that is intended.
                self.results.append(self.machine.result)
                self.send_result(self.machine.result)
                # Mark the machine as failed.
                self.machine.fail()
539
540