Belle II Software development
framework.py
1#!/usr/bin/env python3
2
3
10
11"""
12This module implements several objects/functions to configure and run calibrations.
13These classes are used to construct the workflow of the calibration job.
14The actual processing code is mostly in the `caf.state_machines` module.
15"""
16
17__all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF"]
18
19import os
20from threading import Thread
21from time import sleep, time
22from pathlib import Path
23import shutil
24from glob import glob
25
26from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
27from basf2 import find_file
28from basf2 import conditions as b2conditions
29
30from abc import ABC, abstractmethod
31
32import caf
33from caf.utils import B2INFO_MULTILINE
34from caf.utils import past_from_future_dependencies
35from caf.utils import topological_sort
36from caf.utils import all_dependencies
37from caf.utils import method_dispatch
38from caf.utils import temporary_workdir
39from caf.utils import find_int_dirs
40from caf.utils import LocalDatabase
41from caf.utils import CentralDatabase
42from caf.utils import parse_file_uri
43
44import caf.strategies as strategies
45import caf.runners as runners
46from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
47from caf.state_machines import CalibrationMachine, ConditionError, MachineError
48from caf.database import CAFDB
49
50
51class Collection():
52 """
53 Keyword Arguments:
54 collector (str, basf2.Module): The collector module or module name for this `Collection`.
55 input_files (list[str]): The input files to be used for only this `Collection`.
        pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
57 database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
58 output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
59 `Algorithm.data_input` function. Setting this here, replaces the default completely.
        max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
61 Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
62 max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
63 Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
          max_files_per_collector_job.
65 backend_args (dict): The args for the backend submission of this `Collection`.
66 """
67
69 default_max_collector_jobs = 1000
70
72 job_config = "collector_job.json"
73
74 def __init__(self,
75 collector=None,
76 input_files=None,
77 pre_collector_path=None,
78 database_chain=None,
79 output_patterns=None,
80 max_files_per_collector_job=None,
81 max_collector_jobs=None,
82 backend_args=None
83 ):
84 """
85 """
86
87 self.collector = collector
88
89 self.input_files = []
90 if input_files:
91 self.input_files = input_files
92
98 self.files_to_iovs = {}
99
104 if pre_collector_path:
105 self.pre_collector_path = pre_collector_path
106
110 self.output_patterns = ["CollectorOutput.root"]
111 if output_patterns:
112 self.output_patterns = output_patterns
113
114
116 self.splitter = None
117 if max_files_per_collector_job and max_collector_jobs:
118 B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
119 # \cond false positive doxygen warning
120 elif max_files_per_collector_job:
121 self.max_files_per_collector_job = max_files_per_collector_job
122 elif max_collector_jobs:
123 self.max_collector_jobs = max_collector_jobs
124 else:
126 # \endcond
127
128
130 self.backend_args = {}
131 if backend_args:
132 self.backend_args = backend_args
133
134
138 if database_chain:
139 self.database_chain = database_chain
140 else:
141 # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
142 # described well by appending to a list to a deque. So we do bit of reversal to translate it back and make the
143 # most important GT the last one encountered.
144 for tag in reversed(b2conditions.default_globaltags):
145 self.use_central_database(tag)
146
147
148 self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()
149 """The basf2 steering file that will be used for Collector jobs run by this collection.
150This script will be copied into subjob directories as part of the input sandbox."""
151
152
153 self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]
154
def reset_database(self):
    """
    Empty the database chain of this `Collection`.

    ``self.database_chain`` is rebound to a new empty list, so every entry is dropped, including the
    default central database tag automatically included from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    """
    self.database_chain = list()
161
def use_central_database(self, global_tag):
    """
    Append a central database global tag to the database chain of this `Collection`.

    Parameters:
        global_tag (str): The central database global tag to use for this collection.

    A `CentralDatabase` is built from ``global_tag`` and appended to ``self.database_chain``.
    The default database chain is just the central one from
    `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
    The input file global tag will always be overridden and never used unless explicitly set.

    To turn off the central database completely or use a custom tag as the base, call `reset_database`
    and then add databases with `use_local_database` and `use_central_database`.

    Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

    NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
    list which will be checked after all local database files have been checked.

    So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
    that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
    """
    self.database_chain.append(CentralDatabase(global_tag))
186
def use_local_database(self, filename, directory=""):
    """
    Append a local database to the database chain of this `Collection`.

    Parameters:
        filename (str): The path to the database.txt of the local database
        directory (str): The path to the payloads directory for this local database.

    A `LocalDatabase` is built from the two arguments and appended to ``self.database_chain``.
    You can call this function multiple times and each database will be added to the chain IN ORDER.
    The databases are applied to this collection ONLY.

    NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
    All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
    list which will be checked after all local database files have been checked.

    So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
    that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
    """
    self.database_chain.append(LocalDatabase(filename, directory))
206
207 @staticmethod
209 """
210 Parameters:
211 input_file (str): A local file/glob pattern or XROOTD URI
212
213 Returns:
214 list: A list of the URIs found from the initial string.
215 """
216 # By default we assume it is a local file path if no "scheme" is given
217 uri = parse_file_uri(input_file)
218 if uri.scheme == "file":
219 # For local files we also perform a glob just in case it is a wildcard pattern.
220 # That way we will have all the uris of files separately
221 uris = [parse_file_uri(f).geturl() for f in glob(input_file)]
222 else:
            # Just let everything else through and hope the backend can figure it out
224 uris = [input_file]
225 return uris
226
227 @property
228 def input_files(self):
229 """
230 """
231 return self._input_files
232
233 @input_files.setter
234 def input_files(self, value):
235 """
236 """
237 if isinstance(value, str):
238 # If it's a string, we convert to a list of URIs
239
241 elif isinstance(value, list):
242 # If it's a list we loop and do the same thing
243 total_files = []
244 for pattern in value:
245 total_files.extend(self.uri_list_from_input_file(pattern))
246 self._input_files = total_files
247 else:
248 raise TypeError("Input files must be a list or string")
249
@property
def collector(self):
    """
    The collector stored on this `Collection` (whatever the setter last assigned to ``self._collector``).
    """
    return self._collector


@collector.setter
def collector(self, collector):
    """
    Store the collector, turning a module name into a module first.

    A falsy value (e.g. None) is stored unchanged without importing basf2. A string is passed to
    `basf2.register_module`. If the result is not a `basf2.Module` a B2ERROR is logged, but the value
    is still stored.
    """
    if not collector:
        self._collector = collector
        return

    from basf2 import Module
    module = collector
    # A string is taken to be a module name, so ask basf2 to create the module.
    if isinstance(module, str):
        from basf2 import register_module
        module = register_module(module)
    if not isinstance(module, Module):
        B2ERROR("Collector needs to be either a Module or the name of such a module")

    self._collector = module
271
def is_valid(self):
    """
    Report whether this `Collection` has what it needs to be used.

    Returns:
        bool: True only when both ``self.collector`` and ``self.input_files`` are truthy, otherwise False.
    """
    return bool(self.collector) and bool(self.input_files)
279
280 @property
282 """
283 """
284 if self.splitter:
285 return self.splitter.max_subjobs
286 else:
287 return None
288
289 @max_collector_jobs.setter
290 def max_collector_jobs(self, value):
291 """
292 """
293 if value is None:
294 self.splitter = None
295 else:
296 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
297
298 @property
300 """
301 """
302 if self.splitter:
303 return self.splitter.max_files_per_subjob
304 else:
305 return None
306
307 @max_files_per_collector_job.setter
309 """
310 """
311 if value is None:
312 self.splitter = None
313 else:
314 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
315
316
317class CalibrationBase(ABC, Thread):
318 """
319 Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
320 this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
321 class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
322 that doesn't depend on the C++ CAF at all and run everything in your own way.
323
324 .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
325 But it's there if you really need it.
326
327 Parameters:
328 name (str): Name of this calibration object. Should be unique if you are going to run it.
329
330 Keyword Arguments:
331 input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by `glob.glob`.
332 """
333
335 end_state = "completed"
336
338 fail_state = "failed"
339
340 def __init__(self, name, input_files=None):
341 """
342 """
343 super().__init__()
344
345 self.name = name
346
348
349 self.dependencies = []
350
357 if input_files:
358
359 self.input_files = input_files
360 else:
361 self.input_files = []
362
363
364 self.iov = None
365
367
369 self.save_payloads = True
370
372
@abstractmethod
def run(self):
    """
    The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
    once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".

    This is an abstract method with no implementation here: every concrete subclass must override it.
    """
379
@abstractmethod
def is_valid(self):
    """
    A simple method you should implement that will return True or False depending on whether
    the Calibration has been set up correctly and can be run safely.

    This is an abstract method with no implementation here: every concrete subclass must override it.
    """
386
def depends_on(self, calibration):
    """
    Register another calibration as a dependency of this one.

    Parameters:
        calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

    Adds dependency of this calibration on another i.e. This calibration
    will not run until the dependency has completed, and the constants produced
    will be used via the database chain.

    You can define multiple dependencies for a single calibration simply
    by calling this multiple times. Calling it repeatedly with the same calibration is harmless: each
    list only ever receives one entry per calibration. Be careful when adding the calibration into
    the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
    empty order and the `CAF` processing will fail.

    This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
    `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.

    If both calibrations have the same name nothing is added and a warning is logged.
    """
    # Two calibrations with the same name cannot depend on each other
    if self.name == calibration.name:
        B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                  "Dependency was not added.")
        return

    if calibration not in self.dependencies:
        self.dependencies.append(calibration)
    # The reverse link is stored in future_dependencies, so the duplicate check must look in that
    # same list; checking calibration.dependencies would allow duplicates on repeated calls.
    if self not in calibration.future_dependencies:
        calibration.future_dependencies.append(self)
414
416 """
417 Checks if all of the Calibrations that this one depends on have reached a successful end state.
418 """
419 return all(map(lambda x: x.state == x.end_state, self.dependencies))
420
422 """
423 Returns the list of calibrations in our dependency list that have failed.
424 """
425 failed = []
426 # Use local variable to avoid doxygen "no uniquely matching class member" warning
427 # (multiple classes in this file define self.dependencies with doc comments)
428 dependencies = self.dependencies
429 for calibration in dependencies:
430 if calibration.state == self.fail_state:
431 failed.append(calibration)
432 return failed
433
434 def _apply_calibration_defaults(self, defaults):
435 """
436 We pass in default calibration options from the `CAF` instance here if called.
437 Won't overwrite any options already set.
438 """
439 for key, value in defaults.items():
440 try:
441 if getattr(self, key) is None:
442 setattr(self, key, value)
443 except AttributeError:
444 print(f"The calibration {self.name} does not support the attribute {key}.")
445
446
448 """
    Every Calibration object must have at least one collector and at least one algorithm.
450 You have the option to add in your collector/algorithm by argument here, or add them
451 later by changing the properties.
452
453 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
454 separately via `add_collection()`.
455
456 Parameters:
457 name (str): Name of this calibration. It should be unique for use in the `CAF`
458 Keyword Arguments:
459 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
460 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
461 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards usable by `glob.glob`
462
463 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
464
465 >>> cal = Calibration('TestCalibration1')
466 >>> col1 = register_module('CaTest')
467 >>> cal.add_collection('TestColl', col1)
468
469 or equivalently
470
471 >>> cal = Calibration('TestCalibration1', 'CaTest')
472
473 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
474
475 >>> cal.pre_collector_path = my_basf2_path
476
477 You don't have to put a RootInput module in this pre-collection path, but you can if
    you need some special parameters. If you want to process sroot files then you have to explicitly add
479 SeqRootInput to your pre-collection path.
480 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
481
482
483 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
484
485 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
486
487 you can change the input file list later on, before running with `CAF`
488
489 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
490
491 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
492 input_files, database chain etc from there. See `Collection`.
493
494 Adding the CalibrationAlgorithm(s) is easy
495
496 >>> alg1 = TestAlgo()
497 >>> cal.algorithms = alg1
498
499 Or equivalently
500
501 >>> cal.algorithms = [alg1]
502
503 Or for multiple algorithms for one collector
504
505 >>> alg2 = TestAlgo()
506 >>> cal.algorithms = [alg1, alg2]
507
508 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
    `Algorithm`. To access the C++ algorithm class underneath directly do:
510
511 >>> cal.algorithms[i].algorithm
512
513 If you have a setup function that you want to run before each of the algorithms, set that with
514
515 >>> cal.pre_algorithms = my_function_object
516
517 If you want a different setup for each algorithm use a list with the same number of elements
518 as your algorithm list.
519
520 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
521
522 You can also specify the dependencies of the calibration on others
523
524 >>> cal.depends_on(cal2)
525
526 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
527 calibration constants created by earlier completed calibrations to dependent ones.
528 """
529
530 moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]
531
532 alg_output_dir = "algorithm_output"
533
534 checkpoint_states = ["init", "collector_completed", "completed"]
535
536 default_collection_name = "default"
537
538 def __init__(self,
539 name,
540 collector=None,
541 algorithms=None,
542 input_files=None,
543 pre_collector_path=None,
544 database_chain=None,
545 output_patterns=None,
546 max_files_per_collector_job=None,
547 max_collector_jobs=None,
548 backend_args=None
549 ):
550 """
551 """
552
553 self.collections = {}
554
555 self._algorithms = []
556
557 # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
558 # CollectorModule/module name directly.
560 Collection(collector,
561 input_files,
562 pre_collector_path,
563 database_chain,
564 output_patterns,
565 max_files_per_collector_job,
566 max_collector_jobs,
567 backend_args
568 ))
569
570 super().__init__(name, input_files)
571 if algorithms:
572
575 self.algorithms = algorithms
576
577 self.results = {}
578
580 self.max_iterations = None
581
585 self.ignored_runs = None
586 if self.algorithms:
587
591
594 if database_chain:
595 self.database_chain = database_chain
596 else:
597 # This database is already applied to the `Collection` automatically, so don't do it again
598 for tag in reversed(b2conditions.default_globaltags):
599 self.use_central_database(tag, apply_to_default_collection=False)
600
604
606 self.backend = None
607
612
613 self.heartbeat = 3
614
615 self.machine = None
616
617 self._db_path = None
618
def add_collection(self, name, collection):
    """
    Register a new `Collection` under a unique name in this `Calibration`.

    Parameters:
        name (str): Unique name of this `Collection` in the Calibration.
        collection (`Collection`): `Collection` object to use.

    If ``name`` is already a key of ``self.collections`` the existing entry is kept, the new object is
    not added, and a warning is logged.

    Any valid Collection will be used in the Calibration. A default Collection is automatically added but
    isn't valid and won't run unless you have assigned a collector + input files.
    You can ignore the default one and only add your own custom Collections. You can configure the default from the
    Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector,
    cal.input_files attributes.
    """
    if name in self.collections:
        B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
                  "Please use another name.")
        return
    self.collections[name] = collection
637
def is_valid(self):
    """
    Check that this Calibration is fully set up: a collector AND an associated algorithm AND input_files.

    Returns False (after logging a warning) if:
        1) The algorithm list is empty, or no `Collection` reports itself as valid.
        2) The valid Collections have Collectors with differing 'granularity' parameter values.
        3) Any collected granularity is not in the ``allowed_granularities`` of an algorithm's strategy.

    Otherwise returns True.
    """
    if not self.algorithms:
        B2WARNING(f"Empty algorithm list for {self.name}.")
        return False

    valid_collections = [coll for coll in self.collections.values() if coll.is_valid()]
    if not valid_collections:
        B2WARNING(f"No valid Collections for {self.name}.")
        return False

    # Gather the value of the 'granularity' parameter from every valid collection's collector
    granularities = [
        param.values
        for coll in valid_collections
        for param in coll.collector.available_params()
        if param.name == "granularity"
    ]
    if len(set(granularities)) > 1:
        B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
        return False

    for alg in self.algorithms:
        alg_type = type(alg.algorithm).__name__
        if any(gran not in alg.strategy.allowed_granularities for gran in granularities):
            B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
            return False
    return True
673
674 def reset_database(self, apply_to_default_collection=True):
675 """
676 Keyword Arguments:
677 apply_to_default_collection (bool): Should we also reset the default collection?
678
679 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
680 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
681 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
682 reset by setting 'apply_to_default_collection' to False.
683 """
684 self.database_chain = []
685 if self.default_collection_name in self.collections and apply_to_default_collection:
687
688 def use_central_database(self, global_tag, apply_to_default_collection=True):
689 """
690 Parameters:
691 global_tag (str): The central database global tag to use for this calibration.
692
693 Keyword Arguments:
694 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
695
696 Using this allows you to append a central database to the database chain for this calibration.
697 The default database chain is just the central one from
698 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
699 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
700 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
701
702 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
703 and the algorithm processes. So calling:
704
705 >> cal.use_central_database("global_tag")
706
707 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
708 assigned to
709
710 >> cal.collections['default'].database_chain
711
712 But calling
713
    >> cal.use_central_database("global_tag", False)
715
716 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
717 So if you have multiple Collections in this Calibration *their database chains are separate*.
718 To specify an additional `CentralDatabase` for a different collection, you will have to call:
719
720 >> cal.collections['OtherCollection'].use_central_database("global_tag")
721 """
722 central_db = CentralDatabase(global_tag)
723 self.database_chain.append(central_db)
724 if self.default_collection_name in self.collections and apply_to_default_collection:
726
def use_local_database(self, filename, directory="", apply_to_default_collection=True):
    """
    Append a local database to the chain for this calibration.

    Parameters:
        filename (str): The path to the database.txt of the local database

    Keyword Arguments:
        directory (str): The path to the payloads directory for this local database.
        apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)

    A `LocalDatabase` is appended to ``self.database_chain``. When 'apply_to_default_collection' is True
    and the default collection exists, `Collection.use_local_database` is also called on it with the
    same filename and directory.

    You can call this function multiple times and each database will be added to the chain IN ORDER.
    The databases are applied to this calibration ONLY.
    The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
    the default `Collection` job as a database chain.
    There are other databases applied to the processes later, checked by basf2 in this order:

    1) Local Database from previous iteration of this Calibration.
    2) Local Database chain from output of previous dependent Calibrations.
    3) This chain of Local and Central databases where the last added is checked first.

    Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
    'apply_to_default_collection' remains True. So calling:

    >> cal.use_local_database(file_path, payload_dir)

    will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
    assigned to

    >> cal.collections['default'].database_chain

    But calling

    >> cal.use_local_database(file_path, payload_dir, False)

    will add the database to the Algorithm processes, but leave the default Collection database chain untouched.

    If you have multiple Collections in this Calibration *their database chains are separate*.
    To specify an additional `LocalDatabase` for a different collection, you will have to call:

    >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)

    """
    self.database_chain.append(LocalDatabase(filename, directory))

    default_name = self.default_collection_name
    if default_name in self.collections and apply_to_default_collection:
        default_collection = self.collections[default_name]
        default_collection.use_local_database(filename, directory)
773
775 """
776 """
777 if self.default_collection_name in self.collections:
778 return getattr(self.collections[self.default_collection_name], attr)
779 else:
780 B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
781 "but the default collection doesn't exist."
782 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
783 "collection's attributes directly.")
784 return None
785
786 def _set_default_collection_attribute(self, attr, value):
787 """
788 """
789 if self.default_collection_name in self.collections:
790 setattr(self.collections[self.default_collection_name], attr, value)
791 else:
792 B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
793 "but the default collection doesn't exist."
794 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
795 "collection's attributes directly.")
796
@property
def collector(self):
    """
    The collector of the default `Collection`, fetched through `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("collector")


@collector.setter
def collector(self, collector):
    """
    Set the collector of the default `Collection`, turning a module name into a module first.

    A string is passed to `basf2.register_module`. If the result is not a `basf2.Module` a B2ERROR is
    logged, but the value is still handed to `_set_default_collection_attribute`.
    """
    from basf2 import Module
    module = collector
    # A string is taken to be a module name, so ask basf2 to create the module.
    if isinstance(module, str):
        from basf2 import register_module
        module = register_module(module)
    if not isinstance(module, Module):
        B2ERROR("Collector needs to be either a Module or the name of such a module")

    self._set_default_collection_attribute("collector", module)
817
@property
def input_files(self):
    """
    The ``input_files`` attribute of the default `Collection`, read through
    `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("input_files")

@input_files.setter
def input_files(self, files):
    """
    Set the ``input_files`` attribute of the default `Collection` through
    `_set_default_collection_attribute`.
    """
    self._set_default_collection_attribute("input_files", files)
829
@property
def files_to_iovs(self):
    """
    The ``files_to_iovs`` attribute of the default `Collection`, read through
    `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("files_to_iovs")

@files_to_iovs.setter
def files_to_iovs(self, file_map):
    """
    Set the ``files_to_iovs`` attribute of the default `Collection` through
    `_set_default_collection_attribute`.
    """
    self._set_default_collection_attribute("files_to_iovs", file_map)
841
842 @property
844 """
845 """
846 return self._get_default_collection_attribute("pre_collector_path")
847
848 @pre_collector_path.setter
849 def pre_collector_path(self, path):
850 """
851 """
852 self._set_default_collection_attribute("pre_collector_path", path)
853
854 @property
856 """
857 """
858 return self._get_default_collection_attribute("output_patterns")
859
860 @output_patterns.setter
861 def output_patterns(self, patterns):
862 """
863 """
864 self._set_default_collection_attribute("output_patterns", patterns)
865
866 @property
868 """
869 """
870 return self._get_default_collection_attribute("max_files_per_collector_job")
871
872 @max_files_per_collector_job.setter
873 def max_files_per_collector_job(self, max_files):
874 """
875 """
876 self._set_default_collection_attribute("max_files_per_collector_job", max_files)
877
878 @property
880 """
881 """
882 return self._get_default_collection_attribute("max_collector_jobs")
883
884 @max_collector_jobs.setter
885 def max_collector_jobs(self, max_jobs):
886 """
887 """
888 self._set_default_collection_attribute("max_collector_jobs", max_jobs)
889
@property
def backend_args(self):
    """
    The ``backend_args`` attribute of the default `Collection`, read through
    `_get_default_collection_attribute`.
    """
    return self._get_default_collection_attribute("backend_args")

@backend_args.setter
def backend_args(self, args):
    """
    Set the ``backend_args`` attribute of the default `Collection` through
    `_set_default_collection_attribute`.
    """
    self._set_default_collection_attribute("backend_args", args)
901
@property
def algorithms(self):
    """
    The list of `Algorithm` wrappers currently stored on this Calibration.
    """
    return self._algorithms


@algorithms.setter
@method_dispatch
def algorithms(self, value):
    """
    Default algorithms setter, used for a single algorithm object.

    A ``Belle2.CalibrationAlgorithm`` instance is wrapped in an `Algorithm` and stored as a one-element
    list. Anything else is rejected with a B2ERROR and the stored list is left unchanged.
    """
    from ROOT import Belle2  # noqa: make the Belle2 namespace available
    from ROOT.Belle2 import CalibrationAlgorithm
    if not isinstance(value, CalibrationAlgorithm):
        B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
                "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
        return
    self._algorithms = [Algorithm(value)]
920
921 # Doxygen doesn't understand @method_dispatch overloads (def _) and emits
922 # "no uniquely matching class member found" warnings. Hide them from doxygen.
923 # @cond
@algorithms.fset.register(tuple)
@algorithms.fset.register(list)
def _(self, value):
    """
    Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.

    An empty container is ignored and the stored list is left as it was. Otherwise the stored list is
    restarted from empty and every element that is a ``Belle2.CalibrationAlgorithm`` is wrapped in an
    `Algorithm` and appended. Any other element is skipped with a B2ERROR naming that element's type.
    """
    from ROOT import Belle2  # noqa: make the Belle2 namespace available
    from ROOT.Belle2 import CalibrationAlgorithm
    if value:
        self._algorithms = []
        for alg in value:
            if isinstance(alg, CalibrationAlgorithm):
                self._algorithms.append(Algorithm(alg))
            else:
                # Report the type of the offending element, not of the container it arrived in
                B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(alg)}). "
                        "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
940 # @endcond
941
@property
def pre_algorithms(self):
    """
    Callback run prior to each algorithm iteration.

    Returns a list holding the ``pre_algorithm`` attribute of every entry in ``self.algorithms``.
    """
    return [alg.pre_algorithm for alg in self.algorithms]


@pre_algorithms.setter
@method_dispatch
def pre_algorithms(self, func):
    """
    Default pre_algorithms setter: assign the same callback to every algorithm.

    A falsy ``func`` is rejected with a B2ERROR and nothing is changed.
    """
    if not func:
        B2ERROR("Something evaluated as False passed in as pre_algorithm function.")
        return
    for alg in self.algorithms:
        alg.pre_algorithm = func
959
960 # @cond
@pre_algorithms.fset.register(tuple)
@pre_algorithms.fset.register(list)
def _(self, values):
    """
    Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.

    An empty container, or one whose length differs from the number of algorithms,
    is reported with B2ERROR and nothing is assigned.
    """
    if not values:
        B2ERROR("Empty container passed in for pre_algorithm functions")
        return
    if len(values) != len(self.algorithms):
        B2ERROR("Number of functions and number of algorithms doesn't match.")
        return
    for callback, algorithm in zip(values, self.algorithms):
        algorithm.pre_algorithm = callback
975 # @endcond
976
@property
def strategies(self):
    """
    The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).

    Returns:
        list: The ``strategy`` attribute of every entry in ``self.algorithms``, in order.
    """
    collected = []
    for algorithm in self.algorithms:
        collected.append(algorithm.strategy)
    return collected
983
@strategies.setter
@method_dispatch
def strategies(self, strategy):
    """
    Default strategies setter: assign the same strategy to every algorithm.

    A falsy ``strategy`` is reported with B2ERROR and nothing is changed.
    """
    if not strategy:
        B2ERROR("Something evaluated as False passed in as a strategy.")
        return
    for algorithm in self.algorithms:
        algorithm.strategy = strategy
994
995 # @cond
@strategies.fset.register(tuple)
@strategies.fset.register(list)
def _(self, values):
    """
    Alternate strategies setter for lists and tuples of strategies, should be one per algorithm.

    The strategies are paired with ``self.algorithms`` by position. An empty container,
    or one whose length differs from the number of algorithms, is reported with
    B2ERROR and nothing is assigned.
    """
    if values:
        if len(values) == len(self.algorithms):
            # Iterate over the passed-in container. Inside this method the bare name
            # `strategies` is the imported `caf.strategies` module, not the argument.
            for strategy, alg in zip(values, self.algorithms):
                alg.strategy = strategy
        else:
            B2ERROR("Number of strategies and number of algorithms doesn't match.")
    else:
        B2ERROR("Empty container passed in for strategies list")
1010 # @endcond
1011
def __repr__(self):
    """
    Represent the object by its ``name`` attribute.
    """
    representation = self.name
    return representation
1016
1017 def run(self):
1018 """
1019 Main logic of the Calibration object.
1020 Will be run in a new Thread by calling the start() method.
1021 """
1022
def _print_execution_time(start, stop, calibration, iteration, step):
    """
    Log the wall-clock duration of a step at INFO level.

    Parameters:
        start (float): Start timestamp in seconds.
        stop (float): Stop timestamp in seconds.
        calibration (str): Calibration name used in the message.
        iteration (int): Iteration number used in the message.
        step (str): Name of the step that was timed.
    """
    elapsed = stop - start
    whole_hours, remainder = divmod(elapsed, 3600)
    hours = int(whole_hours)
    minutes = int(remainder // 60)
    # Zero-pad the minutes so that e.g. 1 h 5 min reads "1:05" rather than "1:5".
    B2INFO(f"Execution time of '{step}' for '{calibration}:{iteration}': {hours}:{minutes:02d} (hours:minutes)")
1031
1032 with CAFDB(self._db_path, read_only=True) as db:
1033 initial_state = db.get_calibration_value(self.name, "checkpoint")
1034 initial_iteration = db.get_calibration_value(self.name, "iteration")
1035 B2INFO(f"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
1036 self.machine = CalibrationMachine(self,
1037 iov_to_calibrate=self.iov,
1038 initial_state=initial_state,
1039 iteration=initial_iteration)
1040
1041 self.state = initial_state
1042 self.machine.root_dir = Path(os.getcwd(), self.name)
1043 self.machine.collector_backend = self.backend
1044
1045 # Before we start running, let's clean up any iteration directories from iterations above our initial one.
1046 # Should prevent confusion between attempts if we fail again.
1047 all_iteration_paths = find_int_dirs(self.machine.root_dir)
1048 for iteration_path in all_iteration_paths:
1049 if int(iteration_path.name) > initial_iteration:
1050 shutil.rmtree(iteration_path)
1051
1052 while self.state != self.end_state and self.state != self.fail_state:
1053 if self.state == "init":
1054 start = time()
1055 try:
1056 B2INFO(f"Attempting collector submission for calibration {self.name}.")
1058 except Exception as err:
1059 _print_execution_time(start, time(), self.name, self.machine.iteration, "collectors")
1060 B2FATAL(str(err))
1061
1062 self._poll_collector()
1063
1064 # Print the execution time of the collectors step
1065 # This includes also the time needed to submit the collector jobs
1066 _print_execution_time(start, time(), self.name, self.machine.iteration, "collectors")
1067
1068 # If we failed take us to the final fail state
1069 if self.state == "collector_failed":
1070 self.machine.fail_fully()
1071 return
1072
1073 # It's possible that we might raise an error while attempting to run due
1074 # to some problems e.g. Missing collector output files
1075 # We catch the error and exit with failed state so the CAF will stop
1076 start = time()
1077 try:
1078 B2INFO(f"Attempting to run algorithms for calibration {self.name}.")
1079 self.machine.run_algorithms()
1080 except MachineError as err:
1081 B2ERROR(str(err))
1082 self.machine.fail()
1083
1084 # Print the execution time of the algorithm step
1085 _print_execution_time(start, time(), self.name, self.machine.iteration, "algorihtm")
1086
1087 # If we failed take us to the final fail state
1088 if self.machine.state == "algorithms_failed":
1089 self.machine.fail_fully()
1090 return
1091
def _poll_collector(self):
    """
    Block while the calibration is in the ``"running_collector"`` state.

    On every pass the machine's ``complete()`` transition is attempted. If that raises
    `ConditionError`, the ``fail()`` transition is attempted instead, and a second
    `ConditionError` is ignored. The loop then sleeps for ``self.heartbeat`` seconds
    before re-checking the state.
    """
    # NOTE(review): the `def` line is absent from this extract; the name and the
    # argument-free signature are taken from the `self._poll_collector()` call in run().
    while self.state == "running_collector":
        try:
            self.machine.complete()
        # ConditionError is thrown when the conditions for the transition have returned false, it's not serious.
        except ConditionError:
            try:
                B2DEBUG(29, f"Checking if collector jobs for calibration {self.name} have failed.")
                self.machine.fail()
            except ConditionError:
                # Neither transition is allowed yet; keep waiting.
                pass
        sleep(self.heartbeat)  # Sleep until we want to check again
1106
@property
def state(self):
    """
    The current major state of the calibration in the database file. The machine may have a different state.

    The value is read from the ``"state"`` entry of this calibration through a read-only `CAFDB` connection.
    """
    with CAFDB(self._db_path, read_only=True) as database:
        current_state = database.get_calibration_value(self.name, "state")
    return current_state
1115
@state.setter
def state(self, state):
    """
    Write ``state`` (as a string) to the ``"state"`` entry of this calibration in the database file.

    If ``state`` is one of ``self.checkpoint_states`` the ``"checkpoint"`` entry is updated as well.
    """
    B2DEBUG(29, f"Setting {self.name} to state {state}.")
    is_checkpoint = state in self.checkpoint_states
    with CAFDB(self._db_path) as database:
        database.update_calibration_value(self.name, "state", str(state))
        if is_checkpoint:
            database.update_calibration_value(self.name, "checkpoint", str(state))
    B2DEBUG(29, f"{self.name} set to {state}.")
1126
@property
def iteration(self):
    """
    Retrieves the current iteration number in the database file.

    Returns:
        int: The current iteration number
    """
    with CAFDB(self._db_path, read_only=True) as database:
        current_iteration = database.get_calibration_value(self.name, "iteration")
    return current_iteration
1138
@iteration.setter
def iteration(self, iteration):
    """
    Write ``iteration`` to the ``"iteration"`` entry of this calibration in the database file.

    The closing debug message re-reads the value through the ``iteration`` property.
    """
    calibration_name = self.name
    B2DEBUG(29, f"Setting {calibration_name} to {iteration}.")
    with CAFDB(self._db_path) as database:
        database.update_calibration_value(calibration_name, "iteration", iteration)
    B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1147
1148
1150 """
1151 Parameters:
1152 algorithm: The CalibrationAlgorithm instance that we want to execute.
1153 Keyword Arguments:
1154 data_input : An optional function that sets the input files of the algorithm.
1155 pre_algorithm : An optional function that runs just prior to execution of the algorithm.
1156 Useful for set up e.g. module initialisation
1157
1158 This is a simple wrapper class around the C++ CalibrationAlgorithm class.
1159 It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
1160 than separating the logic into those classes directly.
1161
1162 This is **not** currently a class that a user should interact with much during `CAF`
1163 setup (unless you're doing something advanced).
1164 The `Calibration` class should be doing the most of the creation of the defaults for these objects.
1165
1166 Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
1167 Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
1168 is often useful i.e. by calling for the Geometry module to initialise.
1169 """
1170
1171 def __init__(self, algorithm, data_input=None, pre_algorithm=None):
1172 """
1173 """
1174
1175 self.algorithm = algorithm
1176
1177 cppname = type(algorithm).__cpp_name__
1178
1179 self.name = cppname[cppname.rfind('::') + 2:]
1180
1183 self.data_input = data_input
1184 if not self.data_input:
1186
1189 self.pre_algorithm = pre_algorithm
1190
1193
1198 self.params = {}
1199
def default_inputdata_setup(self, input_file_paths):
    """
    Select the collector output files from ``input_file_paths`` and hand them to the algorithm.

    Only paths whose file name is exactly ``CollectorOutput.root`` are kept. The selection is
    logged with `B2INFO_MULTILINE` and then passed to ``self.algorithm.setInputFileNames``.

    Parameters:
        input_file_paths: Iterable of file paths to choose from.
    """
    wanted_file_name = "CollectorOutput.root"
    collector_output_files = [
        candidate for candidate in input_file_paths
        if wanted_file_name == Path(candidate).name
    ]
    B2INFO_MULTILINE([f"Input files used in {self.name}:", *collector_output_files])
    self.algorithm.setInputFileNames(collector_output_files)
1212
1213
1214class CAF():
1215 """
1216 Parameters:
1217 calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.
1218
1219 >>> calibration_defaults={"max_iterations":2}
1220
1221 This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
1222 for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.
1223
1224 The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
1225 `CalibrationBase.start` when the dependencies are met.
1226
1227 Much of the checking for consistency is done in this class so that no processing is done with an invalid
1228 setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
1229 `CalibrationBase` instances.
1230 """
1231
1232
1233 _db_name = "caf_state.db"
1234
1235 default_calibration_config = {
1236 "max_iterations": 5,
1237 "ignored_runs": []
1238 }
1239
1240 def __init__(self, calibration_defaults=None):
1241 """
1242 """
1243
1245
1248
1251
1252 self.output_dir = "calibration_results"
1253
1254 self.order = None
1255
1256 self._backend = None
1257
1258 self.heartbeat = 5
1259
1260 if not calibration_defaults:
1261 calibration_defaults = {}
1262
1264 self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
1265
1266 self._db_path = None
1267
def add_calibration(self, calibration):
    """
    Adds a `Calibration` that is to be used in this program to the list.
    Also adds an empty dependency list to the overall dictionary.
    You should not directly alter a `Calibration` object after it has been
    added here.

    The calibration is stored in ``self.calibrations`` under its ``name``. A calibration
    whose ``is_valid()`` returns false, or whose name is already registered, is not
    added and a warning is issued instead.

    Parameters:
        calibration: The calibration object to register.
    """
    if not calibration.is_valid():
        # The leading space on the second literal keeps the two sentences apart.
        B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework."
                  " It was not added and will not be part of the final process.")
        return
    if calibration.name in self.calibrations:
        B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
        return
    self.calibrations[calibration.name] = calibration
1283
def _remove_missing_dependencies(self):
    """
    This checks the future and past dependencies of each `Calibration` in the `CAF`.
    If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
    object directly.

    Both ``future_dependencies`` and ``dependencies`` of every calibration are replaced by
    new lists, and a warning is issued for each dependency that is dropped.
    """
    # NOTE(review): the `def` line is absent from this extract; the name and signature
    # are taken from the file's member index.
    # A set gives constant-time membership tests for the repeated name lookups below.
    calibration_names = {calibration.name for calibration in self.calibrations.values()}

    def is_dependency_in_caf(dependency):
        """
        Quick predicate to check dependencies against calibrations known to `CAF`
        """
        dependency_in_caf = dependency.name in calibration_names
        if not dependency_in_caf:
            B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                      " It has been removed as a dependency.")
        return dependency_in_caf

    # Check that there aren't dependencies on calibrations not added to the framework
    # Remove them from the calibration objects if there are.
    for calibration in self.calibrations.values():
        calibration.future_dependencies = [
            dependency for dependency in calibration.future_dependencies
            if is_dependency_in_caf(dependency)
        ]
        calibration.dependencies = [
            dependency for dependency in calibration.dependencies
            if is_dependency_in_caf(dependency)
        ]
1310
1312 """
1313 - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
1314 to a sorting algorithm.
1315 - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
1316 """
1317 # First remove any dependencies on calibrations not added to the CAF
1319 # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
1320 # Note that they currently use the names not the calibration objects.
1321 for calibration in self.calibrations.values():
1322 future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
1323 past_dependencies_names = [dependency.name for dependency in calibration.dependencies]
1324
1325 self.future_dependencies[calibration.name] = future_dependencies_names
1326 self.dependencies[calibration.name] = past_dependencies_names
1327 # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
1328 order = topological_sort(self.future_dependencies)
1329 if not order:
1330 return False
1331
1332 # Get an ordered dictionary of the sort order but including all implicit dependencies.
1333 ordered_full_dependencies = all_dependencies(self.future_dependencies, order)
1334
1335 # Return all the implicit+explicit past dependencies
1336 full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
1337 # Correct each calibration's dependency list to reflect the implicit dependencies
1338 for calibration in self.calibrations.values():
1339 full_deps = full_past_dependencies[calibration.name]
1340 explicit_deps = [cal.name for cal in calibration.dependencies]
1341 for dep in full_deps:
1342 if dep not in explicit_deps:
1343 calibration.dependencies.append(self.calibrations[dep])
1344 # At this point the calibrations have their full dependencies but they aren't in topological
1345 # sort order. Correct that here
1346 ordered_dependency_list = []
1347 for ordered_calibration_name in order:
1348 if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
1349 ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
1350 calibration.dependencies = ordered_dependency_list
1351 order = ordered_full_dependencies
1352 # We should also patch in all of the implicit dependencies for the calibrations
1353 return order
1354
def _check_backend(self):
    """
    Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
    one that is stored isn't a valid Backend object) we should create a default Local backend.

    The check looks at the private ``_backend`` attribute; the replacement is assigned
    through the ``backend`` property.
    """
    # NOTE(review): the `def` line is absent from this extract; the name and signature
    # are taken from the `self._check_backend()` call in run().
    if isinstance(self._backend, caf.backends.Backend):
        return
    self.backend = caf.backends.Local()
1363
def _prune_invalid_collections(self):
    """
    Checks all current calibrations and removes any invalid Collections from their collections list.

    For every calibration a new dictionary is built from the collections whose ``is_valid()``
    returns true and assigned to ``calibration.collections``. A warning is issued for each
    collection that is dropped.
    """
    # NOTE(review): the `def` line is absent from this extract; the name and signature
    # are taken from the file's member index.
    B2INFO("Checking for any invalid Collections in Calibrations.")
    for calibration in self.calibrations.values():
        valid_collections = {}
        for name, collection in calibration.collections.items():
            if collection.is_valid():
                valid_collections[name] = collection
            else:
                B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
        calibration.collections = valid_collections
1377
1378 def run(self, iov=None):
1379 """
1380 Keyword Arguments:
1381 iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
1382 this IoV will be used in the collection step.
1383
1384 This function runs the overall calibration job, saves the outputs to the output_dir directory,
1385 and creates database payloads.
1386
1387 Upload of final databases is not done here. This simply creates the local databases in
1388 the output directory. You should check the validity of your new local database before uploading
1389 to the conditions DB via the basf2 tools/interface to the DB.
1390 """
1391 if not self.calibrations:
1392 B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
1393 # Checks whether the dependencies we've added will give a valid order
1394 order = self._order_calibrations()
1395 if not order:
1396 B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")
1397
1398 # Check that a backend has been set and use default Local() one if not
1399 self._check_backend()
1400
1402
1403 # Creates the overall output directory and reset the attribute to use an absolute path to it.
1404 self.output_dir = self._make_output_dir()
1405
1406 # Creates a SQLite DB to save the status of the various calibrations
1407 self._make_database()
1408
1409 # Enter the overall output dir during processing and opena connection to the DB
1410 with temporary_workdir(self.output_dir):
1411 db = CAFDB(self._db_path)
1412 db.open()
1413 db_initial_calibrations = db.query("select * from calibrations").fetchall()
1414 for calibration in self.calibrations.values():
1415 # Apply defaults given to the `CAF` to the calibrations if they aren't set
1416 calibration._apply_calibration_defaults(self.calibration_defaults)
1417 calibration._db_path = self._db_path
1418 calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
1419 calibration.iov = iov
1420 if not calibration.backend:
1421 calibration.backend = self.backend
1422 # Do some checking of the db to see if we need to add an entry for this calibration
1423 if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
1424 db.insert_calibration(calibration.name)
1425 db.commit()
1426 else:
1427 for cal_info in db_initial_calibrations:
1428 if cal_info[0] == calibration.name:
1429 cal_initial_state = cal_info[2]
1430 cal_initial_iteration = cal_info[3]
1431 B2INFO(f"Previous entry in database found for {calibration.name}.")
1432 B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
1433 calibration.state = cal_initial_state
1434 B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
1435 calibration.iteration = cal_initial_iteration
1436 # Daemonize so that it exits if the main program exits
1437 calibration.daemon = True
1438
1439 db.close()
1440
1441 # Is it possible to keep going?
1442 keep_running = True
1443 while keep_running:
1444 keep_running = False
1445 # Do we have calibrations that may yet complete?
1446 remaining_calibrations = []
1447
1448 for calibration in self.calibrations.values():
1449 # Find the currently ended calibrations (may not be joined yet)
1450 if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
1451 # Search for any alive Calibrations and join them
1452 if calibration.is_alive():
1453 B2DEBUG(29, f"Joining {calibration.name}.")
1454 calibration.join()
1455 else:
1456 if calibration.dependencies_met():
1457 if not calibration.is_alive():
1458 B2DEBUG(29, f"Starting {calibration.name}.")
1459 try:
1460 calibration.start()
1461 except RuntimeError:
1462 # Catch the case when the calibration just finished so it ended up here
1463 # in the "else" and not above where it should have been joined.
1464 B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
1465 remaining_calibrations.append(calibration)
1466 else:
1467 if not calibration.failed_dependencies():
1468 remaining_calibrations.append(calibration)
1469 if remaining_calibrations:
1470 keep_running = True
1471 # Loop over jobs that the calibrations want submitted and submit them.
1472 # We do this here because some backends don't like us submitting in parallel from multiple CalibrationThreads
1473 # So this is like a mini job queue without getting too clever with it
1474 for calibration in remaining_calibrations:
1475 for job in calibration.jobs_to_submit[:]:
1476 calibration.backend.submit(job)
1477 calibration.jobs_to_submit.remove(job)
1478 sleep(self.heartbeat)
1479
1480 B2INFO("Printing summary of final CAF status.")
1481 with CAFDB(self._db_path, read_only=True) as db:
1482 print(db.output_calibration_table())
1483
@property
def backend(self):
    """
    The `backend <backends.Backend>` that runs the collector job.
    When set, this is checked that a `backends.Backend` class instance was passed in.

    Returns the private ``_backend`` attribute unchanged.
    """
    current_backend = self._backend
    return current_backend
1491
@backend.setter
def backend(self, backend):
    """
    Store ``backend`` in ``_backend`` if it is a `caf.backends.Backend` instance.

    Any other value is reported with B2ERROR and the stored backend is left unchanged.
    """
    if not isinstance(backend, caf.backends.Backend):
        B2ERROR('Backend property must inherit from Backend class.')
        return
    self._backend = backend
1500
def _make_output_dir(self):
    """
    Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

    Returns:
        str: The absolute path of the new output_dir

    Raises:
        FileNotFoundError: If the directory still does not exist after trying to create it.
    """
    # NOTE(review): the `def` line is absent from this extract; the name and signature
    # are taken from the `self._make_output_dir()` call in run().
    p = Path(self.output_dir).resolve()
    if p.is_dir():
        B2INFO(f"{p.as_posix()} output directory already exists. "
               "We will try to restart from the previous finishing state.")
        return p.as_posix()
    # exist_ok avoids a spurious FileExistsError if the directory appears between the
    # check above and this call.
    p.mkdir(parents=True, exist_ok=True)
    if not p.is_dir():
        raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
    return p.as_posix()
1519
def _make_database(self):
    """
    Creates the CAF status database. If it already exists we don't overwrite it.

    Sets ``self._db_path`` to the absolute path of ``self._db_name`` inside ``self.output_dir``
    and logs when a file is already present there.
    """
    # NOTE(review): the `def` line is absent from this extract; the name and signature
    # are taken from the `self._make_database()` call in run().
    self._db_path = Path(self.output_dir, self._db_name).absolute()
    if self._db_path.exists():
        B2INFO(f"Previous CAF database found {self._db_path}")
    # Will create a new database + tables, or do nothing but checks we can connect to existing one
    with CAFDB(self._db_path):
        pass
pre_algorithm
Function called after data_input but before algorithm.execute to do any remaining setup.
__init__(self, algorithm, data_input=None, pre_algorithm=None)
data_input
Function called before the pre_algorithm method to setup the input data that the CalibrationAlgorithm...
dict params
Parameters that could be used in the execution of the algorithm strategy/runner to modify behaviour.
algorithm
CalibrationAlgorithm instance (assumed to be true since the Calibration class checks)
strategy
The algorithm strategy that will be used when running over the collected data.
default_inputdata_setup(self, input_file_paths)
dict dependencies
Dictionary of dependencies of Calibration objects, where value is the list of Calibration objects tha...
add_calibration(self, calibration)
_remove_missing_dependencies(self)
_order_calibrations(self)
int heartbeat
The heartbeat (seconds) between polling for Calibrations that are finished.
dict default_calibration_config
The defaults for Calibrations.
_make_database(self)
dict calibrations
Dictionary of calibrations for this CAF instance.
dict calibration_defaults
Default options applied to each calibration known to the CAF, if the Calibration has these defined by...
_backend
Private backend attribute.
dict future_dependencies
Dictionary of future dependencies of Calibration objects, where the value is all calibrations that wi...
_check_backend(self)
_db_path
The path of the SQLite DB.
order
The ordering and explicit future dependencies of calibrations.
__init__(self, calibration_defaults=None)
run(self, iov=None)
str _db_name
The name of the SQLite DB that gets created.
backend
backend property
_prune_invalid_collections(self)
_make_output_dir(self)
str output_dir
Output path to store results of calibration and bookkeeping information.
list input_files
Files used for collection procedure.
Definition framework.py:359
list dependencies
List of calibration objects, where each one is a dependency of this one.
Definition framework.py:349
dict files_to_iovs
File -> Iov dictionary, should be :
Definition framework.py:356
__init__(self, name, input_files=None)
Definition framework.py:340
_apply_calibration_defaults(self, defaults)
Definition framework.py:434
list future_dependencies
List of calibration objects that depend on this one.
Definition framework.py:347
str end_state
The name of the successful completion state.
Definition framework.py:335
str fail_state
The name of the failure state.
Definition framework.py:338
bool save_payloads
Marks this Calibration as one which has payloads that should be copied and uploaded.
Definition framework.py:369
list jobs_to_submit
A simple list of jobs that this Calibration wants submitted at some point.
Definition framework.py:371
str output_database_dir
The directory where we'll store the local database payloads from this calibration.
Definition framework.py:366
name
Name of calibration object.
Definition framework.py:345
depends_on(self, calibration)
Definition framework.py:387
iov
IoV which will be calibrated.
Definition framework.py:364
ignored_runs
List of ExpRun that will be ignored by this Calibration.
Definition framework.py:585
use_central_database(self, global_tag, apply_to_default_collection=True)
Definition framework.py:688
list database_chain
The database chain that is applied to the algorithms.
Definition framework.py:593
strategies
The strategy that the algorithm(s) will be run against.
Definition framework.py:590
dict results
Output results of algorithms for each iteration.
Definition framework.py:577
int heartbeat
This calibration's sleep time before rechecking to see if it can move state.
Definition framework.py:613
int collector_full_update_interval
While checking if the collector is finished we don't bother wastefully checking every subjob's status...
Definition framework.py:611
machine
The caf.state_machines.CalibrationMachine that we will run to process this calibration start to finis...
Definition framework.py:615
list checkpoint_states
Checkpoint states which we are allowed to restart from.
Definition framework.py:534
_db_path
Location of a SQLite database that will save the state of the calibration so that it can be restarted...
Definition framework.py:617
max_iterations
Variable to define the maximum number of iterations for this calibration specifically.
Definition framework.py:580
use_local_database(self, filename, directory="", apply_to_default_collection=True)
Definition framework.py:727
_get_default_collection_attribute(self, attr)
Definition framework.py:774
list _algorithms
Internal calibration algorithms stored for this calibration.
Definition framework.py:555
reset_database(self, apply_to_default_collection=True)
Definition framework.py:674
add_collection(self, name, collection)
Definition framework.py:619
algorithms
Algorithm classes that will be run by this Calibration.
Definition framework.py:575
backend
The backend <backends.Backend> we'll use for our collector submission in this calibration.
Definition framework.py:606
algorithms_runner
The class that runs all the algorithms in this Calibration using their assigned :py:class:caf....
Definition framework.py:603
max_files_per_collector_job(self)
Definition framework.py:867
dict collections
Collections stored for this calibration.
Definition framework.py:553
__init__(self, name, collector=None, algorithms=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
Definition framework.py:549
str default_collection_name
Default collection name.
Definition framework.py:536
_set_default_collection_attribute(self, attr, value)
Definition framework.py:786
list job_cmd
The Collector caf.backends.Job.cmd attribute.
Definition framework.py:153
int default_max_collector_jobs
The default maximum number of collector jobs to create.
Definition framework.py:69
list input_files
Internal input_files stored for this calibration.
Definition framework.py:89
dict backend_args
Dictionary passed to the collector Job object to configure how the caf.backends.Backend instance shou...
Definition framework.py:130
_collector
Internal storage of collector attribute.
Definition framework.py:270
dict files_to_iovs
File -> Iov dictionary, should be :
Definition framework.py:98
list database_chain
The database chain used for this Collection.
Definition framework.py:137
__init__(self, collector=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
Definition framework.py:83
list output_patterns
Output patterns of files produced by collector which will be used to pass to the Algorithm....
Definition framework.py:110
splitter
The SubjobSplitter to use when constructing collector subjobs from the overall Job object.
Definition framework.py:116
_input_files
set input files
Definition framework.py:240
pre_collector_path
Since many collectors require some different setup, if you set this attribute to a basf2....
Definition framework.py:103
max_files_per_collector_job(self)
Definition framework.py:299
collector
Collector module of this collection.
Definition framework.py:87
uri_list_from_input_file(input_file)
Definition framework.py:208
use_local_database(self, filename, directory="")
Definition framework.py:187
use_central_database(self, global_tag)
Definition framework.py:162
STL class.