Belle II Software development
framework.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14"""
15This module implements several objects/functions to configure and run calibrations.
16These classes are used to construct the workflow of the calibration job.
17The actual processing code is mostly in the `caf.state_machines` module.
18"""
19
20__all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF"]
21
22import os
23from threading import Thread
24from time import sleep
25from pathlib import Path
26import shutil
27from glob import glob
28
29from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30from basf2 import find_file
31from basf2 import conditions as b2conditions
32
33from abc import ABC, abstractmethod
34
35import caf
36from caf.utils import B2INFO_MULTILINE
37from caf.utils import past_from_future_dependencies
38from caf.utils import topological_sort
39from caf.utils import all_dependencies
40from caf.utils import method_dispatch
41from caf.utils import temporary_workdir
42from caf.utils import find_int_dirs
43from caf.utils import LocalDatabase
44from caf.utils import CentralDatabase
45from caf.utils import parse_file_uri
46
47import caf.strategies as strategies
48import caf.runners as runners
49from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
50from caf.state_machines import CalibrationMachine, ConditionError, MachineError
51from caf.database import CAFDB
52
53
class Collection():
    """
    Bundles a collector module with the input files, database chain and job-splitting settings used to run it.

    Keyword Arguments:
        collector (str, basf2.Module): The collector module or module name for this `Collection`.
        input_files (list[str]): The input files to be used for only this `Collection`.
        pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
        database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
            Passing an empty list gives an empty chain (the default global tags are NOT added).
        output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
            `Algorithm.data_input` function. Setting this here, replaces the default completely.
        max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
            Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
        max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
            Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
            max_files_per_collector_job.
        backend_args (dict): The args for the backend submission of this `Collection`.
    """

    # Value used for `max_collector_jobs` when neither splitting option is passed to the constructor.
    default_max_collector_jobs = 1000

    # File name associated with the collector job configuration (it is only defined here, not used in this class).
    job_config = "collector_job.json"

    def __init__(self,
                 collector=None,
                 input_files=None,
                 pre_collector_path=None,
                 database_chain=None,
                 output_patterns=None,
                 max_files_per_collector_job=None,
                 max_collector_jobs=None,
                 backend_args=None
                 ):
        """
        Store the configuration, falling back to the defaults for every option that is not given.
        """
        # Assigned through the property setter, which turns a module name into a module object.
        self.collector = collector

        # Assigned through the property setter, which expands each entry into a list of URIs.
        self.input_files = []
        if input_files:
            self.input_files = input_files

        # Starts empty; nothing in this class fills it.
        self.files_to_iovs = {}

        self.pre_collector_path = None
        if pre_collector_path:
            self.pre_collector_path = pre_collector_path

        # A user supplied list replaces the default pattern completely.
        self.output_patterns = ["CollectorOutput.root"]
        if output_patterns:
            self.output_patterns = output_patterns

        # The splitter object is created by whichever of the two property setters below is used.
        self.splitter = None
        if max_files_per_collector_job and max_collector_jobs:
            B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
        elif max_files_per_collector_job:
            self.max_files_per_collector_job = max_files_per_collector_job
        elif max_collector_jobs:
            self.max_collector_jobs = max_collector_jobs
        else:
            self.max_collector_jobs = self.default_max_collector_jobs

        self.backend_args = {}
        if backend_args:
            self.backend_args = backend_args

        if database_chain is not None:
            # An explicitly passed chain (even an empty one) is respected as-is. It is copied so that later
            # use_central_database()/use_local_database() calls never modify the caller's list, which may be
            # shared with other objects.
            self.database_chain = list(database_chain)
        else:
            self.database_chain = []
            # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
            # described well by appending to a list to a deque. So we do bit of reversal to translate it back and make the
            # most important GT the last one encountered.
            for tag in reversed(b2conditions.default_globaltags):
                self.use_central_database(tag)

        # The basf2 steering file that will be used for Collector jobs run by this collection.
        # This script will be copied into subjob directories as part of the input sandbox.
        self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()

        # Command for the collector job; it refers to the steering file by name only (no directory part).
        self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]

    def reset_database(self):
        """
        Remove everything in the database_chain of this Collection, including the default central database
        tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        """
        self.database_chain = []

    def use_central_database(self, global_tag):
        """
        Parameters:
            global_tag (str): The central database global tag to use for this collection.

        Using this allows you to append a central database to the database chain for this collection.
        The default database chain is just the central one from
        `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        The input file global tag will always be overridden and never used unless explicitly set.

        To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
        and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.

        Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

        NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
        that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
        """
        central_db = CentralDatabase(global_tag)
        self.database_chain.append(central_db)

    def use_local_database(self, filename, directory=""):
        """
        Parameters:
            filename (str): The path to the database.txt of the local database
            directory (str): The path to the payloads directory for this local database.

        Append a local database to the chain for this collection.
        You can call this function multiple times and each database will be added to the chain IN ORDER.
        The databases are applied to this collection ONLY.

        NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
        that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
        """
        local_db = LocalDatabase(filename, directory)
        self.database_chain.append(local_db)

    @staticmethod
    def uri_list_from_input_file(input_file):
        """
        Parameters:
            input_file (str): A local file/glob pattern or XROOTD URI

        Returns:
            list: A list of the URIs found from the initial string.
        """
        # By default we assume it is a local file path if no "scheme" is given
        uri = parse_file_uri(input_file)
        if uri.scheme == "file":
            # For local files we also perform a glob just in case it is a wildcard pattern.
            # That way we will have all the uris of files separately
            uris = [parse_file_uri(f).geturl() for f in glob(input_file)]
        else:
            # Just let everything else through and hope the backend can figure it out
            uris = [input_file]
        return uris

    @property
    def input_files(self):
        """
        The list of input file URIs of this `Collection`.
        """
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        """
        Accepts a single string or a list of strings; each one is expanded with `uri_list_from_input_file`.

        Raises:
            TypeError: If the value is neither a string nor a list.
        """
        if isinstance(value, str):
            # If it's a string, we convert to a list of URIs
            self._input_files = self.uri_list_from_input_file(value)
        elif isinstance(value, list):
            # If it's a list we loop and do the same thing
            total_files = []
            for pattern in value:
                total_files.extend(self.uri_list_from_input_file(pattern))
            self._input_files = total_files
        else:
            raise TypeError("Input files must be a list or string")

    @property
    def collector(self):
        """
        The collector module of this `Collection` (whatever was last stored by the setter).
        """
        return self._collector

    @collector.setter
    def collector(self, collector):
        """
        Store the collector. A string is first passed to `basf2.register_module`; anything else that evaluates
        as True and is not a `basf2.Module` is reported with B2ERROR but still stored.
        """
        # check if collector is already a module or if we need to create one
        # from the name
        if collector:
            from basf2 import Module
            if isinstance(collector, str):
                from basf2 import register_module
                collector = register_module(collector)
            if not isinstance(collector, Module):
                B2ERROR("Collector needs to be either a Module or the name of such a module")

        self._collector = collector

    def is_valid(self):
        """
        Returns:
            bool: True only if both a collector and at least one input file are set.
        """
        return bool(self.collector and self.input_files)

    @property
    def max_collector_jobs(self):
        """
        The `max_subjobs` value of the current splitter, or None when there is no splitter or the splitter
        does not have that attribute (e.g. because `max_files_per_collector_job` was set instead).
        """
        if not self.splitter:
            return None
        # The two options install different splitter types, so the attribute may legitimately be missing.
        return getattr(self.splitter, "max_subjobs", None)

    @max_collector_jobs.setter
    def max_collector_jobs(self, value):
        """
        Replace the splitter with a `MaxSubjobsSplitter`, or remove the splitter if the value is None.
        """
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxSubjobsSplitter(max_subjobs=value)

    @property
    def max_files_per_collector_job(self):
        """
        The `max_files_per_subjob` value of the current splitter, or None when there is no splitter or the
        splitter does not have that attribute (e.g. because `max_collector_jobs` was set instead).
        """
        if not self.splitter:
            return None
        # The two options install different splitter types, so the attribute may legitimately be missing.
        return getattr(self.splitter, "max_files_per_subjob", None)

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, value):
        """
        Replace the splitter with a `MaxFilesSplitter`, or remove the splitter if the value is None.
        """
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
298
299
class CalibrationBase(ABC, Thread):
    """
    Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration` class which inherits from
    this and runs the C++ CalibrationCollectorModule and CalibrationAlgorithm classes. But by inheriting from this
    class and providing the minimal necessary methods/attributes you could plug in your own Calibration types
    that don't depend on the C++ CAF at all and run everything in your own way.

    .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
                 But it's there if you really need it.

    Parameters:
        name (str): Name of this calibration object. Should be unique if you are going to run it.

    Keyword Arguments:
        input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by `glob.glob`.
    """

    # The state value that `dependencies_met` treats as a successful finish.
    end_state = "completed"

    # The state value that `failed_dependencies` treats as a failure.
    fail_state = "failed"

    def __init__(self, name, input_files=None):
        """
        Initialise the thread and the bookkeeping attributes shared by all calibration types.
        """
        super().__init__()

        # Name of the calibration (this also becomes the name of the thread).
        self.name = name

        # Calibrations that depend on this one, filled by their `depends_on` calls.
        self.future_dependencies = []

        # Calibrations that this one depends on, filled by `depends_on`.
        self.dependencies = []

        # Starts empty; nothing in this class fills it.
        self.files_to_iovs = {}
        if input_files:
            self.input_files = input_files
        else:
            self.input_files = []

        # Left as None so that `_apply_calibration_defaults` may fill it in.
        self.iov = None

        self.output_database_dir = ""

        self.save_payloads = True

        self.jobs_to_submit = []

    @abstractmethod
    def run(self):
        """
        The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
        once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
        """

    @abstractmethod
    def is_valid(self):
        """
        A simple method you should implement that will return True or False depending on whether
        the Calibration has been set up correctly and can be run safely.
        """

    def depends_on(self, calibration):
        """
        Parameters:
            calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

        Adds dependency of this calibration on another i.e. This calibration
        will not run until the dependency has completed, and the constants produced
        will be used via the database chain.

        You can define multiple dependencies for a single calibration simply
        by calling this multiple times. Be careful when adding the calibration into
        the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
        empty order and the `CAF` processing will fail.

        This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
        `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
        Calling it repeatedly with the same calibration does not create duplicate entries in either list.
        """
        # Check that we don't have two calibration names that are the same
        if self.name != calibration.name:
            # Tests if we have the calibrations added as dependencies already and adds if not
            if calibration not in self.dependencies:
                self.dependencies.append(calibration)
            # The reverse link lives in the other calibration's future_dependencies, so that is the list to check
            if self not in calibration.future_dependencies:
                calibration.future_dependencies.append(self)
        else:
            B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                      "Dependency was not added.")

    def dependencies_met(self):
        """
        Checks if all of the Calibrations that this one depends on have reached a successful end state.
        """
        return all(dependency.state == dependency.end_state for dependency in self.dependencies)

    def failed_dependencies(self):
        """
        Returns the list of calibrations in our dependency list that have failed.
        """
        # Each dependency is judged against its own fail_state, mirroring how dependencies_met uses its end_state
        return [calibration for calibration in self.dependencies if calibration.state == calibration.fail_state]

    def _apply_calibration_defaults(self, defaults):
        """
        We pass in default calibration options from the `CAF` instance here if called.
        Won't overwrite any options already set.

        Parameters:
            defaults (dict): Attribute names mapped to the values to use when the attribute is currently None.
        """
        for key, value in defaults.items():
            try:
                # Only attributes that exist and are still None get the default value
                if getattr(self, key) is None:
                    setattr(self, key, value)
            except AttributeError:
                print(f"The calibration {self.name} does not support the attribute {key}.")
425
426
427class Calibration(CalibrationBase):
428 """
    Every Calibration object must have at least one collector and at least one algorithm.
430 You have the option to add in your collector/algorithm by argument here, or add them
431 later by changing the properties.
432
433 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
434 separately via `add_collection()`.
435
436 Parameters:
437 name (str): Name of this calibration. It should be unique for use in the `CAF`
438 Keyword Arguments:
439 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
440 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
441 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards usable by `glob.glob`
442
443 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
444
445 >>> cal = Calibration('TestCalibration1')
446 >>> col1 = register_module('CaTest')
447 >>> cal.add_collection('TestColl', col1)
448
449 or equivalently
450
451 >>> cal = Calibration('TestCalibration1', 'CaTest')
452
453 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
454
455 >>> cal.pre_collector_path = my_basf2_path
456
457 You don't have to put a RootInput module in this pre-collection path, but you can if
    you need some special parameters. If you want to process sroot files then you have to explicitly add
459 SeqRootInput to your pre-collection path.
460 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
461
462
463 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
464
465 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
466
467 you can change the input file list later on, before running with `CAF`
468
469 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
470
471 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
472 input_files, database chain etc from there. See `Collection`.
473
474 Adding the CalibrationAlgorithm(s) is easy
475
476 >>> alg1 = TestAlgo()
477 >>> cal.algorithms = alg1
478
479 Or equivalently
480
481 >>> cal.algorithms = [alg1]
482
483 Or for multiple algorithms for one collector
484
485 >>> alg2 = TestAlgo()
486 >>> cal.algorithms = [alg1, alg2]
487
488 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
    `Algorithm`. To access the C++ algorithm class underneath directly do:
490
491 >>> cal.algorithms[i].algorithm
492
493 If you have a setup function that you want to run before each of the algorithms, set that with
494
495 >>> cal.pre_algorithms = my_function_object
496
497 If you want a different setup for each algorithm use a list with the same number of elements
498 as your algorithm list.
499
500 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
501
502 You can also specify the dependencies of the calibration on others
503
504 >>> cal.depends_on(cal2)
505
506 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
507 calibration constants created by earlier completed calibrations to dependent ones.
508 """
509
    # Names of the moves available to a Calibration -- presumably the transitions of the
    # `caf.state_machines.CalibrationMachine`; TODO confirm, they are not used in this part of the file.
    moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]

    # Name used for the algorithm output directory -- NOTE(review): where it is created is not visible here.
    alg_output_dir = "algorithm_output"

    # State names treated as checkpoints -- NOTE(review): how they are used is not visible here.
    checkpoint_states = ["init", "collector_completed", "completed"]

    # Key under which the automatically created `Collection` is stored in `self.collections`.
    default_collection_name = "default"
517
518 def __init__(self,
519 name,
520 collector=None,
521 algorithms=None,
522 input_files=None,
523 pre_collector_path=None,
524 database_chain=None,
525 output_patterns=None,
526 max_files_per_collector_job=None,
527 max_collector_jobs=None,
528 backend_args=None
529 ):
530 """
531 """
532
533 self.collections = {}
534
535 self._algorithms = []
536
537 # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
538 # CollectorModule/module name directly.
539 self.add_collection(self.default_collection_name,
540 Collection(collector,
541 input_files,
542 pre_collector_path,
543 database_chain,
544 output_patterns,
545 max_files_per_collector_job,
546 max_collector_jobs,
547 backend_args
548 ))
549
550 super().__init__(name, input_files)
551 if algorithms:
552
555 self.algorithms = algorithms
556
557 self.results = {}
558
560 self.max_iterations = None
561
565 self.ignored_runs = None
566 if self.algorithms:
567
570 self.strategies = strategies.SingleIOV
571 if database_chain:
572
574 self.database_chain = database_chain
575 else:
576 self.database_chain = []
577 # This database is already applied to the `Collection` automatically, so don't do it again
578 for tag in reversed(b2conditions.default_globaltags):
579 self.use_central_database(tag, apply_to_default_collection=False)
580
583 self.algorithms_runner = runners.SeqAlgorithmsRunner
584
586 self.backend = None
587
591 self.collector_full_update_interval = 30
592
593 self.heartbeat = 3
594
595 self.machine = None
596
597 self._db_path = None
598
599 def add_collection(self, name, collection):
600 """
601 Parameters:
602 name (str): Unique name of this `Collection` in the Calibration.
603 collection (`Collection`): `Collection` object to use.
604
605 Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
606 A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
607 + input files.
608 You can ignore the default one and only add your own custom Collections. You can configure the default from the
609 Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
610 attributes.
611 """
612 if name not in self.collections:
613 self.collections[name] = collection
614 else:
615 B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
616 "Please use another name.")
617
618 def is_valid(self):
619 """
620 A full calibration consists of a collector AND an associated algorithm AND input_files.
621
622 Returns False if:
623 1) We are missing any of the above.
624 2) There are multiple Collections and the Collectors have mis-matched granularities.
625 3) Any of our Collectors have granularities that don't match what our Strategy can use.
626 """
627 if not self.algorithms:
628 B2WARNING(f"Empty algorithm list for {self.name}.")
629 return False
630
631 if not any([collection.is_valid() for collection in self.collections.values()]):
632 B2WARNING(f"No valid Collections for {self.name}.")
633 return False
634
635 granularities = []
636 for collection in self.collections.values():
637 if collection.is_valid():
638 collector_params = collection.collector.available_params()
639 for param in collector_params:
640 if param.name == "granularity":
641 granularities.append(param.values)
642 if len(set(granularities)) > 1:
643 B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
644 return False
645
646 for alg in self.algorithms:
647 alg_type = type(alg.algorithm).__name__
648 incorrect_gran = [granularity not in alg.strategy.allowed_granularities for granularity in granularities]
649 if any(incorrect_gran):
650 B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
651 return False
652 return True
653
654 def reset_database(self, apply_to_default_collection=True):
655 """
656 Keyword Arguments:
657 apply_to_default_collection (bool): Should we also reset the default collection?
658
659 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
660 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
661 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
662 reset by setting 'apply_to_default_collection' to False.
663 """
664 self.database_chain = []
665 if self.default_collection_name in self.collections and apply_to_default_collection:
666 self.collections[self.default_collection_name].reset_database()
667
668 def use_central_database(self, global_tag, apply_to_default_collection=True):
669 """
670 Parameters:
671 global_tag (str): The central database global tag to use for this calibration.
672
673 Keyword Arguments:
674 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
675
676 Using this allows you to append a central database to the database chain for this calibration.
677 The default database chain is just the central one from
678 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
679 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
680 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
681
682 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
683 and the algorithm processes. So calling:
684
685 >> cal.use_central_database("global_tag")
686
687 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
688 assigned to
689
690 >> cal.collections['default'].database_chain
691
692 But calling
693
694 >> cal.use_central_database(file_path, payload_dir, False)
695
696 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
697 So if you have multiple Collections in this Calibration *their database chains are separate*.
698 To specify an additional `CentralDatabase` for a different collection, you will have to call:
699
700 >> cal.collections['OtherCollection'].use_central_database("global_tag")
701 """
702 central_db = CentralDatabase(global_tag)
703 self.database_chain.append(central_db)
704 if self.default_collection_name in self.collections and apply_to_default_collection:
705 self.collections[self.default_collection_name].use_central_database(global_tag)
706
707 def use_local_database(self, filename, directory="", apply_to_default_collection=True):
708 """
709 Parameters:
710 filename (str): The path to the database.txt of the local database
711
712 Keyword Argumemts:
713 directory (str): The path to the payloads directory for this local database.
714 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
715
716 Append a local database to the chain for this calibration.
717 You can call this function multiple times and each database will be added to the chain IN ORDER.
718 The databases are applied to this calibration ONLY.
719 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
720 the default `Collection` job as a database chain.
721 There are other databases applied to the processes later, checked by basf2 in this order:
722
723 1) Local Database from previous iteration of this Calibration.
724 2) Local Database chain from output of previous dependent Calibrations.
725 3) This chain of Local and Central databases where the last added is checked first.
726
727 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
728 'apply_to_default_collection' remains True. So calling:
729
730 >> cal.use_local_database(file_path, payload_dir)
731
732 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
733 assigned to
734
735 >> cal.collections['default'].database_chain
736
737 But calling
738
739 >> cal.use_local_database(file_path, payload_dir, False)
740
741 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
742
743 If you have multiple Collections in this Calibration *their database chains are separate*.
744 To specify an additional `LocalDatabase` for a different collection, you will have to call:
745
746 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
747
748 """
749 local_db = LocalDatabase(filename, directory)
750 self.database_chain.append(local_db)
751 if self.default_collection_name in self.collections and apply_to_default_collection:
752 self.collections[self.default_collection_name].use_local_database(filename, directory)
753
754 def _get_default_collection_attribute(self, attr):
755 if self.default_collection_name in self.collections:
756 return getattr(self.collections[self.default_collection_name], attr)
757 else:
758 B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
759 "but the default collection doesn't exist."
760 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
761 "collection's attributes directly.")
762 return None
763
764 def _set_default_collection_attribute(self, attr, value):
765 if self.default_collection_name in self.collections:
766 setattr(self.collections[self.default_collection_name], attr, value)
767 else:
768 B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
769 "but the default collection doesn't exist."
770 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
771 "collection's attributes directly.")
772
773 @property
774 def collector(self):
775 """
776 """
777 return self._get_default_collection_attribute("collector")
778
779 @collector.setter
780 def collector(self, collector):
781 """
782 """
783 # check if collector is already a module or if we need to create one
784 # from the name
785 from basf2 import Module
786 if isinstance(collector, str):
787 from basf2 import register_module
788 collector = register_module(collector)
789 if not isinstance(collector, Module):
790 B2ERROR("Collector needs to be either a Module or the name of such a module")
791
792 self._set_default_collection_attribute("collector", collector)
793
    @property
    def input_files(self):
        """
        The `input_files` attribute of the default `Collection`, read via `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("input_files")

    @input_files.setter
    def input_files(self, files):
        """
        Assign `input_files` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("input_files", files)
805
    @property
    def files_to_iovs(self):
        """
        The `files_to_iovs` attribute of the default `Collection`, read via `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("files_to_iovs")

    @files_to_iovs.setter
    def files_to_iovs(self, file_map):
        """
        Assign `files_to_iovs` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("files_to_iovs", file_map)
817
    @property
    def pre_collector_path(self):
        """
        The `pre_collector_path` attribute of the default `Collection`, read via `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("pre_collector_path")

    @pre_collector_path.setter
    def pre_collector_path(self, path):
        """
        Assign `pre_collector_path` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("pre_collector_path", path)
829
    @property
    def output_patterns(self):
        """
        The `output_patterns` attribute of the default `Collection`, read via `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("output_patterns")

    @output_patterns.setter
    def output_patterns(self, patterns):
        """
        Assign `output_patterns` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("output_patterns", patterns)
841
    @property
    def max_files_per_collector_job(self):
        """
        The `max_files_per_collector_job` attribute of the default `Collection`, read via
        `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("max_files_per_collector_job")

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, max_files):
        """
        Assign `max_files_per_collector_job` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("max_files_per_collector_job", max_files)
853
    @property
    def max_collector_jobs(self):
        """
        The `max_collector_jobs` attribute of the default `Collection`, read via `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("max_collector_jobs")

    @max_collector_jobs.setter
    def max_collector_jobs(self, max_jobs):
        """
        Assign `max_collector_jobs` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("max_collector_jobs", max_jobs)
865
    @property
    def backend_args(self):
        """
        The `backend_args` attribute of the default `Collection`, read via `_get_default_collection_attribute`.
        """
        return self._get_default_collection_attribute("backend_args")

    @backend_args.setter
    def backend_args(self, args):
        """
        Assign `backend_args` on the default `Collection` via `_set_default_collection_attribute`.
        """
        self._set_default_collection_attribute("backend_args", args)
877
878 @property
879 def algorithms(self):
880 """
881 """
882 return self._algorithms
883
884 @algorithms.setter
885 @method_dispatch
886 def algorithms(self, value):
887 """
888 """
889 from ROOT import Belle2 # noqa: make the Belle2 namespace available
890 from ROOT.Belle2 import CalibrationAlgorithm
891 if isinstance(value, CalibrationAlgorithm):
892 self._algorithms = [Algorithm(value)]
893 else:
894 B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
895 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
896
@algorithms.fset.register(tuple)
@algorithms.fset.register(list)
def _(self, value):
    """
    Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.

    Each element that is a ``Belle2.CalibrationAlgorithm`` is wrapped in an
    `Algorithm` and appended to a fresh list. Every other element is reported
    with B2ERROR and skipped. An empty container is ignored, leaving the
    currently stored algorithms unchanged.
    """
    from ROOT import Belle2  # noqa: make the Belle2 namespace available
    from ROOT.Belle2 import CalibrationAlgorithm
    if not value:
        return
    self._algorithms = []
    for alg in value:
        if isinstance(alg, CalibrationAlgorithm):
            self._algorithms.append(Algorithm(alg))
        else:
            # Report the type of the offending element, not of the container.
            B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(alg)}). "
                    "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
913
@property
def pre_algorithms(self):
    """
    Callback run prior to each algorithm iteration.

    Returns:
        list: The ``pre_algorithm`` attribute of every entry in ``self.algorithms``, in order.
    """
    callbacks = []
    for algorithm in self.algorithms:
        callbacks.append(algorithm.pre_algorithm)
    return callbacks
920
@pre_algorithms.setter
@method_dispatch
def pre_algorithms(self, func):
    """
    Assign the same ``pre_algorithm`` callback to every algorithm.

    A falsy ``func`` is reported with B2ERROR and nothing is changed.
    """
    if not func:
        B2ERROR("Something evaluated as False passed in as pre_algorithm function.")
        return
    for algorithm in self.algorithms:
        algorithm.pre_algorithm = func
931
@pre_algorithms.fset.register(tuple)
@pre_algorithms.fset.register(list)
def _(self, values):
    """
    Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.

    An empty container, or one whose length differs from the number of
    algorithms, is reported with B2ERROR and nothing is changed.
    """
    if not values:
        B2ERROR("Empty container passed in for pre_algorithm functions")
        return
    if len(values) != len(self.algorithms):
        B2ERROR("Number of functions and number of algorithms doesn't match.")
        return
    # Pair the callbacks with the algorithms position by position.
    for callback, algorithm in zip(values, self.algorithms):
        algorithm.pre_algorithm = callback
946
@property
def strategies(self):
    """
    The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).

    Returns:
        list: The ``strategy`` attribute of every entry in ``self.algorithms``, in order.
    """
    chosen_strategies = []
    for algorithm in self.algorithms:
        chosen_strategies.append(algorithm.strategy)
    return chosen_strategies
953
@strategies.setter
@method_dispatch
def strategies(self, strategy):
    """
    Assign the same ``strategy`` to every algorithm.

    A falsy ``strategy`` is reported with B2ERROR and nothing is changed.
    """
    if not strategy:
        B2ERROR("Something evaluated as False passed in as a strategy.")
        return
    for algorithm in self.algorithms:
        algorithm.strategy = strategy
964
@strategies.fset.register(tuple)
@strategies.fset.register(list)
def _(self, values):
    """
    Alternate strategies setter for lists and tuples of strategies, should be one per algorithm.

    An empty container, or one whose length differs from the number of
    algorithms, is reported with B2ERROR and nothing is changed.
    """
    if not values:
        B2ERROR("Empty container passed in for strategies list")
        return
    if len(values) != len(self.algorithms):
        B2ERROR("Number of strategies and number of algorithms doesn't match.")
        return
    # Iterate over the passed-in container. The bare name `strategies` inside this
    # function resolves to the module-level `caf.strategies` import, which is not iterable.
    for strategy, alg in zip(values, self.algorithms):
        alg.strategy = strategy
979
def __repr__(self):
    """
    Represent this object by its ``name`` attribute.
    """
    own_name = self.name
    return own_name
984
def run(self):
    """
    Main logic of the Calibration object.
    Will be run in a new Thread by calling the start() method.

    Reads the stored "checkpoint" state and "iteration" from the CAF database,
    builds the `CalibrationMachine` from them, deletes iteration directories
    numbered above the stored iteration, and then loops over collector
    submission (only from the "init" state), collector polling and algorithm
    execution until the end state or fail state is reached.
    """
    # Pick up where a previous attempt left off (values come from the CAF database file).
    with CAFDB(self._db_path, read_only=True) as db:
        initial_state = db.get_calibration_value(self.name, "checkpoint")
        initial_iteration = db.get_calibration_value(self.name, "iteration")
    B2INFO(f"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
    self.machine = CalibrationMachine(self,
                                      iov_to_calibrate=self.iov,
                                      initial_state=initial_state,
                                      iteration=initial_iteration)
    self.state = initial_state
    # The working directory of this calibration is <current working directory>/<name>.
    self.machine.root_dir = Path(os.getcwd(), self.name)
    self.machine.collector_backend = self.backend

    # Before we start running, let's clean up any iteration directories from iterations above our initial one.
    # Should prevent confusion between attempts if we fail again.
    all_iteration_paths = find_int_dirs(self.machine.root_dir)
    for iteration_path in all_iteration_paths:
        if int(iteration_path.name) > initial_iteration:
            shutil.rmtree(iteration_path)

    while self.state != self.end_state and self.state != self.fail_state:
        if self.state == "init":
            try:
                B2INFO(f"Attempting collector submission for calibration {self.name}.")
                self.machine.submit_collector()
            except Exception as err:
                # Any failure during submission is reported through B2FATAL.
                B2FATAL(str(err))

        # Returns immediately unless the state is "running_collector".
        self._poll_collector()

        # If we failed take us to the final fail state
        if self.state == "collector_failed":
            self.machine.fail_fully()
            return

        # It's possible that we might raise an error while attempting to run due
        # to some problems e.g. Missing collector output files
        # We catch the error and exit with failed state so the CAF will stop
        try:
            B2INFO(f"Attempting to run algorithms for calibration {self.name}.")
            self.machine.run_algorithms()
        except MachineError as err:
            B2ERROR(str(err))
            self.machine.fail()

        # If we failed take us to the final fail state
        if self.machine.state == "algorithms_failed":
            self.machine.fail_fully()
            return
1038
def _poll_collector(self):
    """
    Wait for the collector jobs, checking on them once per heartbeat.

    While the calibration state is "running_collector", the machine's
    ``complete`` transition is attempted first. When that raises
    `ConditionError` the ``fail`` transition is attempted instead; a
    `ConditionError` from ``fail`` is ignored. The check is repeated after
    sleeping for ``self.heartbeat``.
    """
    while self.state == "running_collector":
        try:
            self.machine.complete()
        except ConditionError:
            # ConditionError is thrown when the conditions for the transition have returned false, it's not serious.
            B2DEBUG(29, f"Checking if collector jobs for calibration {self.name} have failed.")
            try:
                self.machine.fail()
            except ConditionError:
                # Neither transition is allowed yet, so simply check again later.
                pass
        # Sleep until we want to check again
        sleep(self.heartbeat)
1053
@property
def state(self):
    """
    The current major state of the calibration in the database file. The machine may have a different state.

    Returns:
        The "state" value stored for this calibration's name, read through a read-only `CAFDB` connection.
    """
    with CAFDB(self._db_path, read_only=True) as database:
        return database.get_calibration_value(self.name, "state")
1062
@state.setter
def state(self, state):
    """
    Write ``state`` (converted with ``str``) to the database file.

    When ``state`` is one of ``self.checkpoint_states`` the "checkpoint"
    value is updated as well.
    """
    B2DEBUG(29, f"Setting {self.name} to state {state}.")
    is_checkpoint = state in self.checkpoint_states
    with CAFDB(self._db_path) as database:
        database.update_calibration_value(self.name, "state", str(state))
        if is_checkpoint:
            database.update_calibration_value(self.name, "checkpoint", str(state))
    B2DEBUG(29, f"{self.name} set to {state}.")
1073
@property
def iteration(self):
    """
    Retrieves the current iteration number in the database file.

    Returns:
        int: The current iteration number
    """
    with CAFDB(self._db_path, read_only=True) as database:
        return database.get_calibration_value(self.name, "iteration")
1085
@iteration.setter
def iteration(self, iteration):
    """
    Write ``iteration`` to the database file as this calibration's "iteration" value.

    The closing debug message reads the value back through the ``iteration`` property.
    """
    calibration_name = self.name
    B2DEBUG(29, f"Setting {calibration_name} to {iteration}.")
    with CAFDB(self._db_path) as database:
        database.update_calibration_value(calibration_name, "iteration", iteration)
    B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1094
1095
class Algorithm():
    """
    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.
    Keyword Arguments:
        data_input : An optional function that sets the input files of the algorithm.
        pre_algorithm : An optional function that runs just prior to execution of the algorithm.
            Useful for set up e.g. module initialisation

    This is a simple wrapper class around the C++ CalibrationAlgorithm class.
    It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
    than separating the logic into those classes directly.

    This is **not** currently a class that a user should interact with much during `CAF`
    setup (unless you're doing something advanced).
    The `Calibration` class should be doing the most of the creation of the defaults for these objects.

    Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
    Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
    is often useful i.e. by calling for the Geometry module to initialise.
    """

    def __init__(self, algorithm, data_input=None, pre_algorithm=None):
        """
        Wrap ``algorithm`` and fill in the default helper attributes.
        """
        # The wrapped CalibrationAlgorithm instance.
        self.algorithm = algorithm
        # Class name of the algorithm with any leading C++ namespaces removed.
        self.name = self._strip_cpp_namespace(type(algorithm).__cpp_name__)
        # Function that sets the algorithm's input files; falls back to `default_inputdata_setup`.
        self.data_input = data_input or self.default_inputdata_setup
        # Optional function, stored as passed in (may be None).
        self.pre_algorithm = pre_algorithm
        # Strategy used for this algorithm, `strategies.SingleIOV` unless reassigned.
        self.strategy = strategies.SingleIOV
        # Starts out as an empty dictionary; not filled anywhere in this class.
        self.params = {}

    @staticmethod
    def _strip_cpp_namespace(cpp_name):
        """
        Return the part of ``cpp_name`` after its last ``::``.

        A name without any ``::`` is returned unchanged; slicing at
        ``rfind('::') + 2`` would cut off its first character instead.
        """
        return cpp_name.rpartition("::")[2]

    def default_inputdata_setup(self, input_file_paths):
        """
        Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
        by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
        for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
        """
        # Keep only paths whose final component is exactly "CollectorOutput.root".
        collector_output_files = [file_path for file_path in input_file_paths
                                  if Path(file_path).name == "CollectorOutput.root"]
        info_lines = [f"Input files used in {self.name}:"]
        info_lines.extend(collector_output_files)
        B2INFO_MULTILINE(info_lines)
        self.algorithm.setInputFileNames(collector_output_files)
1158
1159
class CAF():
    """
    Parameters:
      calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                                   >>> calibration_defaults={"max_iterations":2}

    This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
    for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

    The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
    `CalibrationBase.start` when the dependencies are met.

    Much of the checking for consistency is done in this class so that no processing is done with an invalid
    setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
    `CalibrationBase` instances.
    """

    # File name of the state database created inside the output directory.
    _db_name = "caf_state.db"
    # Defaults that are merged with (and overridden by) the ``calibration_defaults`` argument.
    default_calibration_config = {
        "max_iterations": 5,
        "ignored_runs": []
    }

    def __init__(self, calibration_defaults=None):
        """
        Set up an empty framework; ``calibration_defaults`` overrides `default_calibration_config`.
        """
        # Calibrations added through `add_calibration`, keyed by their name.
        self.calibrations = {}
        # Calibration name -> names of its future dependencies (filled by `_order_calibrations`).
        self.future_dependencies = {}
        # Calibration name -> names of its (past) dependencies (filled by `_order_calibrations`).
        self.dependencies = {}
        # Output directory; replaced by its absolute path in `run`.
        self.output_dir = "calibration_results"
        # Initialised to None; not assigned anywhere else in this class.
        self.order = None
        # Backing field of the `backend` property.
        self._backend = None
        # Argument passed to `sleep` between passes of the main loop in `run`.
        self.heartbeat = 5

        if not calibration_defaults:
            calibration_defaults = {}
        # Class-level defaults, overridden by whatever the caller passed in.
        self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
        # Path of the state database, set by `_make_database`.
        self._db_path = None

    def add_calibration(self, calibration):
        """
        Adds a `Calibration` that is to be used in this program to the list.
        Also adds an empty dependency list to the overall dictionary.
        You should not directly alter a `Calibration` object after it has been
        added here.

        Calibrations that fail ``is_valid()`` or whose name is already
        registered are not added; a warning is emitted instead.
        """
        if not calibration.is_valid():
            B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                      "It was not added and will not be part of the final process.")
            return
        if calibration.name in self.calibrations:
            B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
            return
        self.calibrations[calibration.name] = calibration

    def _remove_missing_dependencies(self):
        """
        This checks the future and past dependencies of each `Calibration` in the `CAF`.
        If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
        object directly.
        """
        calibration_names = {calibration.name for calibration in self.calibrations.values()}

        def is_dependency_in_caf(dependency):
            """
            Quick function to use with filter() and check dependencies against calibrations known to `CAF`
            """
            dependency_in_caf = dependency.name in calibration_names
            if not dependency_in_caf:
                B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                          " It has been removed as a dependency.")
            return dependency_in_caf

        # Check that there aren't dependencies on calibrations not added to the framework
        # Remove them from the calibration objects if there are.
        for calibration in self.calibrations.values():
            filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
            calibration.future_dependencies = filtered_future_dependencies

            filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
            calibration.dependencies = filtered_dependencies

    def _order_calibrations(self):
        """
        - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
          to a sorting algorithm.
        - Returns the ordering produced by `all_dependencies` if the sort was successful,
          ``False`` if it failed (most likely a cyclic dependency)
        """
        # First remove any dependencies on calibrations not added to the CAF
        self._remove_missing_dependencies()
        # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
        # Note that they currently use the names not the calibration objects.
        for calibration in self.calibrations.values():
            future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
            past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

            self.future_dependencies[calibration.name] = future_dependencies_names
            self.dependencies[calibration.name] = past_dependencies_names
        # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
        order = topological_sort(self.future_dependencies)
        if not order:
            return False

        # Get an ordered dictionary of the sort order but including all implicit dependencies.
        ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

        # Return all the implicit+explicit past dependencies
        full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
        # Correct each calibration's dependency list to reflect the implicit dependencies
        for calibration in self.calibrations.values():
            full_deps = full_past_dependencies[calibration.name]
            explicit_deps = [cal.name for cal in calibration.dependencies]
            for dep in full_deps:
                if dep not in explicit_deps:
                    calibration.dependencies.append(self.calibrations[dep])
            # At this point the calibrations have their full dependencies but they aren't in topological
            # sort order. Correct that here
            # The name set is built once per calibration rather than once per entry of `order`.
            dependency_names = {dep.name for dep in calibration.dependencies}
            calibration.dependencies = [self.calibrations[ordered_calibration_name]
                                        for ordered_calibration_name in order
                                        if ordered_calibration_name in dependency_names]
        order = ordered_full_dependencies
        # We should also patch in all of the implicit dependencies for the calibrations
        return order

    def _check_backend(self):
        """
        Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
        one that is stored isn't a valid Backend object) we should create a default Local backend.
        """
        if not isinstance(self._backend, caf.backends.Backend):
            self.backend = caf.backends.Local()

    def _prune_invalid_collections(self):
        """
        Checks all current calibrations and removes any invalid Collections from their collections list.
        """
        B2INFO("Checking for any invalid Collections in Calibrations.")
        for calibration in self.calibrations.values():
            valid_collections = {}
            for name, collection in calibration.collections.items():
                if collection.is_valid():
                    valid_collections[name] = collection
                else:
                    B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
            calibration.collections = valid_collections

    def run(self, iov=None):
        """
        Keyword Arguments:
            iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                                  this IoV will be used in the collection step.

        This function runs the overall calibration job, saves the outputs to the output_dir directory,
        and creates database payloads.

        Upload of final databases is not done here. This simply creates the local databases in
        the output directory. You should check the validity of your new local database before uploading
        to the conditions DB via the basf2 tools/interface to the DB.
        """
        if not self.calibrations:
            B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
        # Checks whether the dependencies we've added will give a valid order
        order = self._order_calibrations()
        if not order:
            B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

        # Check that a backend has been set and use default Local() one if not
        self._check_backend()

        self._prune_invalid_collections()

        # Creates the overall output directory and reset the attribute to use an absolute path to it.
        self.output_dir = self._make_output_dir()

        # Creates a SQLite DB to save the status of the various calibrations
        self._make_database()

        # Enter the overall output dir during processing and open a connection to the DB
        with temporary_workdir(self.output_dir):
            db = CAFDB(self._db_path)
            db.open()
            # Make sure the connection is closed even when setting up a calibration raises.
            try:
                db_initial_calibrations = db.query("select * from calibrations").fetchall()
                # Index the existing rows by calibration name (first column) once, up front.
                previous_entries = {cal_info[0]: cal_info for cal_info in db_initial_calibrations}
                for calibration in self.calibrations.values():
                    # Apply defaults given to the `CAF` to the calibrations if they aren't set
                    calibration._apply_calibration_defaults(self.calibration_defaults)
                    calibration._db_path = self._db_path
                    calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
                    calibration.iov = iov
                    if not calibration.backend:
                        calibration.backend = self.backend
                    # Do some checking of the db to see if we need to add an entry for this calibration
                    if calibration.name not in previous_entries:
                        db.insert_calibration(calibration.name)
                        db.commit()
                    else:
                        cal_info = previous_entries[calibration.name]
                        cal_initial_state = cal_info[2]
                        cal_initial_iteration = cal_info[3]
                        B2INFO(f"Previous entry in database found for {calibration.name}.")
                        B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                        calibration.state = cal_initial_state
                        B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                        calibration.iteration = cal_initial_iteration
                    # Daemonize so that it exits if the main program exits
                    calibration.daemon = True
            finally:
                db.close()

            finished_states = (CalibrationBase.end_state, CalibrationBase.fail_state)
            # Is it possible to keep going?
            keep_running = True
            while keep_running:
                keep_running = False
                # Do we have calibrations that may yet complete?
                remaining_calibrations = []

                for calibration in self.calibrations.values():
                    # Find the currently ended calibrations (may not be joined yet)
                    if calibration.state in finished_states:
                        # Search for any alive Calibrations and join them
                        if calibration.is_alive():
                            B2DEBUG(29, f"Joining {calibration.name}.")
                            calibration.join()
                    elif calibration.dependencies_met():
                        if not calibration.is_alive():
                            B2DEBUG(29, f"Starting {calibration.name}.")
                            try:
                                calibration.start()
                            except RuntimeError:
                                # Catch the case when the calibration just finished so it ended up here
                                # in the "else" and not above where it should have been joined.
                                B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                        remaining_calibrations.append(calibration)
                    elif not calibration.failed_dependencies():
                        remaining_calibrations.append(calibration)
                if remaining_calibrations:
                    keep_running = True
                    # Loop over jobs that the calibrations want submitted and submit them.
                    # We do this here because some backends don't like us submitting in parallel
                    # from multiple CalibrationThreads
                    # So this is like a mini job queue without getting too clever with it
                    for calibration in remaining_calibrations:
                        # Iterate over a copy because submitted jobs are removed from the list.
                        for job in calibration.jobs_to_submit[:]:
                            calibration.backend.submit(job)
                            calibration.jobs_to_submit.remove(job)
                sleep(self.heartbeat)

            B2INFO("Printing summary of final CAF status.")
            with CAFDB(self._db_path, read_only=True) as db:
                print(db.output_calibration_table())

    @property
    def backend(self):
        """
        The `backend <backends.Backend>` that runs the collector job.
        When set, this is checked that a `backends.Backend` class instance was passed in.
        """
        return self._backend

    @backend.setter
    def backend(self, backend):
        """
        Store ``backend`` if it is a `caf.backends.Backend` instance, otherwise emit an error and keep the old one.
        """
        if isinstance(backend, caf.backends.Backend):
            self._backend = backend
        else:
            B2ERROR('Backend property must inherit from Backend class.')

    def _make_output_dir(self):
        """
        Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

        Returns:
            str: The absolute path of the new output_dir

        Raises:
            FileNotFoundError: If the directory still does not exist after trying to create it.
        """
        p = Path(self.output_dir).resolve()
        if p.is_dir():
            B2INFO(f"{p.as_posix()} output directory already exists. "
                   "We will try to restart from the previous finishing state.")
            return p.as_posix()
        p.mkdir(parents=True)
        if not p.is_dir():
            raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
        return p.as_posix()

    def _make_database(self):
        """
        Creates the CAF status database. If it already exists we don't overwrite it.
        """
        self._db_path = Path(self.output_dir, self._db_name).absolute()
        if self._db_path.exists():
            B2INFO(f"Previous CAF database found {self._db_path}")
        # Will create a new database + tables, or do nothing but checks we can connect to existing one
        with CAFDB(self._db_path):
            pass
1476
1477# @endcond
STL class.