Belle II Software development
framework.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14"""
15This module implements several objects/functions to configure and run calibrations.
16These classes are used to construct the workflow of the calibration job.
17The actual processing code is mostly in the `caf.state_machines` module.
18"""
19
20__all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF"]
21
22import os
23from threading import Thread
24from time import sleep
25from pathlib import Path
26import shutil
27from glob import glob
28
29from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30from basf2 import find_file
31from basf2 import conditions as b2conditions
32
33from abc import ABC, abstractmethod
34
35import caf
36from caf.utils import B2INFO_MULTILINE
37from caf.utils import past_from_future_dependencies
38from caf.utils import topological_sort
39from caf.utils import all_dependencies
40from caf.utils import method_dispatch
41from caf.utils import temporary_workdir
42from caf.utils import find_int_dirs
43from caf.utils import LocalDatabase
44from caf.utils import CentralDatabase
45from caf.utils import parse_file_uri
46
47import caf.strategies as strategies
48import caf.runners as runners
49from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
50from caf.state_machines import CalibrationMachine, ConditionError, MachineError
51from caf.database import CAFDB
52
53
class Collection():
    """
    Keyword Arguments:
        collector (str, basf2.Module): The collector module or module name for this `Collection`.
        input_files (list[str]): The input files to be used for only this `Collection`.
        pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
        database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
        output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
            `Algorithm.data_input` function. Setting this here, replaces the default completely.
        max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
            Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
        max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
            Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
            max_files_per_collector_job.
        backend_args (dict): The args for the backend submission of this `Collection`.
    """

    # Default maximum number of collector subjobs, used when neither splitter option is given explicitly.
    default_max_collector_jobs = 1000

    # File name that this Collection's collector Job configuration is serialised to.
    job_config = "collector_job.json"

    def __init__(self,
                 collector=None,
                 input_files=None,
                 pre_collector_path=None,
                 database_chain=None,
                 output_patterns=None,
                 max_files_per_collector_job=None,
                 max_collector_jobs=None,
                 backend_args=None
                 ):
        """
        See the class docstring for the meaning of each keyword argument.
        """
        # Collector module of this Collection (property converts a module name string to a module).
        self.collector = collector

        # Input file URIs for this Collection only (property normalises strings/globs to URI lists).
        self.input_files = []
        if input_files:
            self.input_files = input_files

        # Mapping of input file paths to the IoVs they cover; may be filled in later by the CAF.
        self.files_to_iovs = {}

        # Optional basf2 path run before the collector module in each job.
        self.pre_collector_path = None
        if pre_collector_path:
            self.pre_collector_path = pre_collector_path

        # Output patterns of the files produced by the collector jobs.
        # Setting output_patterns replaces this default completely.
        self.output_patterns = ["CollectorOutput.root"]
        if output_patterns:
            self.output_patterns = output_patterns

        # SubjobSplitter strategy. The two kwargs below are mutually exclusive because each one
        # selects a different splitter class.
        self.splitter = None
        if max_files_per_collector_job and max_collector_jobs:
            B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
        elif max_files_per_collector_job:
            self.max_files_per_collector_job = max_files_per_collector_job
        elif max_collector_jobs:
            self.max_collector_jobs = max_collector_jobs
        else:
            self.max_collector_jobs = self.default_max_collector_jobs

        # Keyword args passed to the backend when submitting this Collection's jobs.
        self.backend_args = {}
        if backend_args:
            self.backend_args = backend_args

        if database_chain:
            # Database chain (local + central) applied to this Collection's jobs.
            self.database_chain = database_chain
        else:
            self.database_chain = []
            # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
            # described well by appending to a list to a deque. So we do bit of reversal to translate it back and make the
            # most important GT the last one encountered.
            for tag in reversed(b2conditions.default_globaltags):
                self.use_central_database(tag)

        # The basf2 steering file that will be used for Collector jobs run by this collection.
        # This script will be copied into subjob directories as part of the input sandbox.
        self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()

        # Command executed in each subjob to run the steering file above.
        self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]

    def reset_database(self):
        """
        Remove everything in the database_chain of this Calibration, including the default central database
        tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        """
        self.database_chain = []

    def use_central_database(self, global_tag):
        """
        Parameters:
            global_tag (str): The central database global tag to use for this calibration.

        Using this allows you to add a central database to the head of the global tag database chain for this collection.
        The default database chain is just the central one from
        `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        The input file global tag will always be overridden and never used unless explicitly set.

        To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
        and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.

        Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

        NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real
        order that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is
        checked first.
        """
        central_db = CentralDatabase(global_tag)
        self.database_chain.append(central_db)

    def use_local_database(self, filename, directory=""):
        """
        Parameters:
            filename (str): The path to the database.txt of the local database
            directory (str): The path to the payloads directory for this local database.

        Append a local database to the chain for this collection.
        You can call this function multiple times and each database will be added to the chain IN ORDER.
        The databases are applied to this collection ONLY.

        NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
        that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
        """
        local_db = LocalDatabase(filename, directory)
        self.database_chain.append(local_db)

    @staticmethod
    def uri_list_from_input_file(input_file):
        """
        Parameters:
            input_file (str): A local file/glob pattern or XROOTD URI

        Returns:
            list: A list of the URIs found from the initial string.
        """
        # By default we assume it is a local file path if no "scheme" is given
        uri = parse_file_uri(input_file)
        if uri.scheme == "file":
            # For local files we also perform a glob just in case it is a wildcard pattern.
            # That way we will have all the uris of files separately
            uris = [parse_file_uri(f).geturl() for f in glob(input_file)]
        else:
            # Just let everything else through and hope the backend can figure it out
            uris = [input_file]
        return uris

    @property
    def input_files(self):
        """
        The list of input file URIs for this Collection (see the setter for accepted forms).
        """
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        """
        Accepts a single string (path/glob/URI) or a list of them; normalises everything to a URI list.
        """
        if isinstance(value, str):
            # If it's a string, we convert to a list of URIs
            self._input_files = self.uri_list_from_input_file(value)
        elif isinstance(value, list):
            # If it's a list we loop and do the same thing
            total_files = []
            for pattern in value:
                total_files.extend(self.uri_list_from_input_file(pattern))
            self._input_files = total_files
        else:
            raise TypeError("Input files must be a list or string")

    @property
    def collector(self):
        """
        The collector module of this Collection (a basf2 Module, or None if not yet set).
        """
        return self._collector

    @collector.setter
    def collector(self, collector):
        """
        Accepts either a basf2 Module or a module name string (which is registered into a Module).
        """
        # check if collector is already a module or if we need to create one
        # from the name
        if collector:
            from basf2 import Module
            if isinstance(collector, str):
                from basf2 import register_module
                collector = register_module(collector)
            if not isinstance(collector, Module):
                B2ERROR("Collector needs to be either a Module or the name of such a module")
        # Internal storage of the collector module.
        self._collector = collector

    def is_valid(self):
        """
        Returns True when this Collection has both a collector module and input files.
        """
        return bool(self.collector and self.input_files)

    @property
    def max_collector_jobs(self):
        """
        Maximum number of collector subjobs, or None if a MaxSubjobsSplitter isn't in use.
        """
        if self.splitter:
            return self.splitter.max_subjobs
        else:
            return None

    @max_collector_jobs.setter
    def max_collector_jobs(self, value):
        """
        Setting this replaces the splitter with a MaxSubjobsSplitter (None disables splitting).
        """
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxSubjobsSplitter(max_subjobs=value)

    @property
    def max_files_per_collector_job(self):
        """
        Maximum input files per collector subjob, or None if a MaxFilesSplitter isn't in use.
        """
        if self.splitter:
            return self.splitter.max_files_per_subjob
        else:
            return None

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, value):
        """
        Setting this replaces the splitter with a MaxFilesSplitter (None disables splitting).
        """
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
299
class CalibrationBase(ABC, Thread):
    """
    Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration`
    class which inherits from this and runs the C++ CalibrationCollectorModule and
    CalibrationAlgorithm classes. But by inheriting from this class and providing the minimal
    necessary methods/attributes you could plug in your own Calibration types that don't depend
    on the C++ CAF at all and run everything in your own way.

    .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
        But it's there if you really need it.

    Parameters:
        name (str): Name of this calibration object. Should be unique if you are going to run it.

    Keyword Arguments:
        input_files (list[str]): Input files for this calibration. May contain wildcard expressions useable by `glob.glob`.
    """

    # The state a CalibrationBase is in when it finished successfully.
    end_state = "completed"

    # The state a CalibrationBase is in when it failed.
    fail_state = "failed"

    def __init__(self, name, input_files=None):
        """
        Initialise the calibration thread and its dependency bookkeeping.
        """
        super().__init__()
        # Name of this calibration; also used as the Thread name (should be unique within a CAF).
        self.name = name
        # Calibrations that depend on this one (reverse of `dependencies`).
        self.future_dependencies = []
        # Calibrations that this one depends on; filled by `depends_on`.
        self.dependencies = []
        # Mapping of input file paths to the IoVs they contain; may be filled in by the user/CAF.
        self.files_to_iovs = {}
        if input_files:
            # Input files for this calibration; may contain glob wildcards.
            self.input_files = input_files
        else:
            self.input_files = []
        # IoV that this calibration is intended to cover (None = derive from input data).
        self.iov = None
        # Directory the output database payloads end up in (set while running).
        self.output_database_dir = ""
        # Whether the payloads produced should be saved/collected at the end.
        self.save_payloads = True
        # Jobs waiting for submission by the runner.
        self.jobs_to_submit = []

    @abstractmethod
    def run(self):
        """
        The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
        once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
        """

    @abstractmethod
    def is_valid(self):
        """
        A simple method you should implement that will return True or False depending on whether
        the Calibration has been set up correctly and can be run safely.
        """

    def depends_on(self, calibration):
        """
        Parameters:
            calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

        Adds dependency of this calibration on another i.e. This calibration
        will not run until the dependency has completed, and the constants produced
        will be used via the database chain.

        You can define multiple dependencies for a single calibration simply
        by calling this multiple times. Be careful when adding the calibration into
        the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
        empty order and the `CAF` processing will fail.

        This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes
        of this `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation
        later on.
        """
        # Check that we don't have two calibration names that are the same
        if self.name != calibration.name:
            # Tests if we have the calibrations added as dependencies already and adds if not
            if calibration not in self.dependencies:
                self.dependencies.append(calibration)
            # Guard the reverse mapping against duplicates too. (Previously this tested
            # `calibration.dependencies`, which let repeated depends_on() calls append
            # duplicate entries to future_dependencies.)
            if self not in calibration.future_dependencies:
                calibration.future_dependencies.append(self)
        else:
            B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                      "Dependency was not added.")

    def dependencies_met(self):
        """
        Checks if all of the Calibrations that this one depends on have reached a successful end state.
        """
        return all(cal.state == cal.end_state for cal in self.dependencies)

    def failed_dependencies(self):
        """
        Returns the list of calibrations in our dependency list that have failed.
        """
        return [cal for cal in self.dependencies if cal.state == self.fail_state]

    def _apply_calibration_defaults(self, defaults):
        """
        We pass in default calibration options from the `CAF` instance here if called.
        Won't overwrite any options already set.
        """
        for key, value in defaults.items():
            try:
                # Only fill attributes the user left unset (None).
                if getattr(self, key) is None:
                    setattr(self, key, value)
            except AttributeError:
                # Use the framework logger rather than a bare print for consistency.
                B2WARNING(f"The calibration {self.name} does not support the attribute {key}.")
424
425class Calibration(CalibrationBase):
426 """
    Every Calibration object must have at least one collector and at least one algorithm.
428 You have the option to add in your collector/algorithm by argument here, or add them
429 later by changing the properties.
430
431 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
432 separately via `add_collection()`.
433
434 Parameters:
435 name (str): Name of this calibration. It should be unique for use in the `CAF`
436 Keyword Arguments:
437 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
438 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
439 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards useable by `glob.glob`
440
441 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
442
443 >>> cal = Calibration('TestCalibration1')
444 >>> col1 = register_module('CaTest')
445 >>> cal.add_collection('TestColl', col1)
446
447 or equivalently
448
449 >>> cal = Calibration('TestCalibration1', 'CaTest')
450
451 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
452
453 >>> cal.pre_collector_path = my_basf2_path
454
455 You don't have to put a RootInput module in this pre-collection path, but you can if
456 you need some special parameters. If you want to process sroot files the you have to explicitly add
457 SeqRootInput to your pre-collection path.
458 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
459
460
461 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
462
463 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
464
465 you can change the input file list later on, before running with `CAF`
466
467 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
468
469 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
470 input_files, database chain etc from there. See `Collection`.
471
472 Adding the CalibrationAlgorithm(s) is easy
473
474 >>> alg1 = TestAlgo()
475 >>> cal.algorithms = alg1
476
477 Or equivalently
478
479 >>> cal.algorithms = [alg1]
480
481 Or for multiple algorithms for one collector
482
483 >>> alg2 = TestAlgo()
484 >>> cal.algorithms = [alg1, alg2]
485
486 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
    `Algorithm`. To access the C++ algorithm class underneath directly do:
488
489 >>> cal.algorithms[i].algorithm
490
491 If you have a setup function that you want to run before each of the algorithms, set that with
492
493 >>> cal.pre_algorithms = my_function_object
494
495 If you want a different setup for each algorithm use a list with the same number of elements
496 as your algorithm list.
497
498 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
499
500 You can also specify the dependencies of the calibration on others
501
502 >>> cal.depends_on(cal2)
503
504 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
505 calibration constants created by earlier completed calibrations to dependent ones.
506 """
507
508 moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]
509
510 alg_output_dir = "algorithm_output"
511
512 checkpoint_states = ["init", "collector_completed", "completed"]
513
514 default_collection_name = "default"
515
    def __init__(self,
                 name,
                 collector=None,
                 algorithms=None,
                 input_files=None,
                 pre_collector_path=None,
                 database_chain=None,
                 output_patterns=None,
                 max_files_per_collector_job=None,
                 max_collector_jobs=None,
                 backend_args=None
                 ):
        """
        Set up the Calibration and create its default `Collection`.

        See the class docstring for the meaning of the keyword arguments;
        they are all forwarded to the default `Collection` created here.
        """
        # Mapping of collection name -> `Collection` attached to this Calibration.
        self.collections = {}
        # Wrapped `Algorithm` objects; populated through the `algorithms` property setter.
        self._algorithms = []

        # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
        # CollectorModule/module name directly.
        # NOTE: this must happen before super().__init__ below, because CalibrationBase.__init__
        # assigns self.input_files, which this class delegates to the default collection.
        self.add_collection(self.default_collection_name,
                            Collection(collector,
                                       input_files,
                                       pre_collector_path,
                                       database_chain,
                                       output_patterns,
                                       max_files_per_collector_job,
                                       max_collector_jobs,
                                       backend_args
                                       ))

        super().__init__(name, input_files)
        if algorithms:
            # Property setter wraps each CalibrationAlgorithm in a Python `Algorithm` object.
            self.algorithms = algorithms
        # Output results of the algorithm executions, keyed per iteration.
        self.results = {}
        # Maximum number of iterations (None means no limit set here).
        self.max_iterations = None
        # Runs to exclude from the algorithm execution — presumably a list of run identifiers;
        # TODO(review): confirm expected element type against the CAF runner.
        self.ignored_runs = None
        if self.algorithms:
            # Default execution strategy applied to every algorithm.
            self.strategies = strategies.SingleIOV
        if database_chain:
            # Database chain used by the algorithm processes (and the default collection).
            self.database_chain = database_chain
        else:
            self.database_chain = []
            # This database is already applied to the `Collection` automatically, so don't do it again
            for tag in reversed(b2conditions.default_globaltags):
                self.use_central_database(tag, apply_to_default_collection=False)

        # Runner class used to execute the algorithms of this Calibration.
        self.algorithms_runner = runners.SeqAlgorithmsRunner
        # Backend used for collector job submission; must be set before running.
        self.backend = None
        # Seconds between full status updates of the collector jobs.
        self.collector_full_update_interval = 30
        # Polling interval (seconds) of the main processing loop.
        self.heartbeat = 3
        # State machine driving this calibration; created when the calibration runs.
        self.machine = None
        # Path to the CAF database file storing this calibration's state.
        self._db_path = None
596
597 def add_collection(self, name, collection):
598 """
599 Parameters:
600 name (str): Unique name of this `Collection` in the Calibration.
601 collection (`Collection`): `Collection` object to use.
602
603 Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
604 A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
605 + input files.
606 You can ignore the default one and only add your own custom Collections. You can configure the default from the
607 Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
608 attributes.
609 """
610 if name not in self.collections:
611 self.collections[name] = collection
612 else:
613 B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
614 "Please use another name.")
615
616 def is_valid(self):
617 """
618 A full calibration consists of a collector AND an associated algorithm AND input_files.
619
620 Returns False if:
621 1) We are missing any of the above.
622 2) There are multiple Collections and the Collectors have mis-matched granularities.
623 3) Any of our Collectors have granularities that don't match what our Strategy can use. """
624 if not self.algorithms:
625 B2WARNING(f"Empty algorithm list for {self.name}.")
626 return False
627
628 if not any([collection.is_valid() for collection in self.collections.values()]):
629 B2WARNING(f"No valid Collections for {self.name}.")
630 return False
631
632 granularities = []
633 for collection in self.collections.values():
634 if collection.is_valid():
635 collector_params = collection.collector.available_params()
636 for param in collector_params:
637 if param.name == "granularity":
638 granularities.append(param.values)
639 if len(set(granularities)) > 1:
640 B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
641 return False
642
643 for alg in self.algorithms:
644 alg_type = type(alg.algorithm).__name__
645 incorrect_gran = [granularity not in alg.strategy.allowed_granularities for granularity in granularities]
646 if any(incorrect_gran):
647 B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
648 return False
649 return True
650
651 def reset_database(self, apply_to_default_collection=True):
652 """
653 Keyword Arguments:
654 apply_to_default_collection (bool): Should we also reset the default collection?
655
656 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
657 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
658 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
659 reset by setting 'apply_to_default_collection' to False.
660 """
661 self.database_chain = []
662 if self.default_collection_name in self.collections and apply_to_default_collection:
663 self.collections[self.default_collection_name].reset_database()
664
    def use_central_database(self, global_tag, apply_to_default_collection=True):
        """
        Parameters:
            global_tag (str): The central database global tag to use for this calibration.

        Keyword Arguments:
            apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)

        Using this allows you to append a central database to the database chain for this calibration.
        The default database chain is just the central one from
        `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
        and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.

        Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
        and the algorithm processes. So calling:

        >> cal.use_central_database("global_tag")

        will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
        assigned to

        >> cal.collections['default'].database_chain

        But calling

        >> cal.use_central_database("global_tag", False)

        will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
        So if you have multiple Collections in this Calibration *their database chains are separate*.
        To specify an additional `CentralDatabase` for a different collection, you will have to call:

        >> cal.collections['OtherCollection'].use_central_database("global_tag")
        """
        central_db = CentralDatabase(global_tag)
        self.database_chain.append(central_db)
        if self.default_collection_name in self.collections and apply_to_default_collection:
            self.collections[self.default_collection_name].use_central_database(global_tag)
703
704 def use_local_database(self, filename, directory="", apply_to_default_collection=True):
705 """
706 Parameters:
707 filename (str): The path to the database.txt of the local database
708
709 Keyword Argumemts:
710 directory (str): The path to the payloads directory for this local database.
711 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
712
713 Append a local database to the chain for this calibration.
714 You can call this function multiple times and each database will be added to the chain IN ORDER.
715 The databases are applied to this calibration ONLY.
716 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
717 the default `Collection` job as a database chain.
718 There are other databases applied to the processes later, checked by basf2 in this order:
719
720 1) Local Database from previous iteration of this Calibration.
721 2) Local Database chain from output of previous dependent Calibrations.
722 3) This chain of Local and Central databases where the last added is checked first.
723
724 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
725 'apply_to_default_collection' remains True. So calling:
726
727 >> cal.use_local_database(file_path, payload_dir)
728
729 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
730 assigned to
731
732 >> cal.collections['default'].database_chain
733
734 But calling
735
736 >> cal.use_local_database(file_path, payload_dir, False)
737
738 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
739
740 If you have multiple Collections in this Calibration *their database chains are separate*.
741 To specify an additional `LocalDatabase` for a different collection, you will have to call:
742
743 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
744
745 """
746 local_db = LocalDatabase(filename, directory)
747 self.database_chain.append(local_db)
748 if self.default_collection_name in self.collections and apply_to_default_collection:
749 self.collections[self.default_collection_name].use_local_database(filename, directory)
750
751 def _get_default_collection_attribute(self, attr):
752 if self.default_collection_name in self.collections:
753 return getattr(self.collections[self.default_collection_name], attr)
754 else:
755 B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
756 "but the default collection doesn't exist."
757 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
758 "collection's attributes directly.")
759 return None
760
761 def _set_default_collection_attribute(self, attr, value):
762 if self.default_collection_name in self.collections:
763 setattr(self.collections[self.default_collection_name], attr, value)
764 else:
765 B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
766 "but the default collection doesn't exist."
767 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
768 "collection's attributes directly.")
769
    @property
    def collector(self):
        """
        The collector module of the default `Collection` (None if unset or no default collection).
        """
        return self._get_default_collection_attribute("collector")

    @collector.setter
    def collector(self, collector):
        """
        Sets the collector module of the default `Collection`; accepts a basf2 Module or a module name string.
        """
        # check if collector is already a module or if we need to create one
        # from the name
        from basf2 import Module
        if isinstance(collector, str):
            from basf2 import register_module
            collector = register_module(collector)
        if not isinstance(collector, Module):
            B2ERROR("Collector needs to be either a Module or the name of such a module")

        self._set_default_collection_attribute("collector", collector)
790
    @property
    def input_files(self):
        """
        Input files of the default `Collection` (see `Collection.input_files`).
        """
        return self._get_default_collection_attribute("input_files")

    @input_files.setter
    def input_files(self, files):
        """
        Sets the input files of the default `Collection`.
        """
        self._set_default_collection_attribute("input_files", files)

    @property
    def files_to_iovs(self):
        """
        File -> IoV mapping of the default `Collection`.
        """
        return self._get_default_collection_attribute("files_to_iovs")

    @files_to_iovs.setter
    def files_to_iovs(self, file_map):
        """
        Sets the file -> IoV mapping of the default `Collection`.
        """
        self._set_default_collection_attribute("files_to_iovs", file_map)

    @property
    def pre_collector_path(self):
        """
        Pre-collector basf2 path of the default `Collection`.
        """
        return self._get_default_collection_attribute("pre_collector_path")

    @pre_collector_path.setter
    def pre_collector_path(self, path):
        """
        Sets the pre-collector basf2 path of the default `Collection`.
        """
        self._set_default_collection_attribute("pre_collector_path", path)

    @property
    def output_patterns(self):
        """
        Output file patterns of the default `Collection`.
        """
        return self._get_default_collection_attribute("output_patterns")

    @output_patterns.setter
    def output_patterns(self, patterns):
        """
        Sets the output file patterns of the default `Collection`.
        """
        self._set_default_collection_attribute("output_patterns", patterns)

    @property
    def max_files_per_collector_job(self):
        """
        Maximum input files per collector subjob of the default `Collection`.
        """
        return self._get_default_collection_attribute("max_files_per_collector_job")

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, max_files):
        """
        Sets the maximum input files per collector subjob of the default `Collection`.
        """
        self._set_default_collection_attribute("max_files_per_collector_job", max_files)

    @property
    def max_collector_jobs(self):
        """
        Maximum number of collector subjobs of the default `Collection`.
        """
        return self._get_default_collection_attribute("max_collector_jobs")

    @max_collector_jobs.setter
    def max_collector_jobs(self, max_jobs):
        """
        Sets the maximum number of collector subjobs of the default `Collection`.
        """
        self._set_default_collection_attribute("max_collector_jobs", max_jobs)

    @property
    def backend_args(self):
        """
        Backend submission args of the default `Collection`.
        """
        return self._get_default_collection_attribute("backend_args")

    @backend_args.setter
    def backend_args(self, args):
        """
        Sets the backend submission args of the default `Collection`.
        """
        self._set_default_collection_attribute("backend_args", args)
874
    @property
    def algorithms(self):
        """
        The Python `Algorithm` wrappers attached to this `Calibration`.
        """
        return self._algorithms

    @algorithms.setter
    @method_dispatch
    def algorithms(self, value):
        """
        Sets the algorithms from a single ``CalibrationAlgorithm`` instance, wrapping it in `Algorithm`.
        List/tuple input is handled by the overload registered below via `method_dispatch`.
        """
        from ROOT.Belle2 import CalibrationAlgorithm
        if isinstance(value, CalibrationAlgorithm):
            self._algorithms = [Algorithm(value)]
        else:
            B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
                    "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")

    @algorithms.fset.register(tuple)
    @algorithms.fset.register(list)
    def _(self, value):
        """
        Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
        """
        from ROOT.Belle2 import CalibrationAlgorithm
        if value:
            # Replace, not extend: assigning algorithms resets the previous list.
            self._algorithms = []
            for alg in value:
                if isinstance(alg, CalibrationAlgorithm):
                    self._algorithms.append(Algorithm(alg))
                else:
                    B2ERROR(f"Something other than CalibrationAlgorithm instance passed in {type(value)}."
                            "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
908
909 @property
910 def pre_algorithms(self):
911 """
912 Callback run prior to each algorithm iteration.
913 """
914 return [alg.pre_algorithm for alg in self.algorithms]
915
916 @pre_algorithms.setter
917 @method_dispatch
918 def pre_algorithms(self, func):
919 """
920 """
921 if func:
922 for alg in self.algorithms:
923 alg.pre_algorithm = func
924 else:
925 B2ERROR("Something evaluated as False passed in as pre_algorithm function.")
926
927 @pre_algorithms.fset.register(tuple)
928 @pre_algorithms.fset.register(list)
929 def _(self, values):
930 """
931 Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
932 """
933 if values:
934 if len(values) == len(self.algorithms):
935 for func, alg in zip(values, self.algorithms):
936 alg.pre_algorithm = func
937 else:
938 B2ERROR("Number of functions and number of algorithms doesn't match.")
939 else:
940 B2ERROR("Empty container passed in for pre_algorithm functions")
941
942 @property
943 def strategies(self):
944 """
945 The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
946 """
947 return [alg.strategy for alg in self.algorithms]
948
949 @strategies.setter
950 @method_dispatch
951 def strategies(self, strategy):
952 """
953 """
954 if strategy:
955 for alg in self.algorithms:
956 alg.strategy = strategy
957 else:
958 B2ERROR("Something evaluated as False passed in as a strategy.")
959
960 @strategies.fset.register(tuple)
961 @strategies.fset.register(list)
962 def _(self, values):
963 """
964 Alternate strategies setter for lists and tuples of functions, should be one per algorithm.
965 """
966 if values:
967 if len(values) == len(self.algorithms):
968 for strategy, alg in zip(strategies, self.algorithms):
969 alg.strategy = strategy
970 else:
971 B2ERROR("Number of strategies and number of algorithms doesn't match.")
972 else:
973 B2ERROR("Empty container passed in for strategies list")
974
    def __repr__(self):
        """
        Represents the calibration by its (unique) name.
        """
        return self.name
979
    def run(self):
        """
        Main logic of the Calibration object.
        Will be run in a new Thread by calling the start() method.

        Restores the checkpoint state/iteration from the CAF database, builds a
        `CalibrationMachine` from it, and then loops (submit collector -> poll ->
        run algorithms) until the end state or fail state is reached.
        """
        # Recover where a previous attempt (if any) left off, from the CAF status DB.
        with CAFDB(self._db_path, read_only=True) as db:
            initial_state = db.get_calibration_value(self.name, "checkpoint")
            initial_iteration = db.get_calibration_value(self.name, "iteration")
        B2INFO(f"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
        self.machine = CalibrationMachine(self,
                                          iov_to_calibrate=self.iov,
                                          initial_state=initial_state,
                                          iteration=initial_iteration)
        self.state = initial_state
        self.machine.root_dir = Path(os.getcwd(), self.name)
        self.machine.collector_backend = self.backend

        # Before we start running, let's clean up any iteration directories from iterations above our initial one.
        # Should prevent confusion between attempts if we fail again.
        all_iteration_paths = find_int_dirs(self.machine.root_dir)
        for iteration_path in all_iteration_paths:
            if int(iteration_path.name) > initial_iteration:
                shutil.rmtree(iteration_path)

        while self.state != self.end_state and self.state != self.fail_state:
            if self.state == "init":
                try:
                    B2INFO(f"Attempting collector submission for calibration {self.name}.")
                    self.machine.submit_collector()
                except Exception as err:
                    B2FATAL(str(err))

                # Blocks until the collector step completes or fails.
                self._poll_collector()

            # If we failed take us to the final fail state
            if self.state == "collector_failed":
                self.machine.fail_fully()
                return

            # It's possible that we might raise an error while attempting to run due
            # to some problems e.g. Missing collector output files
            # We catch the error and exit with failed state so the CAF will stop
            try:
                B2INFO(f"Attempting to run algorithms for calibration {self.name}.")
                self.machine.run_algorithms()
            except MachineError as err:
                B2ERROR(str(err))
                self.machine.fail()

            # If we failed take us to the final fail state
            if self.machine.state == "algorithms_failed":
                self.machine.fail_fully()
                return
1033
    def _poll_collector(self):
        """
        Blocks until the collector step either completes or fails, checking the
        machine every `heartbeat` seconds.
        """
        while self.state == "running_collector":
            try:
                self.machine.complete()
            # ConditionError is thrown when the conditions for the transition have returned false, it's not serious.
            except ConditionError:
                try:
                    B2DEBUG(29, f"Checking if collector jobs for calibration {self.name} have failed.")
                    self.machine.fail()
                except ConditionError:
                    # Neither complete nor failed: the collector jobs are still running.
                    pass
            sleep(self.heartbeat)  # Sleep until we want to check again
1048
    @property
    def state(self):
        """
        The current major state of the calibration in the database file. The machine may have a different state.
        """
        with CAFDB(self._db_path, read_only=True) as db:
            state = db.get_calibration_value(self.name, "state")
        return state

    @state.setter
    def state(self, state):
        """
        Writes the new major state to the database file; when the state is one of the
        `checkpoint_states` it is also recorded as the restart checkpoint.
        """
        B2DEBUG(29, f"Setting {self.name} to state {state}.")
        with CAFDB(self._db_path) as db:
            db.update_calibration_value(self.name, "state", str(state))
            if state in self.checkpoint_states:
                db.update_calibration_value(self.name, "checkpoint", str(state))
        B2DEBUG(29, f"{self.name} set to {state}.")
1068
    @property
    def iteration(self):
        """
        Retrieves the current iteration number in the database file.

        Returns:
            int: The current iteration number
        """
        with CAFDB(self._db_path, read_only=True) as db:
            iteration = db.get_calibration_value(self.name, "iteration")
        return iteration

    @iteration.setter
    def iteration(self, iteration):
        """
        Writes the new iteration number to the database file.
        """
        B2DEBUG(29, f"Setting {self.name} to {iteration}.")
        with CAFDB(self._db_path) as db:
            db.update_calibration_value(self.name, "iteration", iteration)
        B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1089
1090
class Algorithm():
    """
    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.
    Keyword Arguments:
        data_input : An optional function that sets the input files of the algorithm.
        pre_algorithm : An optional function that runs just prior to execution of the algorithm.
            Useful for set up e.g. module initialisation

    A thin Python wrapper around the C++ CalibrationAlgorithm class. It exists so that the
    `Calibration` and `CAF` classes can attach extra configuration (input selection, setup
    callbacks, execution strategy) to an algorithm without putting that logic into those
    classes directly.

    This is **not** currently a class that a user should interact with much during `CAF`
    setup (unless you're doing something advanced); the `Calibration` class creates sensible
    defaults for these objects.

    Setting the `data_input` function might be necessary if you have set the
    `Calibration.output_patterns`. Setting `pre_algorithm` to a function executed before each
    `strategies.AlgorithmStrategy` is often useful i.e. by calling for the Geometry module to
    initialise.
    """

    def __init__(self, algorithm, data_input=None, pre_algorithm=None):
        """
        """
        #: The wrapped CalibrationAlgorithm instance
        self.algorithm = algorithm
        #: Short name of the algorithm: the C++ class name with its namespace stripped
        full_cpp_name = type(algorithm).__cpp_name__
        self.name = full_cpp_name[full_cpp_name.rfind('::') + 2:]
        #: Callback that hands the collector output files to the algorithm; defaults to
        #: `default_inputdata_setup` when nothing (or something falsy) is supplied
        self.data_input = data_input if data_input else self.default_inputdata_setup
        #: Optional callback executed just before the algorithm runs
        self.pre_algorithm = pre_algorithm
        #: The `caf.strategies.AlgorithmStrategy` class used when executing this algorithm
        self.strategy = strategies.SingleIOV
        #: Extra parameters that the chosen strategy may read
        self.params = {}

    def default_inputdata_setup(self, input_file_paths):
        """
        Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
        by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
        for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
        """
        collector_output_files = [file_path for file_path in input_file_paths
                                  if Path(file_path).name == "CollectorOutput.root"]
        B2INFO_MULTILINE([f"Input files used in {self.name}:", *collector_output_files])
        self.algorithm.setInputFileNames(collector_output_files)
1153
1154
class CAF():
    """
    Parameters:
        calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                                     >>> calibration_defaults={"max_iterations":2}

    This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
    for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

    The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
    `CalibrationBase.start` when the dependencies are met.

    Much of the checking for consistency is done in this class so that no processing is done with an invalid
    setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
    `CalibrationBase` instances.
    """

    #: File name of the SQLite DB that stores the overall CAF status
    _db_name = "caf_state.db"

    #: Class-level defaults applied to Calibrations when the user doesn't override them
    default_calibration_config = {
        "max_iterations": 5,
        "ignored_runs": []
    }

    def __init__(self, calibration_defaults=None):
        """
        """
        #: Dictionary of calibrations for this `CAF` instance, keyed by calibration name.
        #: Use `add_calibration` to fill it.
        self.calibrations = {}
        #: Dictionary of future dependencies of calibrations (name -> list of names), filled during ordering
        self.future_dependencies = {}
        #: Dictionary of past dependencies of calibrations (name -> list of names), filled during ordering
        self.dependencies = {}
        #: Output path to store results of calibration and bookkeeping information
        self.output_dir = "calibration_results"
        #: The ordering of the calibrations, set during `run`
        self.order = None
        #: Private backing attribute for the `backend` property
        self._backend = None
        #: Polling interval (seconds) between checks of the calibration threads in `run`
        self.heartbeat = 5

        if not calibration_defaults:
            calibration_defaults = {}
        #: Default options applied to each calibration; user-supplied values override
        #: the class-level `default_calibration_config`
        self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
        #: The absolute path of the SQLite status DB, set during `run`
        self._db_path = None

    def add_calibration(self, calibration):
        """
        Adds a `Calibration` that is to be used in this program to the list.
        Also adds an empty dependency list to the overall dictionary.
        You should not directly alter a `Calibration` object after it has been
        added here.
        """
        if calibration.is_valid():
            if calibration.name not in self.calibrations:
                self.calibrations[calibration.name] = calibration
            else:
                B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
        else:
            # Message fixed: previously read "framwork." with no space before the next sentence.
            B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                      "It was not added and will not be part of the final process.")

    def _remove_missing_dependencies(self):
        """
        This checks the future and past dependencies of each `Calibration` in the `CAF`.
        If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
        object directly.
        """
        calibration_names = [calibration.name for calibration in self.calibrations.values()]

        def is_dependency_in_caf(dependency):
            """
            Quick function to use with filter() and check dependencies against calibrations known to `CAF`
            """
            dependency_in_caf = dependency.name in calibration_names
            if not dependency_in_caf:
                B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                          " It has been removed as a dependency.")
            return dependency_in_caf

        # Check that there aren't dependencies on calibrations not added to the framework
        # Remove them from the calibration objects if there are.
        for calibration in self.calibrations.values():
            filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
            calibration.future_dependencies = filtered_future_dependencies

            filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
            calibration.dependencies = filtered_dependencies

    def _order_calibrations(self):
        """
        - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
          to a sorting algorithm.
        - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
        """
        # First remove any dependencies on calibrations not added to the CAF
        self._remove_missing_dependencies()
        # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
        # Note that they currently use the names not the calibration objects.
        for calibration in self.calibrations.values():
            future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
            past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

            self.future_dependencies[calibration.name] = future_dependencies_names
            self.dependencies[calibration.name] = past_dependencies_names
        # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
        order = topological_sort(self.future_dependencies)
        if not order:
            return False

        # Get an ordered dictionary of the sort order but including all implicit dependencies.
        ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

        # Return all the implicit+explicit past dependencies
        full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
        # Correct each calibration's dependency list to reflect the implicit dependencies
        for calibration in self.calibrations.values():
            full_deps = full_past_dependencies[calibration.name]
            explicit_deps = [cal.name for cal in calibration.dependencies]
            for dep in full_deps:
                if dep not in explicit_deps:
                    calibration.dependencies.append(self.calibrations[dep])
            # At this point the calibrations have their full dependencies but they aren't in topological
            # sort order. Correct that here
            ordered_dependency_list = []
            for ordered_calibration_name in order:
                if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
                    ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
            calibration.dependencies = ordered_dependency_list
        order = ordered_full_dependencies
        # We should also patch in all of the implicit dependencies for the calibrations
        return order

    def _check_backend(self):
        """
        Makes sure that the CAF has a valid backend setup. If one isn't set by the user
        (or if the one that is stored isn't a valid Backend object) we should create a default Local backend.
        """
        if not isinstance(self._backend, caf.backends.Backend):
            # Goes through the property setter, which validates the instance
            self.backend = caf.backends.Local()

    def _prune_invalid_collections(self):
        """
        Checks all current calibrations and removes any invalid Collections from their collections list.
        """
        B2INFO("Checking for any invalid Collections in Calibrations.")
        for calibration in self.calibrations.values():
            valid_collections = {}
            for name, collection in calibration.collections.items():
                if collection.is_valid():
                    valid_collections[name] = collection
                else:
                    B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
            calibration.collections = valid_collections

    def run(self, iov=None):
        """
        Keyword Arguments:
            iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                this IoV will be used in the collection step.

        This function runs the overall calibration job, saves the outputs to the output_dir directory,
        and creates database payloads.

        Upload of final databases is not done here. This simply creates the local databases in
        the output directory. You should check the validity of your new local database before uploading
        to the conditions DB via the basf2 tools/interface to the DB.
        """
        if not self.calibrations:
            B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
        # Checks whether the dependencies we've added will give a valid order
        order = self._order_calibrations()
        if not order:
            B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

        # Check that a backend has been set and use default Local() one if not
        self._check_backend()

        self._prune_invalid_collections()

        # Creates the overall output directory and reset the attribute to use an absolute path to it.
        self.output_dir = self._make_output_dir()

        # Creates a SQLite DB to save the status of the various calibrations
        self._make_database()

        # Enter the overall output dir during processing and open a connection to the DB
        with temporary_workdir(self.output_dir):
            db = CAFDB(self._db_path)
            db.open()
            # try/finally ensures the DB connection is closed even if setup below raises
            try:
                db_initial_calibrations = db.query("select * from calibrations").fetchall()
                for calibration in self.calibrations.values():
                    # Apply defaults given to the `CAF` to the calibrations if they aren't set
                    calibration._apply_calibration_defaults(self.calibration_defaults)
                    calibration._db_path = self._db_path
                    calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
                    calibration.iov = iov
                    if not calibration.backend:
                        calibration.backend = self.backend
                    # Do some checking of the db to see if we need to add an entry for this calibration
                    if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
                        db.insert_calibration(calibration.name)
                        db.commit()
                    else:
                        for cal_info in db_initial_calibrations:
                            if cal_info[0] == calibration.name:
                                cal_initial_state = cal_info[2]
                                cal_initial_iteration = cal_info[3]
                        B2INFO(f"Previous entry in database found for {calibration.name}.")
                        B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                        calibration.state = cal_initial_state
                        B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                        calibration.iteration = cal_initial_iteration
                    # Daemonize so that it exits if the main program exits
                    calibration.daemon = True
            finally:
                db.close()

            # Is it possible to keep going?
            keep_running = True
            while keep_running:
                keep_running = False
                # Do we have calibrations that may yet complete?
                remaining_calibrations = []

                for calibration in self.calibrations.values():
                    # Find the currently ended calibrations (may not be joined yet)
                    if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
                        # Search for any alive Calibrations and join them
                        if calibration.is_alive():
                            B2DEBUG(29, f"Joining {calibration.name}.")
                            calibration.join()
                    else:
                        if calibration.dependencies_met():
                            if not calibration.is_alive():
                                B2DEBUG(29, f"Starting {calibration.name}.")
                                try:
                                    calibration.start()
                                except RuntimeError:
                                    # Catch the case when the calibration just finished so it ended up here
                                    # in the "else" and not above where it should have been joined.
                                    B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                            remaining_calibrations.append(calibration)
                        else:
                            if not calibration.failed_dependencies():
                                remaining_calibrations.append(calibration)
                if remaining_calibrations:
                    keep_running = True
                    # Loop over jobs that the calibrations want submitted and submit them.
                    # We do this here because some backends don't like us submitting in parallel from
                    # multiple CalibrationThreads. So this is like a mini job queue without getting too clever with it
                    for calibration in remaining_calibrations:
                        for job in calibration.jobs_to_submit[:]:
                            calibration.backend.submit(job)
                            calibration.jobs_to_submit.remove(job)
                sleep(self.heartbeat)

        B2INFO("Printing summary of final CAF status.")
        with CAFDB(self._db_path, read_only=True) as db:
            print(db.output_calibration_table())

    @property
    def backend(self):
        """
        The `backend <backends.Backend>` that runs the collector job.
        When set, this is checked that a `backends.Backend` class instance was passed in.
        """
        return self._backend

    @backend.setter
    def backend(self, backend):
        """
        Validates that the new value is a `backends.Backend` instance before storing it.
        """
        if isinstance(backend, caf.backends.Backend):
            self._backend = backend
        else:
            B2ERROR('Backend property must inherit from Backend class.')

    def _make_output_dir(self):
        """
        Creates the output directory. If it already exists we are now going to try and restart
        the program from the last state.

        Returns:
            str: The absolute path of the new output_dir
        """
        p = Path(self.output_dir).resolve()
        if p.is_dir():
            B2INFO(f"{p.as_posix()} output directory already exists. "
                   "We will try to restart from the previous finishing state.")
            return p.as_posix()
        else:
            p.mkdir(parents=True)
            if p.is_dir():
                return p.as_posix()
            else:
                raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")

    def _make_database(self):
        """
        Creates the CAF status database. If it already exists we don't overwrite it.
        """
        self._db_path = Path(self.output_dir, self._db_name).absolute()
        if self._db_path.exists():
            B2INFO(f"Previous CAF database found {self._db_path}")
        # Will create a new database + tables, or do nothing but checks we can connect to existing one
        with CAFDB(self._db_path):
            pass
1468
1469# @endcond
1470