Belle II Software  release-05-02-19
framework.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
"""
This module implements several objects/functions to configure and run calibrations.
These classes are used to construct the workflow of the calibration job.
The actual processing code is mostly in the `caf.state_machines` module.
"""

# Public API of this module. `Collection` is user-facing as well (instances are created by users and
# passed to `Calibration.add_collection`), so it is exported alongside the other classes.
__all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF", "Collection"]
11 
12 import os
13 from threading import Thread
14 from time import sleep
15 from pathlib import Path
16 import shutil
17 from glob import glob
18 
19 from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
20 from basf2 import find_file
21 from basf2 import conditions as b2conditions
22 
23 from abc import ABC, abstractmethod
24 
25 import caf
26 from caf.utils import B2INFO_MULTILINE
27 from caf.utils import past_from_future_dependencies
28 from caf.utils import topological_sort
29 from caf.utils import all_dependencies
30 from caf.utils import method_dispatch
31 from caf.utils import temporary_workdir
32 from caf.utils import find_int_dirs
33 from caf.utils import LocalDatabase
34 from caf.utils import CentralDatabase
35 from caf.utils import parse_file_uri
36 
37 import caf.strategies as strategies
38 import caf.runners as runners
39 from caf import backends
40 from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
41 from caf.state_machines import CalibrationMachine, ConditionError, MachineError
42 from caf.database import CAFDB
43 
44 
class Collection:
    """
    Keyword Arguments:
        collector (str, basf2.Module): The collector module or module name for this `Collection`.
        input_files (list[str]): The input files to be used for only this `Collection`.
        pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
        database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
        output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
            `Algorithm.data_input` function. Setting this here replaces the default completely.
        max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this
            `Collection`. Technically this sets the SubjobSplitter to be used; not compatible with max_collector_jobs.
        max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
            Input files are split evenly between them. Technically this sets the SubjobSplitter to be used; not
            compatible with max_files_per_collector_job.
        backend_args (dict): The args for the backend submission of this `Collection`.
    """

    #: Default maximum number of collector subjobs for a `Collection`.
    default_max_collector_jobs = 1000

    #: File name used for the collector job configuration.
    job_config = "collector_job.json"
67 
# Initialise a Collection. Every keyword argument is optional; the defaults applied below are: no collector,
# an empty input file list, no pre-collector path, output pattern "CollectorOutput.root", an empty backend_args
# dict and, when no database_chain is given, basf2's default global tags.
# NOTE(review): this listing has dropped source lines (see the gaps in the numbering). Most gaps look like
# attribute comments, but the `else:` branch of the splitter selection has lost its body (original line 116);
# it presumably reads `self.max_collector_jobs = self.default_max_collector_jobs` -- confirm against the source.
68  def __init__(self,
69  collector=None,
70  input_files=None,
71  pre_collector_path=None,
72  database_chain=None,
73  output_patterns=None,
74  max_files_per_collector_job=None,
75  max_collector_jobs=None,
76  backend_args=None
77  ):
78 
79  self.collector = collector
80 
81  self.input_files = []
82  if input_files:
83  self.input_files = input_files
84 
# NOTE(review): presumably maps input files to their IoVs, judging by the name -- confirm. It starts empty here.
90  self.files_to_iovs = {}
91 
95  self.pre_collector_path = None
96  if pre_collector_path:
97  self.pre_collector_path = pre_collector_path
98 
102  self.output_patterns = ["CollectorOutput.root"]
103  if output_patterns:
104  self.output_patterns = output_patterns
105 
106 
# Splitter selection: giving both limits is a fatal configuration error (B2FATAL). Otherwise the matching
# property setter installs the splitter (max_files_per_collector_job -> MaxFilesSplitter,
# max_collector_jobs -> MaxSubjobsSplitter).
108  self.splitter = None
109  if max_files_per_collector_job and max_collector_jobs:
110  B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
111  elif max_files_per_collector_job:
112  self.max_files_per_collector_job = max_files_per_collector_job
113  elif max_collector_jobs:
114  self.max_collector_jobs = max_collector_jobs
115  else:
117 
118 
120  self.backend_args = {}
121  if backend_args:
122  self.backend_args = backend_args
123 
# Without an explicit database_chain, start from basf2's default global tags, added via use_central_database.
124  if database_chain:
125 
128  self.database_chain = database_chain
129  else:
130  self.database_chain = []
131  # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
132  # described well by appending to a list to a deque. So we do bit of reversal to translate it back and make the
133  # most important GT the last one encountered.
134  for tag in reversed(b2conditions.default_globaltags):
135  self.use_central_database(tag)
136 
# find_file resolves the collector steering script to an absolute path; job_cmd below refers to it by file name only.
137  self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()
138  """The basf2 steering file that will be used for Collector jobs run by this collection.
139 This script will be copied into subjob directories as part of the input sandbox."""
140 
141 
# NOTE(review): "--job-information job_info.json" is ONE list element containing a space. That only works if the
# backend joins job_cmd into a single shell command string -- confirm.
142  self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]
143 
def reset_database(self):
    """
    Remove everything in the database_chain of this `Collection`, including the default central database
    tags automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.

    Afterwards, add databases again with `Collection.use_local_database` and `Collection.use_central_database`.
    """
    self.database_chain = []
150 
def use_central_database(self, global_tag):
    """
    Parameters:
        global_tag (str): The central database global tag to use for this collection.

    Append a central database global tag to the database chain of this `Collection`. Within the chain, the
    global tag added last is the most important one.
    The default database chain is just the central one(s) from `basf2.conditions.default_globaltags`.
    The input file global tag will always be overridden and never used unless explicitly set.

    To turn off the central database completely or use a custom tag as the base, you should call
    `Collection.reset_database` and start adding databases with `Collection.use_local_database` and
    `Collection.use_central_database`.

    Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

    NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
    list which will be checked after all local database files have been checked.

    So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real
    order that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is
    checked first.
    """
    self.database_chain.append(CentralDatabase(global_tag))
174 
def use_local_database(self, filename, directory=""):
    """
    Parameters:
        filename (str): The path to the database.txt of the local database
        directory (str): The path to the payloads directory for this local database.

    Add a local database to the end of this collection's database chain.
    Repeated calls add each database IN ORDER, and they only apply to this collection.

    NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
    list which will be checked after all local database files have been checked.

    So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
    that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
    """
    self.database_chain.append(LocalDatabase(filename, directory))
194 
@staticmethod
def uri_list_from_input_file(input_file):
    """
    Expand one input file specification into a list of URI strings.

    Parameters:
        input_file (str): A local file/glob pattern or XROOTD URI

    Returns:
        list: A list of the URIs found from the initial string. A local pattern that matches no
        files gives an empty list, and a warning is logged so the problem is not silent.
    """
    # By default we assume it is a local file path if no "scheme" is given
    uri = parse_file_uri(input_file)
    if uri.scheme != "file":
        # Just let everything else through and hope the backend can figure it out
        return [input_file]
    # For local files we also perform a glob just in case it is a wildcard pattern.
    # That way we will have all the URIs of files separately.
    uris = [parse_file_uri(path).geturl() for path in glob(input_file)]
    if not uris:
        B2WARNING(f"No local files matched the input file pattern '{input_file}'.")
    return uris
214 
@property
def input_files(self):
    """
    list[str]: The input file URIs of this `Collection`.

    Assign a single string, or a list/tuple of strings. Each entry is expanded with
    `Collection.uri_list_from_input_file` (local glob patterns become one URI per file),
    and the flattened list of URIs is stored.

    Raises:
        TypeError: if the assigned value is not a string, list or tuple.
    """
    return self._input_files

@input_files.setter
def input_files(self, value):
    if isinstance(value, str):
        # A single string is a single pattern/URI
        self._input_files = self.uri_list_from_input_file(value)
    elif isinstance(value, (list, tuple)):
        # Expand every pattern and flatten the results, preserving the given order
        self._input_files = [uri for pattern in value for uri in self.uri_list_from_input_file(pattern)]
    else:
        raise TypeError("Input files must be a list, tuple or string")
232 
@property
def collector(self):
    """
    The collector module of this `Collection`, exactly as stored by the setter.
    """
    return self._collector

@collector.setter
def collector(self, collector):
    """
    Accepts a `basf2.Module` or the name of one. A name is turned into a module with
    `basf2.register_module`. A value that is still not a `basf2.Module` triggers B2ERROR, but is
    stored anyway. Falsy values (e.g. None) are stored without any checks.
    """
    if not collector:
        self._collector = collector
        return
    from basf2 import Module
    module = collector
    if isinstance(module, str):
        from basf2 import register_module
        module = register_module(module)
    if not isinstance(module, Module):
        B2ERROR("Collector needs to be either a Module or the name of such a module")
    self._collector = module
254 
def is_valid(self):
    """
    Returns:
        bool: True when this `Collection` has a (truthy) collector AND a non-empty input file list.
    """
    return bool(self.collector) and bool(self.input_files)
260 
@property
def max_collector_jobs(self):
    """
    int or None: Maximum number of collector subjobs, read from ``self.splitter.max_subjobs``.
    None when no splitter is set.

    NOTE(review): if the splitter was installed by the ``max_files_per_collector_job`` setter it may not
    have a ``max_subjobs`` attribute -- confirm.
    """
    splitter = self.splitter
    return splitter.max_subjobs if splitter else None

@max_collector_jobs.setter
def max_collector_jobs(self, value):
    # None removes the splitter; any other value installs a MaxSubjobsSplitter with that limit
    self.splitter = None if value is None else MaxSubjobsSplitter(max_subjobs=value)
274 
@property
def max_files_per_collector_job(self):
    """
    int or None: Maximum number of input files per collector subjob, read from
    ``self.splitter.max_files_per_subjob``. None when no splitter is set.

    NOTE(review): if the splitter was installed by the ``max_collector_jobs`` setter it may not
    have a ``max_files_per_subjob`` attribute -- confirm.
    """
    splitter = self.splitter
    return splitter.max_files_per_subjob if splitter else None

@max_files_per_collector_job.setter
def max_files_per_collector_job(self, value):
    # None removes the splitter; any other value installs a MaxFilesSplitter with that limit
    self.splitter = None if value is None else MaxFilesSplitter(max_files_per_subjob=value)
288 
289 
class CalibrationBase(ABC, Thread):
    """
    Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
    this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. By inheriting from this
    class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
    that don't depend on the C++ CAF at all and run everything in your own way.

    .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
        But it's there if you really need it.

    Parameters:
        name (str): Name of this calibration object. Should be unique if you are going to run it.

    Keyword Arguments:
        input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by
            `glob.glob`.
    """

    #: State name marking a calibration as failed.
    fail_state = "failed"

    #: State name marking a calibration as successfully finished.
    end_state = "completed"
312 
313  def __init__(self, name, input_files=None):
314  """
Initialise the state shared by all calibration types.

Parameters:
    name (str): Name of this calibration object.

Keyword Arguments:
    input_files (list[str]): Initial input files; an empty list is stored when this is falsy.
315  """
# CalibrationBase inherits from Thread, so this runs the Thread initialisation.
316  super().__init__()
317 
318  self.name = name
319 
321 
# Calibrations this one depends on (filled by depends_on).
# NOTE(review): depends_on also appends to `future_dependencies`, which is not initialised in the visible lines;
# it was presumably set in the dropped lines 324-328 -- confirm against the source.
322  self.dependencies = []
323 
329  self.files_to_iovs = {}
330  if input_files:
331 
332  self.input_files = input_files
333  else:
334  self.input_files = []
335 
336 
337  self.iov = None
338 
340 
342  self.save_payloads = True
343 
344  self.jobs_to_submit = []
345 
@abstractmethod
def run(self):
    """
    Main entry point of a calibration, executed inside a new Thread via `CalibrationBase.start`.

    It is invoked once every dependency of this `CalibrationBase` has reached ``end_state``
    (i.e. "completed"). Subclasses must provide the actual processing.
    """
352 
@abstractmethod
def is_valid(self):
    """
    Tell whether this calibration has been configured well enough to be run safely.

    Subclasses must implement this and return True or False.
    """
359 
def depends_on(self, calibration):
    """
    Parameters:
        calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

    Adds dependency of this calibration on another i.e. This calibration
    will not run until the dependency has completed, and the constants produced
    will be used via the database chain.

    You can define multiple dependencies for a single calibration simply
    by calling this multiple times; repeating a call with the same calibration has no further effect.
    Be careful when adding the calibration into the `CAF` not to add a circular/cyclic dependency.
    If you do the sort will return an empty order and the `CAF` processing will fail.

    This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of
    this `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
    """
    # Check that we don't have two calibration names that are the same
    if self.name == calibration.name:
        B2WARNING((f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                   "Dependency was not added."))
        return
    # Record each direction of the link at most once.
    if calibration not in self.dependencies:
        self.dependencies.append(calibration)
    # Guard on the list we actually append to. The previous check looked at calibration.dependencies, so
    # calling depends_on twice with the same calibration appended self to future_dependencies twice.
    if self not in calibration.future_dependencies:
        calibration.future_dependencies.append(self)
387 
def dependencies_met(self):
    """
    Returns:
        bool: True when every calibration in ``self.dependencies`` has ``state == end_state``
        (its own end_state). True as well when there are no dependencies.
    """
    return all(dep.state == dep.end_state for dep in self.dependencies)
393 
# NOTE(review): the `def` line of this method (original line 394) is missing from this listing; only the
# docstring and body are shown -- check the source file for its name and signature.
395  """
396  Returns the list of calibrations in our dependency list that have failed.
397  """
398  failed = []
399  for calibration in self.dependencies:
# Compared against THIS calibration's fail_state, whereas dependencies_met uses each dependency's own end_state.
# NOTE(review): equivalent only if all calibration types share the same fail_state value -- confirm.
400  if calibration.state == self.fail_state:
401  failed.append(calibration)
402  return failed
403 
def _apply_calibration_defaults(self, defaults):
    """
    We pass in default calibration options from the `CAF` instance here if called.
    Won't overwrite any options already set: only attributes whose current value is None are assigned.

    Parameters:
        defaults (dict): Mapping of attribute name -> default value.
    """
    for key, value in defaults.items():
        try:
            if getattr(self, key) is None:
                setattr(self, key, value)
        except AttributeError:
            # Report via basf2 logging like the rest of this module, rather than a bare print() to stdout.
            B2WARNING(f"The calibration {self.name} does not support the attribute {key}.")
415 
416 
# NOTE(review): the class header line (original line 417) is missing from this listing; it is presumably
# `class Calibration(CalibrationBase):` -- confirm. What follows is the class docstring.
418  """
419  Every Calibration object must have at least one collector and at least one algorithm.
420  You have the option to add in your collector/algorithm by argument here, or add them
421  later by changing the properties.
422 
423  If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
424  separately via `add_collection()`.
425 
426  Parameters:
427  name (str): Name of this calibration. It should be unique for use in the `CAF`
428  Keyword Arguments:
429  collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
430  algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
431  input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards usable by `glob.glob`
432 
433  A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
434 
435  >>> cal = Calibration('TestCalibration1')
436  >>> col1 = register_module('CaTest')
437  >>> cal.add_collection('TestColl', Collection(collector=col1))
438 
439  or equivalently
440 
441  >>> cal = Calibration('TestCalibration1', 'CaTest')
442 
443  If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
444 
445  >>> cal.pre_collector_path = my_basf2_path
446 
447  You don't have to put a RootInput module in this pre-collection path, but you can if
448  you need some special parameters. If you want to process sroot files then you have to explicitly add
449  SeqRootInput to your pre-collection path.
450  The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
451 
452 
453  You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
454 
455  >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
456 
457  you can change the input file list later on, before running with `CAF`
458 
459  >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
460 
461  If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
462  input_files, database chain etc from there. See `Collection`.
463 
464  Adding the CalibrationAlgorithm(s) is easy
465 
466  >>> alg1 = TestAlgo()
467  >>> cal.algorithms = alg1
468 
469  Or equivalently
470 
471  >>> cal.algorithms = [alg1]
472 
473  Or for multiple algorithms for one collector
474 
475  >>> alg2 = TestAlgo()
476  >>> cal.algorithms = [alg1, alg2]
477 
478  Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
479  `Algorithm`. To access the C++ algorithm class underneath directly do:
480 
481  >>> cal.algorithms[i].algorithm
482 
483  If you have a setup function that you want to run before each of the algorithms, set that with
484 
485  >>> cal.pre_algorithms = my_function_object
486 
487  If you want a different setup for each algorithm use a list with the same number of elements
488  as your algorithm list.
489 
490  >>> cal.pre_algorithms = [my_function1, my_function2, ...]
491 
492  You can also specify the dependencies of the calibration on others
493 
494  >>> cal.depends_on(cal2)
495 
496  By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
497  calibration constants created by earlier completed calibrations to dependent ones.
498  """
499 
# Names of the transitions ("moves") available to this Calibration.
# NOTE(review): presumably the transition names of the CalibrationMachine state machine -- confirm.
500  moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]
501 
# Name of the directory used for algorithm output.
# NOTE(review): presumably relative to the calibration's working directory -- confirm.
502  alg_output_dir = "algorithm_output"
503 
# NOTE(review): presumably the states at which progress is checkpointed so a run can resume from them -- confirm.
504  checkpoint_states = ["init", "collector_completed", "completed"]
505 
# Key under which the default Collection is stored in self.collections; used by the default-collection
# attribute helpers and by the database methods of this class.
506  default_collection_name = "default"
507 
508  def __init__(self,
509  name,
510  collector=None,
511  algorithms=None,
512  input_files=None,
513  pre_collector_path=None,
514  database_chain=None,
515  output_patterns=None,
516  max_files_per_collector_job=None,
517  max_collector_jobs=None,
518  backend_args=None
519  ):
520  """
Set up the Calibration together with its default `Collection`.

The collector-related arguments (collector, input_files, pre_collector_path, database_chain,
output_patterns, max_files_per_collector_job, max_collector_jobs, backend_args) are passed straight
to the `Collection` constructor; see `Collection` for their meaning. database_chain is additionally
used as this Calibration's own chain for the algorithm processes.
521  """
522 
# The collections dict must exist before super().__init__() runs: CalibrationBase.__init__ assigns
# input_files and files_to_iovs, and this class forwards those properties to the default Collection.
523  self.collections = {}
524 
525  self._algorithms = []
526 
527  # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
528  # CollectorModule/module name directly.
# NOTE(review): the first line of this statement (original line 529) is missing from the listing; it presumably
# reads `self.add_collection(self.default_collection_name,` -- confirm against the source.
530  Collection(collector,
531  input_files,
532  pre_collector_path,
533  database_chain,
534  output_patterns,
535  max_files_per_collector_job,
536  max_collector_jobs,
537  backend_args
538  ))
539 
540  super().__init__(name, input_files)
541  if algorithms:
542 
545  self.algorithms = algorithms
546 
547  self.results = {}
548 
550  self.max_iterations = None
551 
555  self.ignored_runs = None
# NOTE(review): the body of this `if` (original lines 558-560) is missing from the listing -- confirm against the source.
556  if self.algorithms:
557 
# The Calibration keeps its own database chain for the algorithm processes; the default global tags are
# only added here when no chain was given.
561  if database_chain:
562 
564  self.database_chain = database_chain
565  else:
566  self.database_chain = []
567  # This database is already applied to the `Collection` automatically, so don't do it again
568  for tag in reversed(b2conditions.default_globaltags):
569  self.use_central_database(tag, apply_to_default_collection=False)
570 
574 
576  self.backend = None
577 
582 
# NOTE(review): presumably a polling interval (in seconds) used while the calibration runs -- confirm.
583  self.heartbeat = 3
584 
585  self.machine = None
586 
587  self._db_path = None
588 
def add_collection(self, name, collection):
    """
    Parameters:
        name (str): Unique name of this `Collection` in the Calibration.
        collection (`Collection`): `Collection` object to use.

    Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
    A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
    + input files.
    You can ignore the default one and only add your own custom Collections. You can configure the default from the
    Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector,
    cal.input_files attributes.

    If a Collection with the same name already exists, nothing is added and a warning is logged.
    """
    if name in self.collections:
        B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added. "
                  "Please use another name.")
        return
    self.collections[name] = collection
607 
def is_valid(self):
    """
    A full calibration consists of a collector AND an associated algorithm AND input_files.

    Returns False (after logging a warning) if:
        1) We are missing any of the above.
        2) There are multiple Collections and the Collectors have mis-matched granularities.
        3) Any of our Collectors have granularities that don't match what our Strategy can use.
    Otherwise returns True.
    """
    if not self.algorithms:
        B2WARNING(f"Empty algorithm list for {self.name}.")
        return False

    usable_collections = [coll for coll in self.collections.values() if coll.is_valid()]
    if not usable_collections:
        B2WARNING(f"No valid Collections for {self.name}.")
        return False

    # Gather the "granularity" parameter values of every valid Collection's collector module
    granularities = [
        param.values
        for coll in usable_collections
        for param in coll.collector.available_params()
        if param.name == "granularity"
    ]
    if len(set(granularities)) > 1:
        B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
        return False

    for alg in self.algorithms:
        alg_type = type(alg.algorithm).__name__
        if any(gran not in alg.strategy.allowed_granularities for gran in granularities):
            B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
            return False
    return True
643 
644  def reset_database(self, apply_to_default_collection=True):
645  """
646  Keyword Arguments:
647  apply_to_default_collection (bool): Should we also reset the default collection?
648 
649  Remove everything in the database_chain of this Calibration, including the default central database
650  tag automatically included from `basf2.conditions.default_globaltags`. This will NOT affect the database chain of any
651  `Collection` other than the default one. You can prevent the default Collection from having its chain reset by setting
652  'apply_to_default_collection' to False.
653  """
654  self.database_chain = []
# NOTE(review): the body of this `if` (original line 656) is missing from the listing; it presumably calls
# reset_database() on self.collections[self.default_collection_name] -- confirm against the source.
655  if self.default_collection_name in self.collections and apply_to_default_collection:
657 
658  def use_central_database(self, global_tag, apply_to_default_collection=True):
659  """
660  Parameters:
661  global_tag (str): The central database global tag to use for this calibration.
662 
663  Keyword Arguments:
664  apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
665 
666  Using this allows you to append a central database to the database chain for this calibration.
667  The default database chain is just the central one from `basf2.conditions.default_globaltags`.
668  To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
669  and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
670 
671  Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
672  and the algorithm processes. So calling:
673 
674  >> cal.use_central_database("global_tag")
675 
676  will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
677  assigned to
678 
679  >> cal.collections['default'].database_chain
680 
681  But calling
682 
683  >> cal.use_central_database("global_tag", False)
684 
685  will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
686  So if you have multiple Collections in this Calibration *their database chains are separate*.
687  To specify an additional `CentralDatabase` for a different collection, you will have to call:
688 
689  >> cal.collections['OtherCollection'].use_central_database("global_tag")
690  """
691  central_db = CentralDatabase(global_tag)
692  self.database_chain.append(central_db)
# NOTE(review): the body of this `if` (original line 694) is missing from the listing; by analogy with
# use_local_database it presumably calls use_central_database(global_tag) on the default Collection -- confirm.
693  if self.default_collection_name in self.collections and apply_to_default_collection:
695 
def use_local_database(self, filename, directory="", apply_to_default_collection=True):
    """
    Parameters:
        filename (str): The path to the database.txt of the local database

    Keyword Arguments:
        directory (str): The path to the payloads directory for this local database.
        apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)

    Add a local database to the end of this calibration's database chain.
    Repeated calls add each database IN ORDER, and they only apply to this calibration.
    The Local and Central databases added through these methods form the database chain of the algorithm
    processes and, optionally, of the default `Collection` job.
    Other databases are applied to the processes later; basf2 checks them in this order:

    1) Local Database from previous iteration of this Calibration.
    2) Local Database chain from output of previous dependent Calibrations.
    3) This chain of Local and Central databases where the last added is checked first.

    The default `Collection` is only affected if it exists and 'apply_to_default_collection' stays True. So:

    >> cal.use_local_database(file_path, payload_dir)

    changes the chain of every algorithm of this `Calibration` and also the chain in

    >> cal.collections['default'].database_chain

    whereas

    >> cal.use_local_database(file_path, payload_dir, False)

    only adds the database for the Algorithm processes and leaves the default Collection's chain alone.

    Each Collection of this Calibration has its *own, separate* database chain. To add a `LocalDatabase`
    to another collection, call:

    >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
    """
    self.database_chain.append(LocalDatabase(filename, directory))
    if not apply_to_default_collection:
        return
    if self.default_collection_name in self.collections:
        self.collections[self.default_collection_name].use_local_database(filename, directory)
742 
def _get_default_collection_attribute(self, attr):
    """
    Return the attribute ``attr`` of this Calibration's default `Collection`.

    If there is no default Collection, a warning is logged and None is returned.
    """
    if self.default_collection_name in self.collections:
        return getattr(self.collections[self.default_collection_name], attr)
    B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
              "but the default collection doesn't exist. "
              f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
              "collection's attributes directly.")
    return None
752 
def _set_default_collection_attribute(self, attr, value):
    """
    Set the attribute ``attr`` of this Calibration's default `Collection` to ``value``.

    If there is no default Collection, nothing is set and a warning is logged.
    """
    if self.default_collection_name in self.collections:
        setattr(self.collections[self.default_collection_name], attr, value)
        return
    B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
              "but the default collection doesn't exist. "
              f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
              "collection's attributes directly.")
761 
@property
def collector(self):
    """
    The collector module of the default `Collection`; see `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("collector")

@collector.setter
def collector(self, collector):
    """
    Set the collector of the default `Collection`. A string is first turned into a module with
    `basf2.register_module`; a value that is not a `basf2.Module` afterwards triggers B2ERROR
    but is still forwarded.
    """
    from basf2 import Module
    module = collector
    if isinstance(module, str):
        from basf2 import register_module
        module = register_module(module)
    if not isinstance(module, Module):
        B2ERROR("Collector needs to be either a Module or the name of such a module")
    self._set_default_collection_attribute("collector", module)
782 
@property
def input_files(self):
    """
    Input files of the default `Collection`; see `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("input_files")

@input_files.setter
def input_files(self, value):
    """
    Forward ``value`` to the default `Collection`'s ``input_files``.
    """
    self._set_default_collection_attribute("input_files", value)
794 
@property
def files_to_iovs(self):
    """
    The ``files_to_iovs`` mapping of the default `Collection`; see `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("files_to_iovs")

@files_to_iovs.setter
def files_to_iovs(self, value):
    """
    Forward ``value`` to the default `Collection`'s ``files_to_iovs``.
    """
    self._set_default_collection_attribute("files_to_iovs", value)
806 
# NOTE(review): the getter's `def` line (original line 808) is missing from this listing; it is presumably
# `def pre_collector_path(self):` (the setter below uses that name) -- confirm against the source.
807  @property
809  """
The pre-collector `basf2.Path` of the default `Collection`, read via _get_default_collection_attribute.
810  """
811  return self._get_default_collection_attribute("pre_collector_path")
812 
813  @pre_collector_path.setter
814  def pre_collector_path(self, path):
815  """
Forward ``path`` to the default `Collection`'s pre_collector_path, via _set_default_collection_attribute.
816  """
817  self._set_default_collection_attribute("pre_collector_path", path)
818 
@property
def output_patterns(self):
    """
    Collector output file patterns of the default `Collection`; see `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("output_patterns")

@output_patterns.setter
def output_patterns(self, value):
    """
    Forward ``value`` to the default `Collection`'s ``output_patterns``.
    """
    self._set_default_collection_attribute("output_patterns", value)
830 
# NOTE(review): the getter's `def` line (original line 832) is missing from this listing; it is presumably
# `def max_files_per_collector_job(self):` (the setter below uses that name) -- confirm against the source.
831  @property
833  """
Maximum number of input files per collector subjob of the default `Collection`, read via
_get_default_collection_attribute.
834  """
835  return self._get_default_collection_attribute("max_files_per_collector_job")
836 
837  @max_files_per_collector_job.setter
838  def max_files_per_collector_job(self, max_files):
839  """
Forward ``max_files`` to the default `Collection`'s max_files_per_collector_job, via
_set_default_collection_attribute.
840  """
841  self._set_default_collection_attribute("max_files_per_collector_job", max_files)
842 
# NOTE(review): the getter's `def` line (original line 844) is missing from this listing; it is presumably
# `def max_collector_jobs(self):` (the setter below uses that name) -- confirm against the source.
843  @property
845  """
Maximum number of collector subjobs of the default `Collection`, read via _get_default_collection_attribute.
846  """
847  return self._get_default_collection_attribute("max_collector_jobs")
848 
849  @max_collector_jobs.setter
850  def max_collector_jobs(self, max_jobs):
851  """
Forward ``max_jobs`` to the default `Collection`'s max_collector_jobs, via _set_default_collection_attribute.
852  """
853  self._set_default_collection_attribute("max_collector_jobs", max_jobs)
854 
@property
def backend_args(self):
    """
    Backend submission arguments of the default `Collection`; see `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("backend_args")

@backend_args.setter
def backend_args(self, value):
    """
    Forward ``value`` to the default `Collection`'s ``backend_args``.
    """
    self._set_default_collection_attribute("backend_args", value)
866 
@property
def algorithms(self):
    """
    list[`Algorithm`]: The algorithms of this Calibration, each wrapped in an `Algorithm`.

    Assign a single ``ROOT.Belle2.CalibrationAlgorithm``, or a list/tuple of them. An empty list/tuple
    leaves the current algorithms untouched.
    """
    return self._algorithms

@algorithms.setter
@method_dispatch
def algorithms(self, value):
    """
    Setter for a single CalibrationAlgorithm: replaces the stored list with ``[Algorithm(value)]``.
    Anything else only logs B2ERROR.
    """
    from ROOT.Belle2 import CalibrationAlgorithm
    if isinstance(value, CalibrationAlgorithm):
        self._algorithms = [Algorithm(value)]
    else:
        B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
                "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")

@algorithms.fset.register(tuple)
@algorithms.fset.register(list)
def _(self, value):
    """
    Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.

    A non-empty container replaces the stored list. Elements that are not CalibrationAlgorithm instances
    are skipped with a B2ERROR naming their type. An empty container changes nothing.
    """
    from ROOT.Belle2 import CalibrationAlgorithm
    if value:
        self._algorithms = []
        for alg in value:
            if isinstance(alg, CalibrationAlgorithm):
                self._algorithms.append(Algorithm(alg))
            else:
                # Report the offending element's type (previously the container's type was reported).
                B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(alg)}). "
                        "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
900 
901  @property
902  def pre_algorithms(self):
903  """
904  Callback run prior to each algorithm iteration.
905  """
906  return [alg.pre_algorithm for alg in self.algorithms]
907 
908  @pre_algorithms.setter
909  @method_dispatch
910  def pre_algorithms(self, func):
911  """
912  """
913  if func:
914  for alg in self.algorithms:
915  alg.pre_algorithm = func
916  else:
917  B2ERROR("Something evaluated as False passed in as pre_algorithm function.")
918 
919  @pre_algorithms.fset.register(tuple)
920  @pre_algorithms.fset.register(list)
921  def _(self, values):
922  """
923  Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
924  """
925  if values:
926  if len(values) == len(self.algorithms):
927  for func, alg in zip(values, self.algorithms):
928  alg.pre_algorithm = func
929  else:
930  B2ERROR("Number of functions and number of algorithms doesn't match.")
931  else:
932  B2ERROR("Empty container passed in for pre_algorithm functions")
933 
934  @property
935  def strategies(self):
936  """
937  The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
938  """
939  return [alg.strategy for alg in self.algorithms]
940 
941  @strategies.setter
942  @method_dispatch
943  def strategies(self, strategy):
944  """
945  """
946  if strategy:
947  for alg in self.algorithms:
948  alg.strategy = strategy
949  else:
950  B2ERROR("Something evaluated as False passed in as a strategy.")
951 
952  @strategies.fset.register(tuple)
953  @strategies.fset.register(list)
954  def _(self, values):
955  """
956  Alternate strategies setter for lists and tuples of functions, should be one per algorithm.
957  """
958  if values:
959  if len(values) == len(self.algorithms):
960  for strategy, alg in zip(strategies, self.algorithms):
961  alg.strategy = strategy
962  else:
963  B2ERROR("Number of strategies and number of algorithms doesn't match.")
964  else:
965  B2ERROR("Empty container passed in for strategies list")
966 
967  def __repr__(self):
968  """
969  """
970  return self.name
971 
    def run(self):
        """
        Main logic of the Calibration object.
        Will be run in a new Thread by calling the start() method.

        Reads this calibration's "checkpoint" state and "iteration" from the CAF SQLite database at
        ``self._db_path`` and builds a `caf.state_machines.CalibrationMachine` starting from them.
        It then loops until the database state equals ``self.end_state`` or ``self.fail_state``:
        it submits and polls the collector when in state ``"init"``, then runs the algorithms.
        A ``"collector_failed"`` state or an ``"algorithms_failed"`` machine state sends the machine to
        its final fail state via ``fail_fully()`` and returns.
        """
        with CAFDB(self._db_path, read_only=True) as db:
            initial_state = db.get_calibration_value(self.name, "checkpoint")
            initial_iteration = db.get_calibration_value(self.name, "iteration")
        B2INFO("Initial status of {} found to be state={}, iteration={}".format(self.name,
                                                                                initial_state,
                                                                                initial_iteration))
        self.machine = CalibrationMachine(self,
                                          iov_to_calibrate=self.iov,
                                          initial_state=initial_state,
                                          iteration=initial_iteration)
        # Restart from the checkpoint: write it back as the current state in the database.
        self.state = initial_state
        # The machine's root directory is <current working directory>/<calibration name>.
        self.machine.root_dir = Path(os.getcwd(), self.name)
        self.machine.collector_backend = self.backend

        # Before we start running, let's clean up any iteration directories from iterations above our initial one.
        # Should prevent confusion between attempts if we fail again.
        all_iteration_paths = find_int_dirs(self.machine.root_dir)
        for iteration_path in all_iteration_paths:
            if int(iteration_path.name) > initial_iteration:
                shutil.rmtree(iteration_path)

        # Note: ``self.state`` is re-read from the database on every access.
        while self.state != self.end_state and self.state != self.fail_state:
            if self.state == "init":
                try:
                    B2INFO(f"Attempting collector submission for calibration {self.name}.")
                    self.machine.submit_collector()
                except Exception as err:
                    # Any submission error is fatal for the whole process.
                    B2FATAL(str(err))

                # Blocks until the state leaves "running_collector".
                self._poll_collector()

            # If we failed take us to the final fail state
            if self.state == "collector_failed":
                self.machine.fail_fully()
                return

            # It's possible that we might raise an error while attempting to run due
            # to some problems e.g. Missing collector output files
            # We catch the error and exit with failed state so the CAF will stop
            try:
                B2INFO(f"Attempting to run algorithms for calibration {self.name}.")
                self.machine.run_algorithms()
            except MachineError as err:
                B2ERROR(str(err))
                self.machine.fail()

            # If we failed take us to the final fail state
            # (this checks the machine's own state, not the database state).
            if self.machine.state == "algorithms_failed":
                self.machine.fail_fully()
                return
1027 
1028  def _poll_collector(self):
1029  """
1030  """
1031  while self.state == "running_collector":
1032  try:
1033  self.machine.complete()
1034  # ConditionError is thrown when the condtions for the transition have returned false, it's not serious.
1035  except ConditionError:
1036  try:
1037  B2DEBUG(29, f"Checking if collector jobs for calibration {self.name} have failed.")
1038  self.machine.fail()
1039  except ConditionError:
1040  pass
1041  sleep(self.heartbeat) # Sleep until we want to check again
1042 
1043  @property
1044  def state(self):
1045  """
1046  The current major state of the calibration in the database file. The machine may have a different state.
1047  """
1048  with CAFDB(self._db_path, read_only=True) as db:
1049  state = db.get_calibration_value(self.name, "state")
1050  return state
1051 
1052  @state.setter
1053  def state(self, state):
1054  """
1055  """
1056  B2DEBUG(29, f"Setting {self.name} to state {state}.")
1057  with CAFDB(self._db_path) as db:
1058  db.update_calibration_value(self.name, "state", str(state))
1059  if state in self.checkpoint_states:
1060  db.update_calibration_value(self.name, "checkpoint", str(state))
1061  B2DEBUG(29, f"{self.name} set to {state}.")
1062 
1063  @property
1064  def iteration(self):
1065  """
1066  Retrieves the current iteration number in the database file.
1067 
1068  Returns:
1069  int: The current iteration number
1070  """
1071  with CAFDB(self._db_path, read_only=True) as db:
1072  iteration = db.get_calibration_value(self.name, "iteration")
1073  return iteration
1074 
1075  @iteration.setter
1076  def iteration(self, iteration):
1077  """
1078  """
1079  B2DEBUG(29, f"Setting {self.name} to {iteration}.")
1080  with CAFDB(self._db_path) as db:
1081  db.update_calibration_value(self.name, "iteration", iteration)
1082  B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1083 
1084 
class Algorithm():
    """
    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.
    Keyword Arguments:
        data_input (types.FunctionType): An optional function that sets the input files of the algorithm.
        pre_algorithm (types.FunctionType): An optional function that runs just prior to execution of the algorithm.
            Useful for set up e.g. module initialisation

    This is a simple wrapper class around the C++ CalibrationAlgorithm class.
    It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
    than separating the logic into those classes directly.

    This is **not** currently a class that a user should interact with much during `CAF`
    setup (unless you're doing something advanced).
    The `Calibration` class should be doing the most of the creation of the defaults for these objects.

    Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
    Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
    is often useful i.e. by calling for the Geometry module to initialise.
    """

    def __init__(self, algorithm, data_input=None, pre_algorithm=None):
        """
        Wrap ``algorithm`` with the given (or default) input-data setup and pre-algorithm callbacks.
        """
        #: CalibrationAlgorithm instance (assumed to be true since the Calibration class checks)
        self.algorithm = algorithm
        cppname = algorithm.__cppname__
        # rpartition keeps the full name when there is no "::" separator; an ``rfind('::') + 2`` slice
        # would silently drop the first character of a class in the global namespace.
        #: The name of the algorithm, default is the Algorithm class name
        self.name = cppname.rpartition('::')[2]
        #: Function called before the pre_algorithm method to setup the input data that the CalibrationAlgorithm uses.
        #: Defaults to `default_inputdata_setup` when none is given.
        self.data_input = data_input
        if not self.data_input:
            self.data_input = self.default_inputdata_setup
        #: Function called after data_input but before algorithm.execute to do any remaining setup.
        self.pre_algorithm = pre_algorithm
        #: The algorithm strategy that will be used when running over the collected data.
        self.strategy = strategies.SingleIOV
        #: Parameters that could be used in the execution of the algorithm strategy/runner to modify behaviour.
        self.params = {}

    def default_inputdata_setup(self, input_file_paths):
        """
        Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
        by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
        for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.

        Parameters:
            input_file_paths (list[str]): Candidate file paths; only those whose file name is exactly
                ``CollectorOutput.root`` are kept (in their original order).
        """
        collector_output_files = [file_path for file_path in input_file_paths
                                  if Path(file_path).name == "CollectorOutput.root"]
        info_lines = [f"Input files used in {self.name}:", *collector_output_files]
        B2INFO_MULTILINE(info_lines)
        self.algorithm.setInputFileNames(collector_output_files)
1146 
1147 
class CAF():
    """
    Parameters:
        calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                                     >>> calibration_defaults={"max_iterations":2}

    This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
    for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

    The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
    `CalibrationBase.start` when the dependencies are met.

    Much of the checking for consistency is done in this class so that no processing is done with an invalid
    setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
    `CalibrationBase` instances.
    """

    #: The file name of the SQLite status DB created inside `output_dir`
    _db_name = "caf_state.db"
    #: The defaults for Calibrations
    default_calibration_config = {
        "max_iterations": 5,
        "ignored_runs": []
    }

    def __init__(self, calibration_defaults=None):
        """
        Parameters:
            calibration_defaults (dict, optional): Options overriding `default_calibration_config` for the
                calibrations of this instance.
        """
        import copy

        #: Dictionary of calibrations for this CAF instance, keyed by calibration name.
        #: Use `add_calibration` to add to it.
        self.calibrations = {}
        #: Dictionary of future dependencies of Calibration objects, where the value is all calibrations (by name)
        #: that will depend on the key. Filled by `_order_calibrations` during `run`.
        self.future_dependencies = {}
        #: Dictionary of past dependencies of Calibration objects, where the value is the list of calibration
        #: names that the key depends on. Filled by `_order_calibrations` during `run`.
        self.dependencies = {}
        #: Output path to store results of calibration and bookkeeping information.
        self.output_dir = "calibration_results"
        #: Calibration ordering attribute; initialised to None (`run` works with a local ``order``).
        self.order = None
        #: Private backend attribute.
        self._backend = None
        #: The heartbeat (seconds) between polling for Calibrations that are finished.
        self.heartbeat = 5

        if not calibration_defaults:
            calibration_defaults = {}
        # Deep-copy the class-level defaults so mutable values (e.g. the "ignored_runs" list) are not shared
        # between CAF instances or with `default_calibration_config` itself.
        #: Default options applied to each calibration known to the CAF, if the Calibration does not
        #: already have them set.
        self.calibration_defaults = {**copy.deepcopy(self.default_calibration_config), **calibration_defaults}
        #: The path of the SQLite DB.
        self._db_path = None

    def add_calibration(self, calibration):
        """
        Adds a `Calibration` that is to be used in this program to the list.
        Also adds an empty dependency list to the overall dictionary.
        You should not directly alter a `Calibration` object after it has been
        added here.

        Invalid calibrations and calibrations whose name is already present are not added; a B2WARNING
        is emitted instead.
        """
        if not calibration.is_valid():
            B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                      "It was not added and will not be part of the final process.")
            return
        if calibration.name in self.calibrations:
            B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
            return
        self.calibrations[calibration.name] = calibration

    def _remove_missing_dependencies(self):
        """
        This checks the future and past dependencies of each `Calibration` in the `CAF`.
        If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
        object directly.
        """
        calibration_names = {calibration.name for calibration in self.calibrations.values()}

        def is_dependency_in_caf(dependency):
            """
            Quick function to use with filter() and check dependencies against calibrations known to `CAF`
            """
            dependency_in_caf = dependency.name in calibration_names
            if not dependency_in_caf:
                B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                          " It has been removed as a dependency.")
            return dependency_in_caf

        # Check that there aren't dependencies on calibrations not added to the framework
        # Remove them from the calibration objects if there are.
        for calibration in self.calibrations.values():
            calibration.future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
            calibration.dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))

    def _order_calibrations(self):
        """
        - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
          to a sorting algorithm.
        - Returns a valid ordered dictionary if the sort was successful, or ``False`` if it failed (most likely
          a cyclic dependency).
        - As a side effect, every calibration's ``dependencies`` list is extended with its implicit
          dependencies and put into topological order.
        """
        # First remove any dependencies on calibrations not added to the CAF
        self._remove_missing_dependencies()
        # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
        # Note that they currently use the names not the calibration objects.
        for calibration in self.calibrations.values():
            future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
            past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

            self.future_dependencies[calibration.name] = future_dependencies_names
            self.dependencies[calibration.name] = past_dependencies_names
        # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
        order = topological_sort(self.future_dependencies)
        if not order:
            return False

        # Get an ordered dictionary of the sort order but including all implicit dependencies.
        ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

        # Return all the implicit+explicit past dependencies
        full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
        # Correct each calibration's dependency list to reflect the implicit dependencies
        for calibration in self.calibrations.values():
            full_deps = full_past_dependencies[calibration.name]
            explicit_deps = {cal.name for cal in calibration.dependencies}
            for dep in full_deps:
                if dep not in explicit_deps:
                    calibration.dependencies.append(self.calibrations[dep])
            # At this point the calibrations have their full dependencies but they aren't in topological
            # sort order. Correct that here (building the name set once, not per ordered name).
            dependency_names = {dep.name for dep in calibration.dependencies}
            calibration.dependencies = [self.calibrations[ordered_calibration_name]
                                        for ordered_calibration_name in order
                                        if ordered_calibration_name in dependency_names]
        return ordered_full_dependencies

    def _check_backend(self):
        """
        Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
        one that is stored isn't a valid Backend object) we should create a default Local backend.
        """
        if not isinstance(self._backend, caf.backends.Backend):
            self.backend = caf.backends.Local()

    def _prune_invalid_collections(self):
        """
        Checks all current calibrations and removes any invalid Collections from their collections list.
        """
        B2INFO("Checking for any invalid Collections in Calibrations.")
        for calibration in self.calibrations.values():
            valid_collections = {}
            for name, collection in calibration.collections.items():
                if collection.is_valid():
                    valid_collections[name] = collection
                else:
                    B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
            calibration.collections = valid_collections

    def run(self, iov=None):
        """
        Keyword Arguments:
            iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                                  this IoV will be used in the collection step.

        This function runs the overall calibration job, saves the outputs to the output_dir directory,
        and creates database payloads.

        Upload of final databases is not done here. This simply creates the local databases in
        the output directory. You should check the validity of your new local database before uploading
        to the conditions DB via the basf2 tools/interface to the DB.
        """
        if not self.calibrations:
            B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
        # Checks whether the dependencies we've added will give a valid order
        order = self._order_calibrations()
        if not order:
            B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

        # Check that a backend has been set and use default Local() one if not
        self._check_backend()

        self._prune_invalid_collections()

        # Creates the overall output directory and reset the attribute to use an absolute path to it.
        self.output_dir = self._make_output_dir()

        # Creates a SQLite DB to save the status of the various calibrations
        self._make_database()

        # Enter the overall output dir during processing and open a connection to the DB
        with temporary_workdir(self.output_dir):
            db = CAFDB(self._db_path)
            db.open()
            # Close this connection even if preparing a calibration raises, so no connection is left dangling.
            try:
                db_initial_calibrations = db.query("select * from calibrations").fetchall()
                # Rows from a previous run keyed by calibration name (column 0), for O(1) lookups.
                previous_entries = {cal_info[0]: cal_info for cal_info in db_initial_calibrations}
                for calibration in self.calibrations.values():
                    # Apply defaults given to the `CAF` to the calibrations if they aren't set
                    calibration._apply_calibration_defaults(self.calibration_defaults)
                    calibration._db_path = self._db_path
                    calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
                    calibration.iov = iov
                    if not calibration.backend:
                        calibration.backend = self.backend
                    # Do some checking of the db to see if we need to add an entry for this calibration
                    cal_info = previous_entries.get(calibration.name)
                    if cal_info is None:
                        db.insert_calibration(calibration.name)
                        db.commit()
                    else:
                        # Columns 2 and 3 of the row hold the saved checkpoint state and iteration.
                        cal_initial_state = cal_info[2]
                        cal_initial_iteration = cal_info[3]
                        B2INFO(f"Previous entry in database found for {calibration.name}.")
                        B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                        calibration.state = cal_initial_state
                        B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                        calibration.iteration = cal_initial_iteration
                    # Daemonize so that it exits if the main program exits
                    calibration.daemon = True
            finally:
                db.close()

            # Is it possible to keep going?
            keep_running = True
            while keep_running:
                keep_running = False
                # Do we have calibrations that may yet complete?
                remaining_calibrations = []

                for calibration in self.calibrations.values():
                    # `Calibration.state` queries the database on each access, so read it once per pass.
                    calibration_state = calibration.state
                    # Find the currently ended calibrations (may not be joined yet)
                    if calibration_state in (CalibrationBase.end_state, CalibrationBase.fail_state):
                        # Search for any alive Calibrations and join them
                        if calibration.is_alive():
                            B2DEBUG(29, f"Joining {calibration.name}.")
                            calibration.join()
                    else:
                        if calibration.dependencies_met():
                            if not calibration.is_alive():
                                B2DEBUG(29, f"Starting {calibration.name}.")
                                try:
                                    calibration.start()
                                except RuntimeError:
                                    # Catch the case when the calibration just finished so it ended up here
                                    # in the "else" and not above where it should have been joined.
                                    B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                            remaining_calibrations.append(calibration)
                        else:
                            if not calibration.failed_dependencies():
                                remaining_calibrations.append(calibration)
                if remaining_calibrations:
                    keep_running = True
                    # Loop over jobs that the calibrations want submitted and submit them.
                    # We do this here because some backends don't like us submitting in parallel from multiple
                    # CalibrationThreads. So this is like a mini job queue without getting too clever with it
                    for calibration in remaining_calibrations:
                        for job in calibration.jobs_to_submit[:]:
                            calibration.backend.submit(job)
                            calibration.jobs_to_submit.remove(job)
                    sleep(self.heartbeat)

        B2INFO("Printing summary of final CAF status.")
        with CAFDB(self._db_path, read_only=True) as db:
            print(db.output_calibration_table())

    @property
    def backend(self):
        """
        The `backend <backends.Backend>` that runs the collector job.
        When set, this is checked that a `backends.Backend` class instance was passed in.
        """
        return self._backend

    @backend.setter
    def backend(self, backend):
        """
        Store ``backend`` if it is a `caf.backends.Backend` instance, otherwise report a B2ERROR.
        """
        if isinstance(backend, caf.backends.Backend):
            self._backend = backend
        else:
            B2ERROR('Backend property must inherit from Backend class.')

    def _make_output_dir(self):
        """
        Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

        Returns:
            str: The absolute path of the new output_dir

        Raises:
            FileNotFoundError: If the directory still does not exist after creating it.
        """
        p = Path(self.output_dir).resolve()
        if p.is_dir():
            B2INFO(f"{p.as_posix()} output directory already exists. "
                   "We will try to restart from the previous finishing state.")
            return p.as_posix()
        p.mkdir(parents=True)
        if p.is_dir():
            return p.as_posix()
        raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")

    def _make_database(self):
        """
        Creates the CAF status database. If it already exists we don't overwrite it.
        """
        self._db_path = Path(self.output_dir, self._db_name).absolute()
        if self._db_path.exists():
            B2INFO(f"Previous CAF database found {self._db_path}")
        # Will create a new database + tables, or do nothing but checks we can connect to existing one
        with CAFDB(self._db_path):
            pass
framework.CAF.heartbeat
heartbeat
The heartbeat (seconds) between polling for Calibrations that are finished.
Definition: framework.py:1192
framework.Calibration.iteration
def iteration(self)
Definition: framework.py:1064
framework.Calibration._
def _(self, value)
Definition: framework.py:887
strategies.SingleIOV
Definition: strategies.py:172
framework.Calibration.collector
def collector(self)
Definition: framework.py:763
framework.Calibration.pre_collector_path
def pre_collector_path(self)
Definition: framework.py:808
framework.Calibration._get_default_collection_attribute
def _get_default_collection_attribute(self, attr)
Definition: framework.py:743
framework.Collection.files_to_iovs
files_to_iovs
File -> Iov dictionary, should be : {absolute_file_path:iov} : Where iov is a :py:class:IoV <caf....
Definition: framework.py:81
framework.Calibration.database_chain
database_chain
The database chain that is applied to the algorithms.
Definition: framework.py:553
framework.Algorithm
Definition: framework.py:1085
framework.Collection.job_cmd
job_cmd
The Collector caf.backends.Job.cmd attribute.
Definition: framework.py:133
framework.Calibration.is_valid
def is_valid(self)
Definition: framework.py:608
framework.CalibrationBase.end_state
string end_state
The name of the successful completion state.
Definition: framework.py:308
framework.Calibration.run
def run(self)
Definition: framework.py:972
framework.Calibration._db_path
_db_path
Location of a SQLite database that will save the state of the calibration so that it can be restarted...
Definition: framework.py:576
framework.Calibration.algorithms_runner
algorithms_runner
The class that runs all the algorithms in this Calibration using their assigned :py:class:caf....
Definition: framework.py:562
framework.CalibrationBase.failed_dependencies
def failed_dependencies(self)
Definition: framework.py:394
framework.CAF._remove_missing_dependencies
def _remove_missing_dependencies(self)
Definition: framework.py:1218
framework.CAF._backend
_backend
Private backend attribute.
Definition: framework.py:1190
framework.Calibration.backend_args
def backend_args(self)
Definition: framework.py:856
framework.CAF.__init__
def __init__(self, calibration_defaults=None)
Definition: framework.py:1174
framework.CalibrationBase.dependencies_met
def dependencies_met(self)
Definition: framework.py:388
framework.Calibration.algorithms
algorithms
Algorithm classes that will be run by this Calibration.
Definition: framework.py:534
framework.CAF.add_calibration
def add_calibration(self, calibration)
Definition: framework.py:1202
framework.Calibration.state
state
Definition: framework.py:987
framework.Calibration.heartbeat
heartbeat
This calibration's sleep time before rechecking to see if it can move state.
Definition: framework.py:572
framework.CalibrationBase.fail_state
string fail_state
The name of the failure state.
Definition: framework.py:311
framework.Algorithm.params
params
Parameters that could be used in the execution of the algorithm strategy/runner to modify behaviour.
Definition: framework.py:1132
Belle2::filter
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:43
framework.Algorithm.data_input
data_input
Function called before the pre_algorithm method to setup the input data that the CalibrationAlgorithm...
Definition: framework.py:1117
framework.Calibration
Definition: framework.py:417
framework.Calibration.ignored_runs
ignored_runs
List of ExpRun that will be ignored by this Calibration.
Definition: framework.py:544
framework.Collection.default_max_collector_jobs
int default_max_collector_jobs
The default maximum number of collector jobs to create.
Definition: framework.py:63
framework.Collection.collector
collector
Collector module of this collection.
Definition: framework.py:70
framework.CalibrationBase.output_database_dir
output_database_dir
The directory where we'll store the local database payloads from this calibration.
Definition: framework.py:339
framework.Collection.use_local_database
def use_local_database(self, filename, directory="")
Definition: framework.py:175
framework.Algorithm.strategy
strategy
The algorithm strategy that will be used when running over the collected data.
Definition: framework.py:1126
framework.Collection.backend_args
backend_args
Dictionary passed to the collector Job object to configure how the caf.backends.Backend instance shou...
Definition: framework.py:111
framework.Calibration.collector_full_update_interval
collector_full_update_interval
While checking if the collector is finished we don't bother wastefully checking every subjob's status...
Definition: framework.py:570
framework.Calibration.results
results
Output results of algorithms for each iteration.
Definition: framework.py:536
framework.Collection.job_script
job_script
Definition: framework.py:128
framework.Collection.max_files_per_collector_job
max_files_per_collector_job
Definition: framework.py:103
framework.Calibration._set_default_collection_attribute
def _set_default_collection_attribute(self, attr, value)
Definition: framework.py:753
framework.Calibration.reset_database
def reset_database(self, apply_to_default_collection=True)
Definition: framework.py:644
framework.Collection.database_chain
database_chain
The database chain used for this Collection.
Definition: framework.py:119
framework.Calibration.checkpoint_states
list checkpoint_states
Checkpoint states which we are allowed to restart from.
Definition: framework.py:504
runners.SeqAlgorithmsRunner
Definition: runners.py:85
framework.CAF
Definition: framework.py:1148
framework.CalibrationBase.jobs_to_submit
jobs_to_submit
A simple list of jobs that this Calibration wants submitted at some point.
Definition: framework.py:344
framework.CalibrationBase.iov
iov
IoV which will be calibrated.
Definition: framework.py:337
framework.Calibration.backend
backend
The backend <backends.Backend> we'll use for our collector submission in this calibration.
Definition: framework.py:565
framework.Collection
Definition: framework.py:45
framework.Collection.input_files
input_files
Internal input_files stored for this calibration.
Definition: framework.py:72
framework.Calibration.use_central_database
def use_central_database(self, global_tag, apply_to_default_collection=True)
Definition: framework.py:658
framework.Calibration._algorithms
_algorithms
Internal calibration algorithms stored for this calibration.
Definition: framework.py:514
framework.Calibration.max_files_per_collector_job
def max_files_per_collector_job(self)
Definition: framework.py:832
framework.CAF.run
def run(self, iov=None)
Definition: framework.py:1312
framework.CAF.calibration_defaults
calibration_defaults
Default options applied to each calibration known to the CAF, if the Calibration has these defined by...
Definition: framework.py:1198
framework.CalibrationBase.files_to_iovs
files_to_iovs
File -> Iov dictionary, should be : {absolute_file_path:iov} : Where iov is a :py:class:IoV <caf....
Definition: framework.py:329
framework.Collection.splitter
splitter
The SubjobSplitter to use when constructing collector subjobs from the overall Job object.
Definition: framework.py:99
framework.CAF._make_output_dir
def _make_output_dir(self)
Definition: framework.py:1435
framework.Collection.use_central_database
def use_central_database(self, global_tag)
Definition: framework.py:151
framework.Calibration.__init__
def __init__(self, name, collector=None, algorithms=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
Definition: framework.py:508
framework.CalibrationBase.depends_on
def depends_on(self, calibration)
Definition: framework.py:360
framework.CAF._make_database
def _make_database(self)
Definition: framework.py:1454
framework.Calibration.output_patterns
def output_patterns(self)
Definition: framework.py:820
framework.Calibration.add_collection
def add_collection(self, name, collection)
Definition: framework.py:589
framework.CAF.backend
backend
backend property
Definition: framework.py:1296
framework.CAF._check_backend
def _check_backend(self)
Definition: framework.py:1289
framework.Calibration.strategies
strategies
The strategy that the algorithm(s) will be run against.
Definition: framework.py:549
submit_collector
Definition: submit_collector.py:1
framework.Calibration.pre_algorithms
def pre_algorithms(self)
Definition: framework.py:902
framework.CalibrationBase.save_payloads
save_payloads
Marks this Calibration as one which has payloads that should be copied and uploaded.
Definition: framework.py:342
framework.Algorithm.__init__
def __init__(self, algorithm, data_input=None, pre_algorithm=None)
Definition: framework.py:1107
framework.CalibrationBase
Definition: framework.py:290
framework.CalibrationBase.input_files
input_files
Files used for collection procedure.
Definition: framework.py:332
framework.Calibration.__repr__
def __repr__(self)
Definition: framework.py:967
framework.CAF.output_dir
output_dir
Output path to store results of calibration and bookkeeping information.
Definition: framework.py:1186
framework.Calibration.use_local_database
def use_local_database(self, filename, directory="", apply_to_default_collection=True)
Definition: framework.py:696
framework.CalibrationBase.run
def run(self)
Definition: framework.py:347
framework.Calibration.machine
machine
The caf.state_machines.CalibrationMachine that we will run to process this calibration start to finis...
Definition: framework.py:574
framework.Algorithm.default_inputdata_setup
def default_inputdata_setup(self, input_file_paths)
Definition: framework.py:1134
framework.Collection.output_patterns
output_patterns
Output patterns of files produced by collector which will be used to pass to the Algorithm....
Definition: framework.py:93
framework.CAF.default_calibration_config
dictionary default_calibration_config
The defaults for Calibrations.
Definition: framework.py:1169
framework.Collection.max_collector_jobs
max_collector_jobs
Definition: framework.py:105
framework.Algorithm.algorithm
algorithm
CalibrationAlgorithm instance (assumed to be true since the Calibration class checks)
Definition: framework.py:1111
framework.Algorithm.name
name
The name of the algorithm, default is the Algorithm class name.
Definition: framework.py:1113
framework.CAF._db_path
_db_path
The path of the SQLite DB.
Definition: framework.py:1200
framework.Collection._collector
_collector
Internal storage of collector attribute.
Definition: framework.py:253
framework.CAF.calibrations
calibrations
Dictionary of calibrations for this CAF instance.
Definition: framework.py:1178
framework.CAF.future_dependencies
future_dependencies
Dictionary of future dependencies of Calibration objects, where the value is all calibrations that wi...
Definition: framework.py:1181
framework.CAF._prune_invalid_collections
def _prune_invalid_collections(self)
Definition: framework.py:1298
framework.Algorithm.pre_algorithm
pre_algorithm
Function called after data_input but before algorithm.execute to do any remaining setup.
Definition: framework.py:1123
framework.Calibration._poll_collector
def _poll_collector(self)
Definition: framework.py:1028
framework.Calibration.max_collector_jobs
def max_collector_jobs(self)
Definition: framework.py:844
framework.Calibration.max_iterations
max_iterations
Variable to define the maximum number of iterations for this calibration specifically.
Definition: framework.py:539
framework.Collection._input_files
_input_files
Definition: framework.py:223
framework.CAF._order_calibrations
def _order_calibrations(self)
Definition: framework.py:1245
framework.CalibrationBase.is_valid
def is_valid(self)
Definition: framework.py:354
framework.Collection.pre_collector_path
pre_collector_path
Since many collectors require some different setup, if you set this attribute to a basf2....
Definition: framework.py:86
framework.Collection.uri_list_from_input_file
def uri_list_from_input_file(input_file)
Definition: framework.py:196
framework.CalibrationBase._apply_calibration_defaults
def _apply_calibration_defaults(self, defaults)
Definition: framework.py:404
framework.Calibration.collections
collections
Collections stored for this calibration.
Definition: framework.py:512
framework.CalibrationBase.name
name
Name of calibration object.
Definition: framework.py:318
framework.Calibration.default_collection_name
string default_collection_name
Default collection name.
Definition: framework.py:506
framework.CalibrationBase.future_dependencies
future_dependencies
List of calibration objects that depend on this one.
Definition: framework.py:320
framework.CalibrationBase.dependencies
dependencies
List of calibration objects, where each one is a dependency of this one.
Definition: framework.py:322
framework.CalibrationBase.__init__
def __init__(self, name, input_files=None)
Definition: framework.py:313
framework.Collection.reset_database
def reset_database(self)
Definition: framework.py:144
framework.CAF.order
order
The ordering and explicit future dependencies of calibrations.
Definition: framework.py:1188
framework.CAF._db_name
string _db_name
The name of the SQLite DB that gets created.
Definition: framework.py:1167
framework.CAF.dependencies
dependencies
Dictionary of dependencies of Calibration objects, where value is the list of Calibration objects tha...
Definition: framework.py:1184