Belle II Software  release-06-00-14
framework.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 """
13 This module implements several objects/functions to configure and run calibrations.
14 These classes are used to construct the workflow of the calibration job.
15 The actual processing code is mostly in the `caf.state_machines` module.
16 """
17 
18 __all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF"]
19 
20 import os
21 from threading import Thread
22 from time import sleep
23 from pathlib import Path
24 import shutil
25 from glob import glob
26 
27 from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
28 from basf2 import find_file
29 from basf2 import conditions as b2conditions
30 
31 from abc import ABC, abstractmethod
32 
33 import caf
34 from caf.utils import B2INFO_MULTILINE
35 from caf.utils import past_from_future_dependencies
36 from caf.utils import topological_sort
37 from caf.utils import all_dependencies
38 from caf.utils import method_dispatch
39 from caf.utils import temporary_workdir
40 from caf.utils import find_int_dirs
41 from caf.utils import LocalDatabase
42 from caf.utils import CentralDatabase
43 from caf.utils import parse_file_uri
44 
45 import caf.strategies as strategies
46 import caf.runners as runners
47 from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
48 from caf.state_machines import CalibrationMachine, ConditionError, MachineError
49 from caf.database import CAFDB
50 
51 
52 class Collection():
53  """
54  Keyword Arguments:
 55  collector (str, basf2.Module): The collector module or module name for this `Collection`.
 56  input_files (list[str]): The input files to be used for only this `Collection`.
 57  pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
 58  database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
 59  output_patterns (list[str]): Output patterns of files produced by the collector which will be passed to the
 60  `Algorithm.data_input` function. Setting this here replaces the default completely.
 61  max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
 62  Technically this sets the SubjobSplitter to be used. Not compatible with max_collector_jobs.
 63  max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
 64  Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
 65  max_files_per_collector_job.
 66  backend_args (dict): The args for the backend submission of this `Collection`.
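
 A minimal construction sketch (the collector module name and input pattern below are illustrative placeholders):

 >>> from caf.framework import Collection
 >>> col = Collection(collector="CaTest", input_files=["/path/to/data/*.root"])
 >>> col.max_collector_jobs = 100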
67  """
68 
70  default_max_collector_jobs = 1000
71 
73  job_config = "collector_job.json"
74 
75  def __init__(self,
76  collector=None,
77  input_files=None,
78  pre_collector_path=None,
79  database_chain=None,
80  output_patterns=None,
81  max_files_per_collector_job=None,
82  max_collector_jobs=None,
83  backend_args=None
84  ):
85 
 86  self.collector = collector
 87 
 88  self.input_files = []
 89  if input_files:
 90  self.input_files = input_files
 91 
 97  self.files_to_iovs = {}
 98 
 102  self.pre_collector_path = None
 103  if pre_collector_path:
 104  self.pre_collector_path = pre_collector_path
 105 
 109  self.output_patterns = ["CollectorOutput.root"]
 110  if output_patterns:
 111  self.output_patterns = output_patterns
 112 
 113 
 115  self.splitter = None
 116  if max_files_per_collector_job and max_collector_jobs:
 117  B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
 118  elif max_files_per_collector_job:
 119  self.max_files_per_collector_job = max_files_per_collector_job
 120  elif max_collector_jobs:
 121  self.max_collector_jobs = max_collector_jobs
 122  else:
 123  self.max_collector_jobs = self.default_max_collector_jobs
 124 
 125 
 127  self.backend_args = {}
 128  if backend_args:
 129  self.backend_args = backend_args
 130 
 131  if database_chain:
 132 
 135  self.database_chain = database_chain
 136  else:
 137  self.database_chain = []
 138  # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
 139  # described well by appending to a list to a deque. So we do a bit of reversal to translate it back and make the
 140  # most important GT the last one encountered.
 141  for tag in reversed(b2conditions.default_globaltags):
 142  self.use_central_database(tag)
 143 
 144  self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()
 145  """The basf2 steering file that will be used for Collector jobs run by this collection.
 146 This script will be copied into subjob directories as part of the input sandbox."""
 147 
 148 
 149  self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]
150 
151  def reset_database(self):
152  """
153  Remove everything in the database_chain of this Calibration, including the default central database
154  tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
155  """
 156  self.database_chain = []
157 
158  def use_central_database(self, global_tag):
159  """
160  Parameters:
161  global_tag (str): The central database global tag to use for this calibration.
162 
163  Using this allows you to add a central database to the head of the global tag database chain for this collection.
164  The default database chain is just the central one from
165  `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
 166  The input file global tag will always be overridden and never used unless explicitly set.
167 
168  To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
169  and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
170 
171  Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.
172 
173  NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
174  All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
175  list which will be checked after all local database files have been checked.
176 
177  So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
178  that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
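
 For example, to build a custom chain from scratch for this collection (the tag and file names are illustrative only):

 >>> collection.reset_database()
 >>> collection.use_central_database("user_custom_tag")
 >>> collection.use_local_database("localdb/database.txt")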
179  """
180  central_db = CentralDatabase(global_tag)
 181  self.database_chain.append(central_db)
182 
183  def use_local_database(self, filename, directory=""):
184  """
185  Parameters:
186  filename (str): The path to the database.txt of the local database
187  directory (str): The path to the payloads directory for this local database.
188 
189  Append a local database to the chain for this collection.
190  You can call this function multiple times and each database will be added to the chain IN ORDER.
191  The databases are applied to this collection ONLY.
192 
193  NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
194  All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
195  list which will be checked after all local database files have been checked.
196 
197  So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
198  that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
199  """
200  local_db = LocalDatabase(filename, directory)
 201  self.database_chain.append(local_db)
202 
203  @staticmethod
204  def uri_list_from_input_file(input_file):
205  """
206  Parameters:
207  input_file (str): A local file/glob pattern or XROOTD URI
208 
209  Returns:
210  list: A list of the URIs found from the initial string.
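
 Illustrative call (the glob pattern is a placeholder); each matching local file is returned as its own URI:

 >>> Collection.uri_list_from_input_file("/data/sub00/*.root")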
211  """
212  # By default we assume it is a local file path if no "scheme" is given
213  uri = parse_file_uri(input_file)
214  if uri.scheme == "file":
215  # For local files we also perform a glob just in case it is a wildcard pattern.
216  # That way we will have all the uris of files separately
217  uris = [parse_file_uri(f).geturl() for f in glob(input_file)]
218  else:
 219  # Just let everything else through and hope the backend can figure it out
220  uris = [input_file]
221  return uris
222 
223  @property
224  def input_files(self):
 225  return self._input_files
226 
227  @input_files.setter
228  def input_files(self, value):
229  if isinstance(value, str):
230  # If it's a string, we convert to a list of URIs
 231  self._input_files = self.uri_list_from_input_file(value)
 232  elif isinstance(value, list):
 233  # If it's a list we loop and do the same thing
 234  total_files = []
 235  for pattern in value:
 236  total_files.extend(self.uri_list_from_input_file(pattern))
 237  self._input_files = total_files
238  else:
239  raise TypeError("Input files must be a list or string")
240 
241  @property
242  def collector(self):
243  """
244  """
 245  return self._collector
246 
247  @collector.setter
248  def collector(self, collector):
249  """
250  """
251  # check if collector is already a module or if we need to create one
252  # from the name
253  if collector:
254  from basf2 import Module
255  if isinstance(collector, str):
256  from basf2 import register_module
257  collector = register_module(collector)
258  if not isinstance(collector, Module):
259  B2ERROR("Collector needs to be either a Module or the name of such a module")
260 
 261  self._collector = collector
262 
263  def is_valid(self):
 264  if (not self.collector or not self.input_files):
265  return False
266  else:
267  return True
268 
269  @property
270  def max_collector_jobs(self):
 271  if self.splitter:
 272  return self.splitter.max_subjobs
 273  else:
 274  return None
 275 
 276  @max_collector_jobs.setter
 277  def max_collector_jobs(self, value):
 278  if value is None:
 279  self.splitter = None
 280  else:
 281  self.splitter = MaxSubjobsSplitter(max_subjobs=value)
 282 
 283  @property
 284  def max_files_per_collector_job(self):
 285  if self.splitter:
 286  return self.splitter.max_files_per_subjob
 287  else:
 288  return None
 289 
 290  @max_files_per_collector_job.setter
 291  def max_files_per_collector_job(self, value):
 292  if value is None:
 293  self.splitter = None
 294  else:
 295  self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
296 
297 
298 class CalibrationBase(ABC, Thread):
299  """
300  Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
301  this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
302  class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
 303  that don't depend on the C++ CAF at all and run everything in your own way.
304 
305  .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
306  But it's there if you really need it.
307 
308  Parameters:
309  name (str): Name of this calibration object. Should be unique if you are going to run it.
310 
311  Keyword Arguments:
312  input_files (list[str]): Input files for this calibration. May contain wildcard expressions useable by `glob.glob`.
313  """
314 
316  end_state = "completed"
317 
319  fail_state = "failed"
320 
321  def __init__(self, name, input_files=None):
322  """
323  """
324  super().__init__()
325 
 326  self.name = name
 327 
 328  self.future_dependencies = []
 329 
 330  self.dependencies = []
 331 
 337  self.files_to_iovs = {}
 338  if input_files:
 339 
 340  self.input_files = input_files
 341  else:
 342  self.input_files = []
 343 
 344 
 345  self.iov = None
 346 
 347  self.output_database_dir = ""
 348 
 350  self.save_payloads = True
 351 
 352  self.jobs_to_submit = []
353 
354  @abstractmethod
355  def run(self):
356  """
357  The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
358  once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
359  """
360 
361  @abstractmethod
362  def is_valid(self):
363  """
364  A simple method you should implement that will return True or False depending on whether
365  the Calibration has been set up correctly and can be run safely.
366  """
367 
368  def depends_on(self, calibration):
369  """
370  Parameters:
371  calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.
372 
373  Adds dependency of this calibration on another i.e. This calibration
374  will not run until the dependency has completed, and the constants produced
375  will be used via the database chain.
376 
377  You can define multiple dependencies for a single calibration simply
378  by calling this multiple times. Be careful when adding the calibration into
379  the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
380  empty order and the `CAF` processing will fail.
381 
 382  This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
383  `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
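
 For example, to make ``cal2`` wait for ``cal1`` (both assumed to be existing `Calibration` objects):

 >>> cal2.depends_on(cal1)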
384  """
385  # Check that we don't have two calibration names that are the same
386  if self.name != calibration.name:
387  # Tests if we have the calibrations added as dependencies already and adds if not
388  if calibration not in self.dependencies:
389  self.dependencies.append(calibration)
390  if self not in calibration.dependencies:
391  calibration.future_dependencies.append(self)
392  else:
393  B2WARNING((f"Tried to add {calibration} as a dependency for {self} but they have the same name."
394  "Dependency was not added."))
395 
396  def dependencies_met(self):
397  """
398  Checks if all of the Calibrations that this one depends on have reached a successful end state.
399  """
 400  return all(map(lambda x: x.state == x.end_state, self.dependencies))
 401 
 402  def failed_dependencies(self):
 403  """
 404  Returns the list of calibrations in our dependency list that have failed.
 405  """
 406  failed = []
 407  for calibration in self.dependencies:
 408  if calibration.state == self.fail_state:
409  failed.append(calibration)
410  return failed
411 
412  def _apply_calibration_defaults(self, defaults):
413  """
414  We pass in default calibration options from the `CAF` instance here if called.
415  Won't overwrite any options already set.
416  """
417  for key, value in defaults.items():
418  try:
419  if getattr(self, key) is None:
420  setattr(self, key, value)
421  except AttributeError:
422  print(f"The calibration {self.name} does not support the attribute {key}.")
423 
424 
 425 class Calibration(CalibrationBase):
 426  """
 427  Every Calibration object must have at least one collector and at least one algorithm.
428  You have the option to add in your collector/algorithm by argument here, or add them
429  later by changing the properties.
430 
431  If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
432  separately via `add_collection()`.
433 
434  Parameters:
435  name (str): Name of this calibration. It should be unique for use in the `CAF`
436  Keyword Arguments:
437  collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
438  algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
439  input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards useable by `glob.glob`
440 
441  A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
442 
443  >>> cal = Calibration('TestCalibration1')
444  >>> col1 = register_module('CaTest')
445  >>> cal.add_collection('TestColl', col1)
446 
447  or equivalently
448 
449  >>> cal = Calibration('TestCalibration1', 'CaTest')
450 
451  If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
452 
453  >>> cal.pre_collector_path = my_basf2_path
454 
455  You don't have to put a RootInput module in this pre-collection path, but you can if
 456  you need some special parameters. If you want to process sroot files then you have to explicitly add
457  SeqRootInput to your pre-collection path.
458  The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
459 
460 
461  You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
462 
463  >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
464 
465  you can change the input file list later on, before running with `CAF`
466 
467  >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
468 
469  If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
470  input_files, database chain etc from there. See `Collection`.
471 
472  Adding the CalibrationAlgorithm(s) is easy
473 
474  >>> alg1 = TestAlgo()
475  >>> cal.algorithms = alg1
476 
477  Or equivalently
478 
479  >>> cal.algorithms = [alg1]
480 
481  Or for multiple algorithms for one collector
482 
483  >>> alg2 = TestAlgo()
484  >>> cal.algorithms = [alg1, alg2]
485 
486  Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
 487  `Algorithm`. To access the C++ algorithm class underneath directly do:
488 
489  >>> cal.algorithms[i].algorithm
490 
491  If you have a setup function that you want to run before each of the algorithms, set that with
492 
493  >>> cal.pre_algorithms = my_function_object
494 
495  If you want a different setup for each algorithm use a list with the same number of elements
496  as your algorithm list.
497 
498  >>> cal.pre_algorithms = [my_function1, my_function2, ...]
499 
500  You can also specify the dependencies of the calibration on others
501 
502  >>> cal.depends_on(cal2)
503 
504  By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
505  calibration constants created by earlier completed calibrations to dependent ones.
506  """
507 
508  moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]
509 
510  alg_output_dir = "algorithm_output"
511 
512  checkpoint_states = ["init", "collector_completed", "completed"]
513 
514  default_collection_name = "default"
515 
516  def __init__(self,
517  name,
518  collector=None,
519  algorithms=None,
520  input_files=None,
521  pre_collector_path=None,
522  database_chain=None,
523  output_patterns=None,
524  max_files_per_collector_job=None,
525  max_collector_jobs=None,
526  backend_args=None
527  ):
528  """
529  """
530 
 531  self.collections = {}
 532 
 533  self._algorithms = []
 534 
 535  # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
 536  # CollectorModule/module name directly.
 537  self.add_collection(self.default_collection_name,
 538  Collection(collector,
 539  input_files,
 540  pre_collector_path,
 541  database_chain,
 542  output_patterns,
 543  max_files_per_collector_job,
 544  max_collector_jobs,
 545  backend_args
 546  ))
 547 
 548  super().__init__(name, input_files)
 549  if algorithms:
 550 
 553  self.algorithms = algorithms
 554 
 555  self.results = {}
 556 
 558  self.max_iterations = None
 559 
 563  self.ignored_runs = None
 564  if self.algorithms:
 565 
 569  if database_chain:
 570 
 572  self.database_chain = database_chain
 573  else:
 574  self.database_chain = []
 575  # This database is already applied to the `Collection` automatically, so don't do it again
 576  for tag in reversed(b2conditions.default_globaltags):
 577  self.use_central_database(tag, apply_to_default_collection=False)
 578 
 582 
 584  self.backend = None
 585 
 589  self.collector_full_update_interval = 30
 590 
 591  self.heartbeat = 3
 592 
 593  self.machine = None
 594 
 595  self._db_path = None
596 
597  def add_collection(self, name, collection):
598  """
599  Parameters:
600  name (str): Unique name of this `Collection` in the Calibration.
601  collection (`Collection`): `Collection` object to use.
602 
603  Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
604  A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
605  + input files.
606  You can ignore the default one and only add your own custom Collections. You can configure the default from the
607  Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
608  attributes.
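
 A short sketch of adding a custom Collection (the name, collector module, and file pattern are illustrative):

 >>> col_physics = Collection(collector="CaTest", input_files=["/path/to/physics/*.root"])
 >>> cal.add_collection("physics", col_physics)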
609  """
 610  if name not in self.collections:
 611  self.collections[name] = collection
612  else:
613  B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
614  "Please use another name.")
615 
616  def is_valid(self):
617  """
618  A full calibration consists of a collector AND an associated algorithm AND input_files.
619 
620  Returns False if:
621  1) We are missing any of the above.
622  2) There are multiple Collections and the Collectors have mis-matched granularities.
623  3) Any of our Collectors have granularities that don't match what our Strategy can use.
624  """
 625  if not self.algorithms:
626  B2WARNING(f"Empty algorithm list for {self.name}.")
627  return False
628 
 629  if not any([collection.is_valid() for collection in self.collections.values()]):
630  B2WARNING(f"No valid Collections for {self.name}.")
631  return False
632 
633  granularities = []
 634  for collection in self.collections.values():
635  if collection.is_valid():
636  collector_params = collection.collector.available_params()
637  for param in collector_params:
638  if param.name == "granularity":
639  granularities.append(param.values)
640  if len(set(granularities)) > 1:
641  B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
642  return False
643 
 644  for alg in self.algorithms:
645  alg_type = type(alg.algorithm).__name__
646  incorrect_gran = [granularity not in alg.strategy.allowed_granularities for granularity in granularities]
647  if any(incorrect_gran):
648  B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
649  return False
650  return True
651 
652  def reset_database(self, apply_to_default_collection=True):
653  """
654  Keyword Arguments:
655  apply_to_default_collection (bool): Should we also reset the default collection?
656 
657  Remove everything in the database_chain of this Calibration, including the default central database tag automatically
658  included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
659  database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
660  reset by setting 'apply_to_default_collection' to False.
661  """
 662  self.database_chain = []
 663  if self.default_collection_name in self.collections and apply_to_default_collection:
 664  self.collections[self.default_collection_name].reset_database()
665 
666  def use_central_database(self, global_tag, apply_to_default_collection=True):
667  """
668  Parameters:
669  global_tag (str): The central database global tag to use for this calibration.
670 
671  Keyword Arguments:
672  apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
673 
674  Using this allows you to append a central database to the database chain for this calibration.
675  The default database chain is just the central one from
676  `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
677  To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
678  and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
679 
680  Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
681  and the algorithm processes. So calling:
682 
683  >> cal.use_central_database("global_tag")
684 
685  will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
686  assigned to
687 
688  >> cal.collections['default'].database_chain
689 
690  But calling
691 
 692  >> cal.use_central_database("global_tag", False)
693 
694  will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
695  So if you have multiple Collections in this Calibration *their database chains are separate*.
696  To specify an additional `CentralDatabase` for a different collection, you will have to call:
697 
698  >> cal.collections['OtherCollection'].use_central_database("global_tag")
699  """
700  central_db = CentralDatabase(global_tag)
 701  self.database_chain.append(central_db)
 702  if self.default_collection_name in self.collections and apply_to_default_collection:
 703  self.collections[self.default_collection_name].use_central_database(global_tag)
704 
705  def use_local_database(self, filename, directory="", apply_to_default_collection=True):
706  """
707  Parameters:
708  filename (str): The path to the database.txt of the local database
709 
 710  Keyword Arguments:
711  directory (str): The path to the payloads directory for this local database.
712  apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
713 
714  Append a local database to the chain for this calibration.
715  You can call this function multiple times and each database will be added to the chain IN ORDER.
716  The databases are applied to this calibration ONLY.
717  The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
718  the default `Collection` job as a database chain.
719  There are other databases applied to the processes later, checked by basf2 in this order:
720 
721  1) Local Database from previous iteration of this Calibration.
722  2) Local Database chain from output of previous dependent Calibrations.
723  3) This chain of Local and Central databases where the last added is checked first.
724 
725  Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
726  'apply_to_default_collection' remains True. So calling:
727 
728  >> cal.use_local_database(file_path, payload_dir)
729 
730  will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
731  assigned to
732 
733  >> cal.collections['default'].database_chain
734 
735  But calling
736 
737  >> cal.use_local_database(file_path, payload_dir, False)
738 
739  will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
740 
741  If you have multiple Collections in this Calibration *their database chains are separate*.
742  To specify an additional `LocalDatabase` for a different collection, you will have to call:
743 
744  >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
745 
746  """
747  local_db = LocalDatabase(filename, directory)
 748  self.database_chain.append(local_db)
 749  if self.default_collection_name in self.collections and apply_to_default_collection:
 750  self.collections[self.default_collection_name].use_local_database(filename, directory)
751 
752  def _get_default_collection_attribute(self, attr):
 753  if self.default_collection_name in self.collections:
 754  return getattr(self.collections[self.default_collection_name], attr)
755  else:
756  B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
757  "but the default collection doesn't exist."
758  f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
759  "collection's attributes directly.")
760  return None
761 
762  def _set_default_collection_attribute(self, attr, value):
 763  if self.default_collection_name in self.collections:
 764  setattr(self.collections[self.default_collection_name], attr, value)
765  else:
766  B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
767  "but the default collection doesn't exist."
768  f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
769  "collection's attributes directly.")
770 
771  @property
772  def collector(self):
773  """
774  """
 775  return self._get_default_collection_attribute("collector")
776 
777  @collector.setter
778  def collector(self, collector):
779  """
780  """
781  # check if collector is already a module or if we need to create one
782  # from the name
783  from basf2 import Module
784  if isinstance(collector, str):
785  from basf2 import register_module
786  collector = register_module(collector)
787  if not isinstance(collector, Module):
788  B2ERROR("Collector needs to be either a Module or the name of such a module")
789 
 790  self._set_default_collection_attribute("collector", collector)
791 
792  @property
793  def input_files(self):
794  """
795  """
 796  return self._get_default_collection_attribute("input_files")
797 
798  @input_files.setter
799  def input_files(self, files):
800  """
801  """
 802  self._set_default_collection_attribute("input_files", files)
803 
804  @property
805  def files_to_iovs(self):
806  """
807  """
 808  return self._get_default_collection_attribute("files_to_iovs")
809 
810  @files_to_iovs.setter
811  def files_to_iovs(self, file_map):
812  """
813  """
 814  self._set_default_collection_attribute("files_to_iovs", file_map)
815 
816  @property
 817  def pre_collector_path(self):
 818  """
 819  """
 820  return self._get_default_collection_attribute("pre_collector_path")
821 
822  @pre_collector_path.setter
823  def pre_collector_path(self, path):
824  """
825  """
 826  self._set_default_collection_attribute("pre_collector_path", path)
827 
828  @property
829  def output_patterns(self):
830  """
831  """
 832  return self._get_default_collection_attribute("output_patterns")
833 
834  @output_patterns.setter
835  def output_patterns(self, patterns):
836  """
837  """
 838  self._set_default_collection_attribute("output_patterns", patterns)
839 
840  @property
 841  def max_files_per_collector_job(self):
 842  """
 843  """
 844  return self._get_default_collection_attribute("max_files_per_collector_job")
845 
846  @max_files_per_collector_job.setter
847  def max_files_per_collector_job(self, max_files):
848  """
849  """
 850  self._set_default_collection_attribute("max_files_per_collector_job", max_files)
851 
852  @property
 853  def max_collector_jobs(self):
 854  """
 855  """
 856  return self._get_default_collection_attribute("max_collector_jobs")
857 
858  @max_collector_jobs.setter
859  def max_collector_jobs(self, max_jobs):
860  """
861  """
 862  self._set_default_collection_attribute("max_collector_jobs", max_jobs)
863 
864  @property
865  def backend_args(self):
866  """
867  """
 868  return self._get_default_collection_attribute("backend_args")
869 
870  @backend_args.setter
871  def backend_args(self, args):
872  """
873  """
 874  self._set_default_collection_attribute("backend_args", args)
875 
876  @property
877  def algorithms(self):
878  """
879  """
 880  return self._algorithms
881 
882  @algorithms.setter
883  @method_dispatch
884  def algorithms(self, value):
885  """
886  """
887  from ROOT.Belle2 import CalibrationAlgorithm
888  if isinstance(value, CalibrationAlgorithm):
 889  self._algorithms = [Algorithm(value)]
890  else:
891  B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
892  "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
893 
894  @algorithms.fset.register(tuple)
895  @algorithms.fset.register(list)
896  def _(self, value):
897  """
898  Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
899  """
900  from ROOT.Belle2 import CalibrationAlgorithm
901  if value:
 902  self._algorithms = []
903  for alg in value:
904  if isinstance(alg, CalibrationAlgorithm):
 905  self._algorithms.append(Algorithm(alg))
906  else:
907  B2ERROR((f"Something other than CalibrationAlgorithm instance passed in {type(value)}."
908  "Algorithm needs to inherit from Belle2::CalibrationAlgorithm"))
909 
910  @property
911  def pre_algorithms(self):
912  """
913  Callback run prior to each algorithm iteration.
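
 A sketch of a setup callback; it is expected to be called with the wrapped C++ algorithm and the iteration number:

 >>> def my_pre_algorithm(algorithm, iteration):
 ...     print(f"Setting up {algorithm.__class__.__name__} for iteration {iteration}")
 >>> cal.pre_algorithms = my_pre_algorithm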
914  """
 915  return [alg.pre_algorithm for alg in self.algorithms]
916 
917  @pre_algorithms.setter
918  @method_dispatch
919  def pre_algorithms(self, func):
920  """
921  """
922  if func:
 923  for alg in self.algorithms:
924  alg.pre_algorithm = func
925  else:
926  B2ERROR("Something evaluated as False passed in as pre_algorithm function.")
927 
928  @pre_algorithms.fset.register(tuple)
929  @pre_algorithms.fset.register(list)
930  def _(self, values):
931  """
932  Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
933  """
934  if values:
 935  if len(values) == len(self.algorithms):
 936  for func, alg in zip(values, self.algorithms):
937  alg.pre_algorithm = func
938  else:
939  B2ERROR("Number of functions and number of algorithms doesn't match.")
940  else:
941  B2ERROR("Empty container passed in for pre_algorithm functions")
942 
943  @property
944  def strategies(self):
945  """
946  The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
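
 For example, to run every algorithm with a run-by-run strategy instead of the default (see `caf.strategies` for the available classes):

 >>> from caf.strategies import SequentialRunByRun
 >>> cal.strategies = SequentialRunByRun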
947  """
 948  return [alg.strategy for alg in self.algorithms]
949 
950  @strategies.setter
951  @method_dispatch
952  def strategies(self, strategy):
953  """
954  """
955  if strategy:
 956  for alg in self.algorithms:
957  alg.strategy = strategy
958  else:
959  B2ERROR("Something evaluated as False passed in as a strategy.")
960 
961  @strategies.fset.register(tuple)
962  @strategies.fset.register(list)
963  def _(self, values):
964  """
965  Alternate strategies setter for lists and tuples of functions, should be one per algorithm.
966  """
967  if values:
 968  if len(values) == len(self.algorithms):
 969  for strategy, alg in zip(values, self.algorithms):
970  alg.strategy = strategy
971  else:
972  B2ERROR("Number of strategies and number of algorithms doesn't match.")
973  else:
974  B2ERROR("Empty container passed in for strategies list")
975 
976  def __repr__(self):
977  """
978  """
 979  return self.name
980 
981  def run(self):
982  """
983  Main logic of the Calibration object.
984  Will be run in a new Thread by calling the start() method.
985  """
 986  with CAFDB(self._db_path, read_only=True) as db:
 987  initial_state = db.get_calibration_value(self.name, "checkpoint")
 988  initial_iteration = db.get_calibration_value(self.name, "iteration")
 989  B2INFO("Initial status of {} found to be state={}, iteration={}".format(self.name,
 990  initial_state,
 991  initial_iteration))
 992  self.machine = CalibrationMachine(self,
 993  iov_to_calibrate=self.iov,
 994  initial_state=initial_state,
 995  iteration=initial_iteration)
 996  self.state = initial_state
 997  self.machine.root_dir = Path(os.getcwd(), self.name)
 998  self.machine.collector_backend = self.backend
999 
1000  # Before we start running, let's clean up any iteration directories from iterations above our initial one.
1001  # Should prevent confusion between attempts if we fail again.
 1002  all_iteration_paths = find_int_dirs(self.machine.root_dir)
1003  for iteration_path in all_iteration_paths:
1004  if int(iteration_path.name) > initial_iteration:
1005  shutil.rmtree(iteration_path)
1006 
 1007  while self.state != self.end_state and self.state != self.fail_state:
 1008  if self.state == "init":
 1009  try:
 1010  B2INFO(f"Attempting collector submission for calibration {self.name}.")
 1011  self.machine.submit_collector()
 1012  except Exception as err:
 1013  B2FATAL(str(err))
 1014 
 1015  self._poll_collector()
 1016 
 1017  # If we failed take us to the final fail state
 1018  if self.state == "collector_failed":
 1019  self.machine.fail_fully()
1020  return
1021 
1022  # It's possible that we might raise an error while attempting to run due
1023  # to some problems e.g. Missing collector output files
1024  # We catch the error and exit with failed state so the CAF will stop
1025  try:
1026  B2INFO(f"Attempting to run algorithms for calibration {self.name}.")
 1027  self.machine.run_algorithms()
 1028  except MachineError as err:
 1029  B2ERROR(str(err))
 1030  self.machine.fail()
 1031 
 1032  # If we failed take us to the final fail state
 1033  if self.machine.state == "algorithms_failed":
 1034  self.machine.fail_fully()
1035  return
1036 
1037  def _poll_collector(self):
1038  """
1039  """
 1040  while self.state == "running_collector":
 1041  try:
 1042  self.machine.complete()
 1043  # ConditionError is thrown when the conditions for the transition have returned false, it's not serious.
 1044  except ConditionError:
 1045  try:
 1046  B2DEBUG(29, f"Checking if collector jobs for calibration {self.name} have failed.")
 1047  self.machine.fail()
 1048  except ConditionError:
 1049  pass
 1050  sleep(self.heartbeat) # Sleep until we want to check again
1051 
1052  @property
1053  def state(self):
1054  """
1055  The current major state of the calibration in the database file. The machine may have a different state.
1056  """
 1057  with CAFDB(self._db_path, read_only=True) as db:
 1058  state = db.get_calibration_value(self.name, "state")
1059  return state
1060 
1061  @state.setter
1062  def state(self, state):
1063  """
1064  """
1065  B2DEBUG(29, f"Setting {self.name} to state {state}.")
 1066  with CAFDB(self._db_path) as db:
 1067  db.update_calibration_value(self.name, "state", str(state))
 1068  if state in self.checkpoint_states:
 1069  db.update_calibration_value(self.name, "checkpoint", str(state))
1070  B2DEBUG(29, f"{self.name} set to {state}.")
1071 
1072  @property
1073  def iteration(self):
1074  """
1075  Retrieves the current iteration number in the database file.
1076 
1077  Returns:
1078  int: The current iteration number
1079  """
 1080  with CAFDB(self._db_path, read_only=True) as db:
 1081  iteration = db.get_calibration_value(self.name, "iteration")
1082  return iteration
1083 
1084  @iteration.setter
1085  def iteration(self, iteration):
1086  """
1087  """
1088  B2DEBUG(29, f"Setting {self.name} to {iteration}.")
 1089  with CAFDB(self._db_path) as db:
 1090  db.update_calibration_value(self.name, "iteration", iteration)
1091  B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1092 
1093 
1094 class Algorithm():
1095  """
1096  Parameters:
1097  algorithm: The CalibrationAlgorithm instance that we want to execute.
1098  Keyword Arguments:
1099  data_input (types.FunctionType): An optional function that sets the input files of the algorithm.
1100  pre_algorithm (types.FunctionType): An optional function that runs just prior to execution of the algorithm.
1101  Useful for set up e.g. module initialisation
1102 
1103  This is a simple wrapper class around the C++ CalibrationAlgorithm class.
1104  It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
1105  than separating the logic into those classes directly.
1106 
1107  This is **not** currently a class that a user should interact with much during `CAF`
1108  setup (unless you're doing something advanced).
1109  The `Calibration` class should be doing the most of the creation of the defaults for these objects.
1110 
1111  Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
1112  Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
1113  is often useful i.e. by calling for the Geometry module to initialise.
1114  """
1115 
1116  def __init__(self, algorithm, data_input=None, pre_algorithm=None):
1117  """
1118  """
1119 
 1120  self.algorithm = algorithm
1121 
1122  cppname = type(algorithm).__cpp_name__
 1123  self.name = cppname[cppname.rfind('::') + 2:]
1124 
 1127  self.data_input = data_input
 1128  if not self.data_input:
 1129  self.data_input = self.default_inputdata_setup
1130 
 1133  self.pre_algorithm = pre_algorithm
1134 
 1136  self.strategy = strategies.SingleIOV
 1137 
 1142  self.params = {}
1143 
1144  def default_inputdata_setup(self, input_file_paths):
1145  """
1146  Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
1147  by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
1148  for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
1149  """
1150  collector_output_files = list(filter(lambda file_path: "CollectorOutput.root" == Path(file_path).name,
1151  input_file_paths))
1152  info_lines = [f"Input files used in {self.name}:"]
1153  info_lines.extend(collector_output_files)
1154  B2INFO_MULTILINE(info_lines)
 1155  self.algorithm.setInputFileNames(collector_output_files)
1156 
1157 
1158 class CAF():
1159  """
1160  Parameters:
1161  calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1162 
1163  >>> calibration_defaults={"max_iterations":2}
1164 
1165  This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
1166  for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
1167 
1168  The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
1169  `CalibrationBase.start` when the dependencies are met.
1170 
1171  Much of the checking for consistency is done in this class so that no processing is done with an invalid
1172  setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
1173  `CalibrationBase` instances.
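
 A minimal end-to-end sketch (the Calibration object ``cal`` is assumed to be configured elsewhere):

 >>> from caf.framework import CAF
 >>> from caf import backends
 >>> cal_fw = CAF(calibration_defaults={"max_iterations": 2})
 >>> cal_fw.add_calibration(cal)
 >>> cal_fw.backend = backends.Local(max_processes=4)
 >>> cal_fw.run()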
1174  """
1175 
1176 
1177  _db_name = "caf_state.db"
1178 
1179  default_calibration_config = {
1180  "max_iterations": 5,
1181  "ignored_runs": []
1182  }
1183 
1184  def __init__(self, calibration_defaults=None):
1185  """
1186  """
1187 
 1188  self.calibrations = {}
 1189 
 1191  self.future_dependencies = {}
 1192 
 1194  self.dependencies = {}
 1195 
 1196  self.output_dir = "calibration_results"
 1197 
 1198  self.order = None
 1199 
 1200  self._backend = None
 1201 
 1202  self.heartbeat = 5
 1203 
 1204  if not calibration_defaults:
 1205  calibration_defaults = {}
 1206 
 1208  self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
 1209 
 1210  self._db_path = None
1211 
1212  def add_calibration(self, calibration):
1213  """
1214  Adds a `Calibration` that is to be used in this program to the list.
1215  Also adds an empty dependency list to the overall dictionary.
1216  You should not directly alter a `Calibration` object after it has been
1217  added here.
1218  """
1219  if calibration.is_valid():
 1220  if calibration.name not in self.calibrations:
 1221  self.calibrations[calibration.name] = calibration
1222  else:
1223  B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
1224  else:
 1225  B2WARNING((f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
1226  "It was not added and will not be part of the final process."))
1227 
 1228  def _remove_missing_dependencies(self):
 1229  """
1230  This checks the future and past dependencies of each `Calibration` in the `CAF`.
1231  If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
1232  object directly.
1233  """
 1234  calibration_names = [calibration.name for calibration in self.calibrations.values()]
1235 
1236  def is_dependency_in_caf(dependency):
1237  """
1238  Quick function to use with filter() and check dependencies against calibrations known to `CAF`
1239  """
1240  dependency_in_caf = dependency.name in calibration_names
1241  if not dependency_in_caf:
1242  B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
1243  " It has been removed as a dependency.")
1244  return dependency_in_caf
1245 
1246  # Check that there aren't dependencies on calibrations not added to the framework
1247  # Remove them from the calibration objects if there are.
 1248  for calibration in self.calibrations.values():
1249  filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
1250  calibration.future_dependencies = filtered_future_dependencies
1251 
1252  filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
1253  calibration.dependencies = filtered_dependencies
1254 
 1255  def _order_calibrations(self):
 1256  """
 1257  - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
 1258  to a sorting algorithm.
 1259  - Returns a valid OrderedDict if the sort was successful, an empty one if it failed (most likely a cyclic dependency)
1260  """
1261  # First remove any dependencies on calibrations not added to the CAF
 1262  self._remove_missing_dependencies()
1263  # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
1264  # Note that they currently use the names not the calibration objects.
 1265  for calibration in self.calibrations.values():
1266  future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
1267  past_dependencies_names = [dependency.name for dependency in calibration.dependencies]
1268 
 1269  self.future_dependencies[calibration.name] = future_dependencies_names
 1270  self.dependencies[calibration.name] = past_dependencies_names
1271  # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
 1272  order = topological_sort(self.future_dependencies)
1273  if not order:
1274  return False
1275 
1276  # Get an ordered dictionary of the sort order but including all implicit dependencies.
 1277  ordered_full_dependencies = all_dependencies(self.future_dependencies, order)
1278 
1279  # Return all the implicit+explicit past dependencies
1280  full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
1281  # Correct each calibration's dependency list to reflect the implicit dependencies
 1282  for calibration in self.calibrations.values():
1283  full_deps = full_past_dependencies[calibration.name]
1284  explicit_deps = [cal.name for cal in calibration.dependencies]
1285  for dep in full_deps:
1286  if dep not in explicit_deps:
 1287  calibration.dependencies.append(self.calibrations[dep])
1288  # At this point the calibrations have their full dependencies but they aren't in topological
1289  # sort order. Correct that here
1290  ordered_dependency_list = []
1291  for ordered_calibration_name in order:
1292  if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
 1293  ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
1294  calibration.dependencies = ordered_dependency_list
1295  order = ordered_full_dependencies
1296  # We should also patch in all of the implicit dependencies for the calibrations
1297  return order
1298 
1299  def _check_backend(self):
1300  """
1301  Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
1302  one that is stored isn't a valid Backend object) we should create a default Local backend.
1303  """
 1304  if not isinstance(self._backend, caf.backends.Backend):
1305 
 1306  self.backend = caf.backends.Local()
1307 
 1308  def _prune_invalid_collections(self):
 1309  """
1310  Checks all current calibrations and removes any invalid Collections from their collections list.
1311  """
1312  B2INFO("Checking for any invalid Collections in Calibrations.")
 1313  for calibration in self.calibrations.values():
1314  valid_collections = {}
1315  for name, collection in calibration.collections.items():
1316  if collection.is_valid():
1317  valid_collections[name] = collection
1318  else:
1319  B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
1320  calibration.collections = valid_collections
1321 
1322  def run(self, iov=None):
1323  """
1324  Keyword Arguments:
1325  iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1326  this IoV will be used in the collection step.
1327 
1328  This function runs the overall calibration job, saves the outputs to the output_dir directory,
1329  and creates database payloads.
1330 
1331  Upload of final databases is not done here. This simply creates the local databases in
1332  the output directory. You should check the validity of your new local database before uploading
1333  to the conditions DB via the basf2 tools/interface to the DB.
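
 For example, to restrict the collection step to a single experiment (a sketch; the numbers are placeholders):

 >>> from caf.utils import IoV
 >>> cal_fw.run(iov=IoV(exp_low=12, run_low=0, exp_high=12, run_high=-1))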
1334  """
 1335  if not self.calibrations:
1336  B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1337  # Checks whether the dependencies we've added will give a valid order
 1338  order = self._order_calibrations()
1339  if not order:
1340  B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")
1341 
1342  # Check that a backend has been set and use default Local() one if not
 1343  self._check_backend()
1344 
 1345  self._prune_invalid_collections()
1346 
1347  # Creates the overall output directory and reset the attribute to use an absolute path to it.
 1348  self.output_dir = self._make_output_dir()
1349 
1350  # Creates a SQLite DB to save the status of the various calibrations
 1351  self._make_database()
1352 
 1353  # Enter the overall output dir during processing and open a connection to the DB
 1354  with temporary_workdir(self.output_dir):
 1355  db = CAFDB(self._db_path)
1356  db.open()
1357  db_initial_calibrations = db.query("select * from calibrations").fetchall()
 1358  for calibration in self.calibrations.values():
 1359  # Apply defaults given to the `CAF` to the calibrations if they aren't set
 1360  calibration._apply_calibration_defaults(self.calibration_defaults)
 1361  calibration._db_path = self._db_path
 1362  calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
 1363  calibration.iov = iov
 1364  if not calibration.backend:
 1365  calibration.backend = self.backend
1366  # Do some checking of the db to see if we need to add an entry for this calibration
1367  if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
1368  db.insert_calibration(calibration.name)
1369  db.commit()
1370  else:
1371  for cal_info in db_initial_calibrations:
1372  if cal_info[0] == calibration.name:
1373  cal_initial_state = cal_info[2]
1374  cal_initial_iteration = cal_info[3]
1375  B2INFO(f"Previous entry in database found for {calibration.name}.")
1376  B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1377  calibration.state = cal_initial_state
1378  B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1379  calibration.iteration = cal_initial_iteration
1380  # Daemonize so that it exits if the main program exits
1381  calibration.daemon = True
1382 
1383  db.close()
1384 
1385  # Is it possible to keep going?
1386  keep_running = True
1387  while keep_running:
1388  keep_running = False
1389  # Do we have calibrations that may yet complete?
1390  remaining_calibrations = []
1391 
 1392  for calibration in self.calibrations.values():
1393  # Find the currently ended calibrations (may not be joined yet)
1394  if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
1395  # Search for any alive Calibrations and join them
1396  if calibration.is_alive():
1397  B2DEBUG(29, f"Joining {calibration.name}.")
1398  calibration.join()
1399  else:
1400  if calibration.dependencies_met():
1401  if not calibration.is_alive():
1402  B2DEBUG(29, f"Starting {calibration.name}.")
1403  try:
1404  calibration.start()
1405  except RuntimeError:
1406  # Catch the case when the calibration just finished so it ended up here
1407  # in the "else" and not above where it should have been joined.
1408  B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
1409  remaining_calibrations.append(calibration)
1410  else:
1411  if not calibration.failed_dependencies():
1412  remaining_calibrations.append(calibration)
1413  if remaining_calibrations:
1414  keep_running = True
1415  # Loop over jobs that the calibrations want submitted and submit them.
1416  # We do this here because some backends don't like us submitting in parallel from multiple CalibrationThreads
1417  # So this is like a mini job queue without getting too clever with it
1418  for calibration in remaining_calibrations:
1419  for job in calibration.jobs_to_submit[:]:
1420  calibration.backend.submit(job)
1421  calibration.jobs_to_submit.remove(job)
 1422  sleep(self.heartbeat)
1423 
1424  B2INFO("Printing summary of final CAF status.")
 1425  with CAFDB(self._db_path, read_only=True) as db:
1426  print(db.output_calibration_table())
1427 
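 # Illustrative usage sketch: a minimal example of how `run` is typically driven,
 # assuming a fully configured Calibration object `cal` already exists. The backend
 # choice and the output_dir value below are assumptions, not fixed by this module.
 #
 #   from caf.framework import CAF
 #   from caf import backends
 #
 #   cal_fw = CAF()
 #   cal_fw.add_calibration(cal)
 #   cal_fw.backend = backends.Local(max_processes=2)
 #   cal_fw.output_dir = "calibration_results"
 #   cal_fw.run()
 #   # Re-running with the same output_dir resumes from the checkpoint states
 #   # recorded in the CAF SQLite database, as handled at the start of run().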
1428  @property
1429  def backend(self):
1430  """
1431  The `backend <backends.Backend>` that runs the collector job.
1432  When set, we check that a `backends.Backend` class instance was passed in.
1433  """
1434  return self._backend
1435 
1436  @backend.setter
1437  def backend(self, backend):
1438  """
1439  """
1440  if isinstance(backend, caf.backends.Backend):
1441  self._backend = backend
1442  else:
1443  B2ERROR('Backend property must inherit from Backend class.')
1444 
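 # Illustrative sketch of assigning the backend: any `caf.backends.Backend`
 # subclass instance is accepted. The Local backend and its max_processes
 # argument are shown as an assumed, typical choice.
 #
 #   from caf import backends
 #   cal_fw.backend = backends.Local(max_processes=4)
 #   # Anything that is not a backends.Backend instance is rejected with the B2ERROR above.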
1445  def _make_output_dir(self):
1446  """
1447  Creates the output directory. If it already exists, we will try to restart the program from the last saved state.
1448 
1449  Returns:
1450  str: The absolute path of the new output_dir
1451  """
1452  p = Path(self.output_dir).resolve()
1453  if p.is_dir():
1454  B2INFO(f"{p.as_posix()} output directory already exists. "
1455  "We will try to restart from the previous finishing state.")
1456  return p.as_posix()
1457  else:
1458  p.mkdir(parents=True)
1459  if p.is_dir():
1460  return p.as_posix()
1461  else:
1462  raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
1463 
1464  def _make_database(self):
1465  """
1466  Creates the CAF status database. If it already exists we don't overwrite it.
1467  """
1468  self._db_path = Path(self.output_dir, self._db_name).absolute()
1469  if self._db_path.exists():
1470  B2INFO(f"Previous CAF database found {self._db_path}")
1471  # Will create a new database + tables, or do nothing but check that we can connect to the existing one
1472  with CAFDB(self._db_path):
1473  pass
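 # Illustrative sketch of inspecting the status database outside of run(): the same
 # CAFDB class can be opened read-only to print the calibration summary table.
 # The output_dir path and database filename below are assumptions.
 #
 #   from pathlib import Path
 #   from caf.database import CAFDB
 #
 #   db_path = Path("calibration_results", "caf_state.db")  # assumed _db_name value
 #   with CAFDB(db_path, read_only=True) as db:
 #       print(db.output_calibration_table())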