Belle II Software development
framework.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14"""
15This module implements several objects/functions to configure and run calibrations.
16These classes are used to construct the workflow of the calibration job.
17The actual processing code is mostly in the `caf.state_machines` module.
18"""
19
20__all__ = ["CalibrationBase", "Calibration", "Algorithm", "CAF"]
21
22import os
23from threading import Thread
24from time import sleep
25from pathlib import Path
26import shutil
27from glob import glob
28
29from basf2 import B2ERROR, B2WARNING, B2INFO, B2FATAL, B2DEBUG
30from basf2 import find_file
31from basf2 import conditions as b2conditions
32
33from abc import ABC, abstractmethod
34
35import caf
36from caf.utils import B2INFO_MULTILINE
37from caf.utils import past_from_future_dependencies
38from caf.utils import topological_sort
39from caf.utils import all_dependencies
40from caf.utils import method_dispatch
41from caf.utils import temporary_workdir
42from caf.utils import find_int_dirs
43from caf.utils import LocalDatabase
44from caf.utils import CentralDatabase
45from caf.utils import parse_file_uri
46
47import caf.strategies as strategies
48import caf.runners as runners
49from caf.backends import MaxSubjobsSplitter, MaxFilesSplitter
50from caf.state_machines import CalibrationMachine, ConditionError, MachineError
51from caf.database import CAFDB
52
53
class Collection():
    """
    A single collector configuration: which collector module to run, over which input files,
    with which conditions database chain, job splitting and backend submission settings.
    A `Calibration` may contain several of these, each running independently.

    Keyword Arguments:
        collector (str, basf2.Module): The collector module or module name for this `Collection`.
        input_files (list[str]): The input files to be used for only this `Collection`.
        pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
        database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
        output_patterns (list[str]): Output patterns of files produced by collector which will be used to pass to the
            `Algorithm.data_input` function. Setting this here, replaces the default completely.
        max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
            Technically this sets the SubjobSplitter to be used, not compatible with max_collector_jobs.
        max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
            Input files are split evenly between them. Technically this sets the SubjobSplitter to be used. Not compatible with
            max_files_per_collector_job.
        backend_args (dict): The args for the backend submission of this `Collection`.
    """

    #: Default maximum number of collector subjobs, used when neither splitter option is given.
    default_max_collector_jobs = 1000

    #: Name of the JSON file holding the collector Job configuration, written per job directory.
    job_config = "collector_job.json"

    def __init__(self,
                 collector=None,
                 input_files=None,
                 pre_collector_path=None,
                 database_chain=None,
                 output_patterns=None,
                 max_files_per_collector_job=None,
                 max_collector_jobs=None,
                 backend_args=None
                 ):
        """
        Set up the Collection. See the class docstring for the meaning of each keyword argument.
        """
        #: Collector module (or module name); validated/registered by the `collector` property setter.
        self.collector = collector
        #: Input file URIs; the `input_files` property setter expands strings and glob patterns.
        self.input_files = []
        if input_files:
            self.input_files = input_files
        #: Mapping of input file path -> IoV. May be pre-filled to avoid per-file metadata lookups.
        self.files_to_iovs = {}
        #: Optional `basf2.Path` executed before the collector module in each collector job.
        self.pre_collector_path = None
        if pre_collector_path:
            self.pre_collector_path = pre_collector_path
        #: Output file patterns of the collector jobs, later passed to `Algorithm.data_input`.
        self.output_patterns = ["CollectorOutput.root"]
        if output_patterns:
            self.output_patterns = output_patterns

        #: SubjobSplitter that divides input files between collector subjobs. Chosen by exactly one of
        #: the two (mutually exclusive) keyword arguments below, otherwise by the class default.
        self.splitter = None
        if max_files_per_collector_job and max_collector_jobs:
            B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
        elif max_files_per_collector_job:
            self.max_files_per_collector_job = max_files_per_collector_job
        elif max_collector_jobs:
            self.max_collector_jobs = max_collector_jobs
        else:
            self.max_collector_jobs = self.default_max_collector_jobs

        #: Extra arguments handed to the backend when submitting this collection's jobs.
        self.backend_args = {}
        if backend_args:
            self.backend_args = backend_args

        if database_chain:
            #: User-supplied database chain, used exactly as given.
            self.database_chain = database_chain
        else:
            self.database_chain = []
            # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
            # described well by appending to a list to a deque. So we do bit of reversal to translate it back and make the
            # most important GT the last one encountered.
            for tag in reversed(b2conditions.default_globaltags):
                self.use_central_database(tag)

        self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()
        """The basf2 steering file that will be used for Collector jobs run by this collection.
This script will be copied into subjob directories as part of the input sandbox."""

        #: Command executed for each collector job; per-subjob details arrive via job_info.json.
        self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]

    def reset_database(self):
        """
        Remove everything in the database_chain of this Collection, including the default central database
        tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        """
        self.database_chain = []

    def use_central_database(self, global_tag):
        """
        Parameters:
            global_tag (str): The central database global tag to use for this calibration.

        Using this allows you to add a central database to the head of the global tag database chain for this collection.
        The default database chain is just the central one from
        `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
        The input file global tag will always be overridden and never used unless explicitly set.

        To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
        and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.

        Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

        NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
        that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
        """
        central_db = CentralDatabase(global_tag)
        self.database_chain.append(central_db)

    def use_local_database(self, filename, directory=""):
        """
        Parameters:
            filename (str): The path to the database.txt of the local database
            directory (str): The path to the payloads directory for this local database.

        Append a local database to the chain for this collection.
        You can call this function multiple times and each database will be added to the chain IN ORDER.
        The databases are applied to this collection ONLY.

        NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
        All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
        list which will be checked after all local database files have been checked.

        So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
        that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
        """
        local_db = LocalDatabase(filename, directory)
        self.database_chain.append(local_db)

    @staticmethod
    def uri_list_from_input_file(input_file):
        """
        Parameters:
            input_file (str): A local file/glob pattern or XROOTD URI

        Returns:
            list: A list of the URIs found from the initial string.
        """
        # By default we assume it is a local file path if no "scheme" is given
        uri = parse_file_uri(input_file)
        if uri.scheme == "file":
            # For local files we also perform a glob just in case it is a wildcard pattern.
            # That way we will have all the uris of files separately
            uris = [parse_file_uri(f).geturl() for f in glob(input_file)]
        else:
            # Just let everything else through and hope the backend can figure it out
            uris = [input_file]
        return uris

    @property
    def input_files(self):
        """
        list[str]: The list of input file URIs used by this collection.
        """
        return self._input_files

    @input_files.setter
    def input_files(self, value):
        """
        Accepts a single string (file path, glob pattern or URI) or a list of them, and
        stores the expanded list of URIs.
        """
        if isinstance(value, str):
            # If it's a string, we convert to a list of URIs
            self._input_files = self.uri_list_from_input_file(value)
        elif isinstance(value, list):
            # If it's a list we loop and do the same thing
            total_files = []
            for pattern in value:
                total_files.extend(self.uri_list_from_input_file(pattern))
            self._input_files = total_files
        else:
            raise TypeError("Input files must be a list or string")

    @property
    def collector(self):
        """
        The collector module (`basf2.Module`) run by this collection (may be None until assigned).
        """
        return self._collector

    @collector.setter
    def collector(self, collector):
        """
        Accepts either a `basf2.Module` instance or the name of a module to register.
        """
        # check if collector is already a module or if we need to create one
        # from the name
        if collector:
            from basf2 import Module
            if isinstance(collector, str):
                from basf2 import register_module
                collector = register_module(collector)
            if not isinstance(collector, Module):
                B2ERROR("Collector needs to be either a Module or the name of such a module")

        self._collector = collector

    def is_valid(self):
        """
        Returns True only when both a collector module and input files have been assigned.
        """
        if (not self.collector or not self.input_files):
            return False
        else:
            return True

    @property
    def max_collector_jobs(self):
        """
        int: Maximum number of collector subjobs, or None if no `MaxSubjobsSplitter` is set.
        """
        if self.splitter:
            return self.splitter.max_subjobs
        else:
            return None

    @max_collector_jobs.setter
    def max_collector_jobs(self, value):
        """
        Replaces the splitter with a `MaxSubjobsSplitter` (or clears it when value is None).
        """
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxSubjobsSplitter(max_subjobs=value)

    @property
    def max_files_per_collector_job(self):
        """
        int: Maximum input files per collector subjob, or None if no `MaxFilesSplitter` is set.
        """
        if self.splitter:
            return self.splitter.max_files_per_subjob
        else:
            return None

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, value):
        """
        Replaces the splitter with a `MaxFilesSplitter` (or clears it when value is None).
        """
        if value is None:
            self.splitter = None
        else:
            self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
299
class CalibrationBase(ABC, Thread):
    """
    Abstract base class of Calibration types. The CAF implements the :py:class:`Calibration`
    class which inherits from this and runs the C++ CalibrationCollectorModule and
    CalibrationAlgorithm classes. But by inheriting from this class and providing the minimal
    necessary methods/attributes you could plug in your own Calibration types
    that doesn't depend on the C++ CAF at all and run everything in your own way.

    .. warning:: Writing your own class inheriting from :py:class:`CalibrationBase` class is not recommended!
        But it's there if you really need it.

    Parameters:
        name (str): Name of this calibration object. Should be unique if you are going to run it.

    Keyword Arguments:
        input_files (list[str]): Input files for this calibration. May contain wildcard expressions usable by `glob.glob`.
    """

    #: The name of the state a successfully completed calibration ends in.
    end_state = "completed"

    #: The name of the state a failed calibration ends in.
    fail_state = "failed"

    def __init__(self, name, input_files=None):
        """
        Initialise the thread and the dependency/bookkeeping attributes.
        """
        super().__init__()
        #: Name of this calibration object.
        self.name = name
        #: Calibrations that depend on this one (populated by `depends_on` called on them).
        self.future_dependencies = []
        #: Calibrations that this one depends on.
        self.dependencies = []
        #: Mapping of input file path -> IoV. May be pre-filled to avoid metadata lookups.
        self.files_to_iovs = {}
        if input_files:
            #: Input files used by this calibration.
            self.input_files = input_files
        else:
            self.input_files = []

        #: IoV to be calibrated (may be set later, before running).
        self.iov = None
        #: Directory where the output local database payloads are placed.
        self.output_database_dir = ""
        #: Whether the payloads produced by this calibration should be saved/collected at the end.
        self.save_payloads = True
        #: Jobs waiting for submission by this calibration.
        self.jobs_to_submit = []

    @abstractmethod
    def run(self):
        """
        The most important method. Runs inside a new Thread and is called from `CalibrationBase.start`
        once the dependencies of this `CalibrationBase` have returned with state == end_state i.e. "completed".
        """

    @abstractmethod
    def is_valid(self):
        """
        A simple method you should implement that will return True or False depending on whether
        the Calibration has been set up correctly and can be run safely.
        """

    def depends_on(self, calibration):
        """
        Parameters:
            calibration (`CalibrationBase`): The Calibration object which will produce constants that this one depends on.

        Adds dependency of this calibration on another i.e. This calibration
        will not run until the dependency has completed, and the constants produced
        will be used via the database chain.

        You can define multiple dependencies for a single calibration simply
        by calling this multiple times. Be careful when adding the calibration into
        the `CAF` not to add a circular/cyclic dependency. If you do the sort will return an
        empty order and the `CAF` processing will fail.

        This function appends to the `CalibrationBase.dependencies` and `CalibrationBase.future_dependencies` attributes of this
        `CalibrationBase` and the input one respectively. This prevents us having to do too much recalculation later on.
        """
        # Check that we don't have two calibration names that are the same
        if self.name != calibration.name:
            # Tests if we have the calibrations added as dependencies already and adds if not
            if calibration not in self.dependencies:
                self.dependencies.append(calibration)
            # Bug fix: previously this tested membership in `calibration.dependencies` while
            # appending to `calibration.future_dependencies`, so repeated depends_on() calls
            # appended duplicate entries to the future_dependencies list.
            if self not in calibration.future_dependencies:
                calibration.future_dependencies.append(self)
        else:
            # Fixed message: the concatenated fragments previously ran together without a space.
            B2WARNING(f"Tried to add {calibration} as a dependency for {self} but they have the same name. "
                      "Dependency was not added.")

    def dependencies_met(self):
        """
        Checks if all of the Calibrations that this one depends on have reached a successful end state.
        """
        return all(map(lambda x: x.state == x.end_state, self.dependencies))

    def failed_dependencies(self):
        """
        Returns the list of calibrations in our dependency list that have failed.
        """
        return [calibration for calibration in self.dependencies
                if calibration.state == self.fail_state]

    def _apply_calibration_defaults(self, defaults):
        """
        We pass in default calibration options from the `CAF` instance here if called.
        Won't overwrite any options already set.
        """
        for key, value in defaults.items():
            try:
                # Only fill in options the user left unset (None); never clobber explicit choices.
                if getattr(self, key) is None:
                    setattr(self, key, value)
            except AttributeError:
                print(f"The calibration {self.name} does not support the attribute {key}.")
423
424
425class Calibration(CalibrationBase):
426 """
    Every Calibration object must have at least one collector and at least one algorithm.
428 You have the option to add in your collector/algorithm by argument here, or add them
429 later by changing the properties.
430
431 If you plan to use multiple `Collection` objects I recommend that you only set the name here and add the Collections
432 separately via `add_collection()`.
433
434 Parameters:
435 name (str): Name of this calibration. It should be unique for use in the `CAF`
436 Keyword Arguments:
437 collector (str, `basf2.Module`): Should be set to a CalibrationCollectorModule() or a string with the module name.
438 algorithms (list, ``ROOT.Belle2.CalibrationAlgorithm``): The algorithm(s) to use for this `Calibration`.
439 input_files (str, list[str]): Input files for use by this Calibration. May contain wildcards usable by `glob.glob`
440
441 A Calibration won't be valid in the `CAF` until it has all of these four attributes set. For example:
442
443 >>> cal = Calibration('TestCalibration1')
444 >>> col1 = register_module('CaTest')
445 >>> cal.add_collection('TestColl', col1)
446
447 or equivalently
448
449 >>> cal = Calibration('TestCalibration1', 'CaTest')
450
451 If you want to run a basf2 :py:class:`path <basf2.Path>` before your collector module when running over data
452
453 >>> cal.pre_collector_path = my_basf2_path
454
455 You don't have to put a RootInput module in this pre-collection path, but you can if
456 you need some special parameters. If you want to process sroot files the you have to explicitly add
457 SeqRootInput to your pre-collection path.
458 The inputFileNames parameter of (Seq)RootInput will be set by the CAF automatically for you.
459
460
461 You can use optional arguments to pass in some/all during initialisation of the `Calibration` class
462
463 >>> cal = Calibration( 'TestCalibration1', 'CaTest', [alg1,alg2], ['/path/to/file.root'])
464
465 you can change the input file list later on, before running with `CAF`
466
467 >>> cal.input_files = ['path/to/*.root', 'other/path/to/file2.root']
468
469 If you have multiple collections from calling `add_collection()` then you should instead set the pre_collector_path,
470 input_files, database chain etc from there. See `Collection`.
471
472 Adding the CalibrationAlgorithm(s) is easy
473
474 >>> alg1 = TestAlgo()
475 >>> cal.algorithms = alg1
476
477 Or equivalently
478
479 >>> cal.algorithms = [alg1]
480
481 Or for multiple algorithms for one collector
482
483 >>> alg2 = TestAlgo()
484 >>> cal.algorithms = [alg1, alg2]
485
486 Note that when you set the algorithms, they are automatically wrapped and stored as a Python class
    `Algorithm`. To access the C++ algorithm class underneath directly do:
488
489 >>> cal.algorithms[i].algorithm
490
491 If you have a setup function that you want to run before each of the algorithms, set that with
492
493 >>> cal.pre_algorithms = my_function_object
494
495 If you want a different setup for each algorithm use a list with the same number of elements
496 as your algorithm list.
497
498 >>> cal.pre_algorithms = [my_function1, my_function2, ...]
499
500 You can also specify the dependencies of the calibration on others
501
502 >>> cal.depends_on(cal2)
503
504 By doing this, the `CAF` will respect the ordering of the calibrations and will pass the
505 calibration constants created by earlier completed calibrations to dependent ones.
506 """
507
508 moves = ["submit_collector", "complete", "run_algorithms", "iterate", "fail_fully"]
509
510 alg_output_dir = "algorithm_output"
511
512 checkpoint_states = ["init", "collector_completed", "completed"]
513
514 default_collection_name = "default"
515
    def __init__(self,
                 name,
                 collector=None,
                 algorithms=None,
                 input_files=None,
                 pre_collector_path=None,
                 database_chain=None,
                 output_patterns=None,
                 max_files_per_collector_job=None,
                 max_collector_jobs=None,
                 backend_args=None
                 ):
        """
        Create the Calibration and its default `Collection` from the given arguments.
        Most keyword arguments are simply forwarded into the default `Collection`.
        """
        #: Collections for this calibration, keyed by collection name.
        self.collections = {}
        #: Internal list of wrapped `Algorithm` objects (exposed via the `algorithms` property).
        self._algorithms = []

        # Default collection added, will have None type and requires setting later via `self.collector`, or will take the
        # CollectorModule/module name directly. NOTE: must exist before super().__init__ since the
        # input_files property delegates to this default collection.
        self.add_collection(self.default_collection_name,
                            Collection(collector,
                                       input_files,
                                       pre_collector_path,
                                       database_chain,
                                       output_patterns,
                                       max_files_per_collector_job,
                                       max_collector_jobs,
                                       backend_args
                                       ))

        super().__init__(name, input_files)
        if algorithms:
            #: Algorithm(s) to run; the property setter wraps each in an `Algorithm`.
            self.algorithms = algorithms
        #: Results of algorithm executions, keyed per iteration.
        self.results = {}
        #: Maximum number of iterations (None -> use the CAF/default value).
        self.max_iterations = None
        #: Runs to ignore during this calibration (None -> use defaults).
        self.ignored_runs = None
        if self.algorithms:
            #: Default strategy applied to every algorithm unless overridden later.
            self.strategies = strategies.SingleIOV
        if database_chain:
            #: Database chain used by the algorithm processes (and the default collection).
            self.database_chain = database_chain
        else:
            self.database_chain = []
            # This database is already applied to the `Collection` automatically, so don't do it again
            for tag in reversed(b2conditions.default_globaltags):
                self.use_central_database(tag, apply_to_default_collection=False)

        #: Runner class used to execute this calibration's algorithms sequentially.
        self.algorithms_runner = runners.SeqAlgorithmsRunner
        #: Backend used for collector job submission (None until assigned).
        self.backend = None
        #: Seconds between full status updates of the collector jobs.
        self.collector_full_update_interval = 30
        #: Polling interval (seconds) of the monitoring loop.
        self.heartbeat = 3
        #: The `CalibrationMachine` driving this calibration (created when running).
        self.machine = None
        #: Path to the CAF state database (set by the CAF when running).
        self._db_path = None
596
597 def add_collection(self, name, collection):
598 """
599 Parameters:
600 name (str): Unique name of this `Collection` in the Calibration.
601 collection (`Collection`): `Collection` object to use.
602
603 Adds a new `Collection` object to the `Calibration`. Any valid Collection will be used in the Calibration.
604 A default Collection is automatically added but isn't valid and won't run unless you have assigned a collector
605 + input files.
606 You can ignore the default one and only add your own custom Collections. You can configure the default from the
607 Calibration(...) arguments or after creating the Calibration object via directly setting the cal.collector, cal.input_files
608 attributes.
609 """
610 if name not in self.collections:
611 self.collections[name] = collection
612 else:
613 B2WARNING(f"A Collection with the name '{name}' already exists in this Calibration. It has not been added."
614 "Please use another name.")
615
616 def is_valid(self):
617 """
618 A full calibration consists of a collector AND an associated algorithm AND input_files.
619
620 Returns False if:
621 1) We are missing any of the above.
622 2) There are multiple Collections and the Collectors have mis-matched granularities.
623 3) Any of our Collectors have granularities that don't match what our Strategy can use. """
624 if not self.algorithms:
625 B2WARNING(f"Empty algorithm list for {self.name}.")
626 return False
627
628 if not any([collection.is_valid() for collection in self.collections.values()]):
629 B2WARNING(f"No valid Collections for {self.name}.")
630 return False
631
632 granularities = []
633 for collection in self.collections.values():
634 if collection.is_valid():
635 collector_params = collection.collector.available_params()
636 for param in collector_params:
637 if param.name == "granularity":
638 granularities.append(param.values)
639 if len(set(granularities)) > 1:
640 B2WARNING("Multiple different granularities set for the Collections in this Calibration.")
641 return False
642
643 for alg in self.algorithms:
644 alg_type = type(alg.algorithm).__name__
645 incorrect_gran = [granularity not in alg.strategy.allowed_granularities for granularity in granularities]
646 if any(incorrect_gran):
647 B2WARNING(f"Selected strategy for {alg_type} does not match a collector's granularity.")
648 return False
649 return True
650
651 def reset_database(self, apply_to_default_collection=True):
652 """
653 Keyword Arguments:
654 apply_to_default_collection (bool): Should we also reset the default collection?
655
656 Remove everything in the database_chain of this Calibration, including the default central database tag automatically
657 included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`. This will NOT affect the
658 database chain of any `Collection` other than the default one. You can prevent the default Collection from having its chain
659 reset by setting 'apply_to_default_collection' to False.
660 """
661 self.database_chain = []
662 if self.default_collection_name in self.collections and apply_to_default_collection:
663 self.collections[self.default_collection_name].reset_database()
664
665 def use_central_database(self, global_tag, apply_to_default_collection=True):
666 """
667 Parameters:
668 global_tag (str): The central database global tag to use for this calibration.
669
670 Keyword Arguments:
671 apply_to_default_collection (bool): Should we also call use_central_database on the default collection (if it exists)
672
673 Using this allows you to append a central database to the database chain for this calibration.
674 The default database chain is just the central one from
675 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
676 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
677 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
678
679 Note that the database chain attached to the `Calibration` will only affect the default `Collection` (if it exists),
680 and the algorithm processes. So calling:
681
682 >> cal.use_central_database("global_tag")
683
684 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
685 assigned to
686
687 >> cal.collections['default'].database_chain
688
689 But calling
690
691 >> cal.use_central_database(file_path, payload_dir, False)
692
693 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
694 So if you have multiple Collections in this Calibration *their database chains are separate*.
695 To specify an additional `CentralDatabase` for a different collection, you will have to call:
696
697 >> cal.collections['OtherCollection'].use_central_database("global_tag")
698 """
699 central_db = CentralDatabase(global_tag)
700 self.database_chain.append(central_db)
701 if self.default_collection_name in self.collections and apply_to_default_collection:
702 self.collections[self.default_collection_name].use_central_database(global_tag)
703
704 def use_local_database(self, filename, directory="", apply_to_default_collection=True):
705 """
706 Parameters:
707 filename (str): The path to the database.txt of the local database
708
709 Keyword Argumemts:
710 directory (str): The path to the payloads directory for this local database.
711 apply_to_default_collection (bool): Should we also call use_local_database on the default collection (if it exists)
712
713 Append a local database to the chain for this calibration.
714 You can call this function multiple times and each database will be added to the chain IN ORDER.
715 The databases are applied to this calibration ONLY.
716 The Local and Central databases applied via these functions are applied to the algorithm processes and optionally
717 the default `Collection` job as a database chain.
718 There are other databases applied to the processes later, checked by basf2 in this order:
719
720 1) Local Database from previous iteration of this Calibration.
721 2) Local Database chain from output of previous dependent Calibrations.
722 3) This chain of Local and Central databases where the last added is checked first.
723
724 Note that this function on the `Calibration` object will only affect the default `Collection` if it exists and if
725 'apply_to_default_collection' remains True. So calling:
726
727 >> cal.use_local_database(file_path, payload_dir)
728
729 will modify the database chain used by all the algorithms assigned to this `Calibration`, and modifies the database chain
730 assigned to
731
732 >> cal.collections['default'].database_chain
733
734 But calling
735
736 >> cal.use_local_database(file_path, payload_dir, False)
737
738 will add the database to the Algorithm processes, but leave the default Collection database chain untouched.
739
740 If you have multiple Collections in this Calibration *their database chains are separate*.
741 To specify an additional `LocalDatabase` for a different collection, you will have to call:
742
743 >> cal.collections['OtherCollection'].use_local_database(file_path, payload_dir)
744
745 """
746 local_db = LocalDatabase(filename, directory)
747 self.database_chain.append(local_db)
748 if self.default_collection_name in self.collections and apply_to_default_collection:
749 self.collections[self.default_collection_name].use_local_database(filename, directory)
750
751 def _get_default_collection_attribute(self, attr):
752 if self.default_collection_name in self.collections:
753 return getattr(self.collections[self.default_collection_name], attr)
754 else:
755 B2WARNING(f"You tried to get the attribute '{attr}' from the Calibration '{self.name}', "
756 "but the default collection doesn't exist."
757 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
758 "collection's attributes directly.")
759 return None
760
761 def _set_default_collection_attribute(self, attr, value):
762 if self.default_collection_name in self.collections:
763 setattr(self.collections[self.default_collection_name], attr, value)
764 else:
765 B2WARNING(f"You tried to set the attribute '{attr}' from the Calibration '{self.name}', "
766 "but the default collection doesn't exist."
767 f"You should use the cal.collections['CollectionName'].{attr} to access a custom "
768 "collection's attributes directly.")
769
    @property
    def collector(self):
        """
        The collector module of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("collector")

    @collector.setter
    def collector(self, collector):
        """
        Sets the collector of the default `Collection`. Accepts a `basf2.Module` instance
        or the name of a module to register.
        """
        # check if collector is already a module or if we need to create one
        # from the name
        from basf2 import Module
        if isinstance(collector, str):
            from basf2 import register_module
            collector = register_module(collector)
        if not isinstance(collector, Module):
            B2ERROR("Collector needs to be either a Module or the name of such a module")

        self._set_default_collection_attribute("collector", collector)
790
    @property
    def input_files(self):
        """
        The input files of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("input_files")

    @input_files.setter
    def input_files(self, files):
        """
        Forwards to the input_files setter of the default `Collection`.
        """
        self._set_default_collection_attribute("input_files", files)
802
    @property
    def files_to_iovs(self):
        """
        The file -> IoV mapping of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("files_to_iovs")

    @files_to_iovs.setter
    def files_to_iovs(self, file_map):
        """
        Forwards to the files_to_iovs setter of the default `Collection`.
        """
        self._set_default_collection_attribute("files_to_iovs", file_map)
814
    @property
    def pre_collector_path(self):
        """
        The pre-collector `basf2.Path` of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("pre_collector_path")

    @pre_collector_path.setter
    def pre_collector_path(self, path):
        """
        Forwards to the pre_collector_path setter of the default `Collection`.
        """
        self._set_default_collection_attribute("pre_collector_path", path)
826
    @property
    def output_patterns(self):
        """
        The output file patterns of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("output_patterns")

    @output_patterns.setter
    def output_patterns(self, patterns):
        """
        Forwards to the output_patterns setter of the default `Collection`.
        """
        self._set_default_collection_attribute("output_patterns", patterns)
838
    @property
    def max_files_per_collector_job(self):
        """
        Maximum input files per collector subjob of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("max_files_per_collector_job")

    @max_files_per_collector_job.setter
    def max_files_per_collector_job(self, max_files):
        """
        Forwards to the max_files_per_collector_job setter of the default `Collection`.
        """
        self._set_default_collection_attribute("max_files_per_collector_job", max_files)
850
    @property
    def max_collector_jobs(self):
        """
        Maximum number of collector subjobs of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("max_collector_jobs")

    @max_collector_jobs.setter
    def max_collector_jobs(self, max_jobs):
        """
        Forwards to the max_collector_jobs setter of the default `Collection`.
        """
        self._set_default_collection_attribute("max_collector_jobs", max_jobs)
862
    @property
    def backend_args(self):
        """
        The backend submission args of the default `Collection` (None plus a warning if it was removed).
        """
        return self._get_default_collection_attribute("backend_args")

    @backend_args.setter
    def backend_args(self, args):
        """
        Forwards to the backend_args setter of the default `Collection`.
        """
        self._set_default_collection_attribute("backend_args", args)
874
875 @property
876 def algorithms(self):
877 """
878 """
879 return self._algorithms
880
881 @algorithms.setter
882 @method_dispatch
883 def algorithms(self, value):
884 """
885 """
886 from ROOT import Belle2 # noqa: make the Belle2 namespace available
887 from ROOT.Belle2 import CalibrationAlgorithm
888 if isinstance(value, CalibrationAlgorithm):
889 self._algorithms = [Algorithm(value)]
890 else:
891 B2ERROR(f"Something other than CalibrationAlgorithm instance passed in ({type(value)}). "
892 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
893
894 @algorithms.fset.register(tuple)
895 @algorithms.fset.register(list)
896 def _(self, value):
897 """
898 Alternate algorithms setter for lists and tuples of CalibrationAlgorithms.
899 """
900 from ROOT import Belle2 # noqa: make the Belle2 namespace available
901 from ROOT.Belle2 import CalibrationAlgorithm
902 if value:
903 self._algorithms = []
904 for alg in value:
905 if isinstance(alg, CalibrationAlgorithm):
906 self._algorithms.append(Algorithm(alg))
907 else:
908 B2ERROR(f"Something other than CalibrationAlgorithm instance passed in {type(value)}."
909 "Algorithm needs to inherit from Belle2::CalibrationAlgorithm")
910
    @property
    def pre_algorithms(self):
        """
        Callback run prior to each algorithm iteration.
        """
        return [alg.pre_algorithm for alg in self.algorithms]

    @pre_algorithms.setter
    @method_dispatch
    def pre_algorithms(self, func):
        """
        Single-value setter: applies the same pre-algorithm callback to every
        algorithm. List/tuple input is handled by a registered overload.
        """
        if func:
            for alg in self.algorithms:
                alg.pre_algorithm = func
        else:
            B2ERROR("Something evaluated as False passed in as pre_algorithm function.")

    @pre_algorithms.fset.register(tuple)
    @pre_algorithms.fset.register(list)
    def _(self, values):
        """
        Alternate pre_algorithms setter for lists and tuples of functions, should be one per algorithm.
        """
        if values:
            if len(values) == len(self.algorithms):
                # Pair up callbacks and algorithms one-to-one, in order.
                for func, alg in zip(values, self.algorithms):
                    alg.pre_algorithm = func
            else:
                B2ERROR("Number of functions and number of algorithms doesn't match.")
        else:
            B2ERROR("Empty container passed in for pre_algorithm functions")
943
    @property
    def strategies(self):
        """
        The `caf.strategies.AlgorithmStrategy` or `list` of them used when running the algorithm(s).
        """
        return [alg.strategy for alg in self.algorithms]

    @strategies.setter
    @method_dispatch
    def strategies(self, strategy):
        """
        Single-value setter: applies the same strategy to every algorithm.
        List/tuple input is handled by a registered overload.
        """
        if strategy:
            for alg in self.algorithms:
                alg.strategy = strategy
        else:
            B2ERROR("Something evaluated as False passed in as a strategy.")
961
962 @strategies.fset.register(tuple)
963 @strategies.fset.register(list)
964 def _(self, values):
965 """
966 Alternate strategies setter for lists and tuples of functions, should be one per algorithm.
967 """
968 if values:
969 if len(values) == len(self.algorithms):
970 for strategy, alg in zip(strategies, self.algorithms):
971 alg.strategy = strategy
972 else:
973 B2ERROR("Number of strategies and number of algorithms doesn't match.")
974 else:
975 B2ERROR("Empty container passed in for strategies list")
976
    def __repr__(self):
        """
        Representation used in logs/printing; simply this calibration's name.
        """
        return self.name
981
    def run(self):
        """
        Main logic of the Calibration object.
        Will be run in a new Thread by calling the start() method.

        Restores the last checkpoint state/iteration from the CAF database,
        then drives the `CalibrationMachine` (collector submission -> polling ->
        algorithm execution) until the end state or fail state is reached.
        """
        # Recover where a previous attempt got to, so we resume instead of redoing work.
        with CAFDB(self._db_path, read_only=True) as db:
            initial_state = db.get_calibration_value(self.name, "checkpoint")
            initial_iteration = db.get_calibration_value(self.name, "iteration")
        B2INFO(f"Initial status of {self.name} found to be state={initial_state}, iteration={initial_iteration}")
        self.machine = CalibrationMachine(self,
                                          iov_to_calibrate=self.iov,
                                          initial_state=initial_state,
                                          iteration=initial_iteration)
        self.state = initial_state
        self.machine.root_dir = Path(os.getcwd(), self.name)
        self.machine.collector_backend = self.backend

        # Before we start running, let's clean up any iteration directories from iterations above our initial one.
        # Should prevent confusion between attempts if we fail again.
        all_iteration_paths = find_int_dirs(self.machine.root_dir)
        for iteration_path in all_iteration_paths:
            if int(iteration_path.name) > initial_iteration:
                shutil.rmtree(iteration_path)

        # Main state loop: keep cycling (collect -> run algorithms) until done or failed.
        while self.state != self.end_state and self.state != self.fail_state:
            if self.state == "init":
                try:
                    B2INFO(f"Attempting collector submission for calibration {self.name}.")
                    self.machine.submit_collector()
                except Exception as err:
                    B2FATAL(str(err))

                # Blocks until the collector jobs finish (or fail).
                self._poll_collector()

            # If we failed take us to the final fail state
            if self.state == "collector_failed":
                self.machine.fail_fully()
                return

            # It's possible that we might raise an error while attempting to run due
            # to some problems e.g. Missing collector output files
            # We catch the error and exit with failed state so the CAF will stop
            try:
                B2INFO(f"Attempting to run algorithms for calibration {self.name}.")
                self.machine.run_algorithms()
            except MachineError as err:
                B2ERROR(str(err))
                self.machine.fail()

            # If we failed take us to the final fail state
            if self.machine.state == "algorithms_failed":
                self.machine.fail_fully()
                return
1035
    def _poll_collector(self):
        """
        Blocks until the collector jobs for this calibration either complete or fail,
        checking every `heartbeat` seconds via the machine's transition conditions.
        """
        while self.state == "running_collector":
            try:
                self.machine.complete()
            # ConditionError is thrown when the conditions for the transition have returned false, it's not serious.
            except ConditionError:
                try:
                    B2DEBUG(29, f"Checking if collector jobs for calibration {self.name} have failed.")
                    self.machine.fail()
                except ConditionError:
                    # Not complete and not failed either: still running, keep waiting.
                    pass
            sleep(self.heartbeat)  # Sleep until we want to check again
1050
    @property
    def state(self):
        """
        The current major state of the calibration in the database file. The machine may have a different state.
        """
        with CAFDB(self._db_path, read_only=True) as db:
            state = db.get_calibration_value(self.name, "state")
        return state

    @state.setter
    def state(self, state):
        """
        Persists the new state to the database; if the state is one of the
        `checkpoint_states` the checkpoint column is updated too, so a restarted
        CAF resumes from here.
        """
        B2DEBUG(29, f"Setting {self.name} to state {state}.")
        with CAFDB(self._db_path) as db:
            db.update_calibration_value(self.name, "state", str(state))
            if state in self.checkpoint_states:
                db.update_calibration_value(self.name, "checkpoint", str(state))
        B2DEBUG(29, f"{self.name} set to {state}.")
1070
    @property
    def iteration(self):
        """
        Retrieves the current iteration number in the database file.

        Returns:
            int: The current iteration number
        """
        with CAFDB(self._db_path, read_only=True) as db:
            iteration = db.get_calibration_value(self.name, "iteration")
        return iteration

    @iteration.setter
    def iteration(self, iteration):
        """
        Persists the new iteration number to the database file.
        """
        B2DEBUG(29, f"Setting {self.name} to {iteration}.")
        with CAFDB(self._db_path) as db:
            db.update_calibration_value(self.name, "iteration", iteration)
        B2DEBUG(29, f"{self.name} set to {self.iteration}.")
1091
1092
class Algorithm():
    """
    Parameters:
        algorithm: The CalibrationAlgorithm instance that we want to execute.
    Keyword Arguments:
        data_input : An optional function that sets the input files of the algorithm.
        pre_algorithm : An optional function that runs just prior to execution of the algorithm.
            Useful for set up e.g. module initialisation

    This is a simple wrapper class around the C++ CalibrationAlgorithm class.
    It helps to add functionality to algorithms for use by the Calibration and CAF classes rather
    than separating the logic into those classes directly.

    This is **not** currently a class that a user should interact with much during `CAF`
    setup (unless you're doing something advanced).
    The `Calibration` class should be doing the most of the creation of the defaults for these objects.

    Setting the `data_input` function might be necessary if you have set the `Calibration.output_patterns`.
    Also, setting the `pre_algorithm` to a function that should execute prior to each `strategies.AlgorithmStrategy`
    is often useful i.e. by calling for the Geometry module to initialise.
    """

    def __init__(self, algorithm, data_input=None, pre_algorithm=None):
        """
        Wraps a C++ CalibrationAlgorithm instance and fills in default
        input-data handling and execution strategy.
        """

        # The wrapped CalibrationAlgorithm instance that will eventually be executed
        self.algorithm = algorithm

        # Derive a short name from the fully qualified C++ class name,
        # e.g. "Belle2::TestCalibrationAlgorithm" -> "TestCalibrationAlgorithm"
        cppname = type(algorithm).__cpp_name__
        self.name = cppname[cppname.rfind('::') + 2:]

        # Function used to set the algorithm's input files before execution;
        # defaults to `default_inputdata_setup` if none was supplied.
        self.data_input = data_input
        if not self.data_input:
            self.data_input = self.default_inputdata_setup

        # Optional callback executed just before the algorithm runs (e.g. Geometry init)
        self.pre_algorithm = pre_algorithm

        # The `caf.strategies.AlgorithmStrategy` class used when running this algorithm
        self.strategy = strategies.SingleIOV

        # Free-form parameters dictionary that strategies may read (advanced usage)
        self.params = {}

    def default_inputdata_setup(self, input_file_paths):
        """
        Simple setup to set the input file names to the algorithm. Applied to the data_input attribute
        by default. This simply takes all files returned from the `Calibration.output_patterns` and filters
        for only the CollectorOutput.root files. Then it sets them as input files to the CalibrationAlgorithm class.
        """
        # Keep only files literally named "CollectorOutput.root", wherever they live.
        collector_output_files = list(filter(lambda file_path: "CollectorOutput.root" == Path(file_path).name,
                                             input_file_paths))
        info_lines = [f"Input files used in {self.name}:"]
        info_lines.extend(collector_output_files)
        B2INFO_MULTILINE(info_lines)
        self.algorithm.setInputFileNames(collector_output_files)
1155
1156
class CAF():
    """
    Parameters:
      calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                                   >>> calibration_defaults={"max_iterations":2}

    This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
    for the calibrations. But most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

    The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls the
    `CalibrationBase.start` when the dependencies are met.

    Much of the checking for consistency is done in this class so that no processing is done with an
    invalid setup. Choosing which files to use as input should be done from outside during the setup
    of the `CAF` and `CalibrationBase` instances.
    """

    # The name of the SQLite DB file that records the state/checkpoint/iteration of each calibration
    _db_name = "caf_state.db"

    # Default options applied to each calibration if the user didn't set them explicitly
    default_calibration_config = {
        "max_iterations": 5,
        "ignored_runs": []
    }

    def __init__(self, calibration_defaults=None):
        """
        Sets up an empty CAF with sensible defaults; calibrations are added later
        via `add_calibration` and processed by `run`.
        """
        # Calibrations to run, keyed by calibration name
        self.calibrations = {}
        # "Calibrations that depend on me" name mapping, filled during ordering
        self.future_dependencies = {}
        # "Calibrations I depend on" name mapping, filled during ordering
        self.dependencies = {}
        # Output directory for all results (made absolute when `run` is called)
        self.output_dir = "calibration_results"
        # Topologically sorted ordering of the calibrations, set during `run`
        self.order = None
        # Private backend storage; access via the `backend` property
        self._backend = None
        # Polling interval (seconds) of the main loop in `run`
        self.heartbeat = 5

        if not calibration_defaults:
            calibration_defaults = {}
        # User-supplied defaults override the class-level `default_calibration_config`
        self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
        # Absolute path to the SQLite status DB, set up in `_make_database`
        self._db_path = None

    def add_calibration(self, calibration):
        """
        Adds a `Calibration` that is to be used in this program to the list.
        Also adds an empty dependency list to the overall dictionary.
        You should not directly alter a `Calibration` object after it has been
        added here.
        """
        if calibration.is_valid():
            if calibration.name not in self.calibrations:
                self.calibrations[calibration.name] = calibration
            else:
                B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
        else:
            B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                      "It was not added and will not be part of the final process.")

    def _remove_missing_dependencies(self):
        """
        This checks the future and past dependencies of each `Calibration` in the `CAF`.
        If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
        object directly.
        """
        calibration_names = [calibration.name for calibration in self.calibrations.values()]

        def is_dependency_in_caf(dependency):
            """
            Quick function to use with filter() and check dependencies against calibrations known to `CAF`
            """
            dependency_in_caf = dependency.name in calibration_names
            if not dependency_in_caf:
                B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                          " It has been removed as a dependency.")
            return dependency_in_caf

        # Check that there aren't dependencies on calibrations not added to the framework
        # Remove them from the calibration objects if there are.
        for calibration in self.calibrations.values():
            filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
            calibration.future_dependencies = filtered_future_dependencies

            filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
            calibration.dependencies = filtered_dependencies

    def _order_calibrations(self):
        """
        - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
          to a sorting algorithm.
        - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
        """
        # First remove any dependencies on calibrations not added to the CAF
        self._remove_missing_dependencies()
        # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
        # Note that they currently use the names not the calibration objects.
        for calibration in self.calibrations.values():
            future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
            past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

            self.future_dependencies[calibration.name] = future_dependencies_names
            self.dependencies[calibration.name] = past_dependencies_names
        # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
        order = topological_sort(self.future_dependencies)
        if not order:
            return False

        # Get an ordered dictionary of the sort order but including all implicit dependencies.
        ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

        # Return all the implicit+explicit past dependencies
        full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
        # Correct each calibration's dependency list to reflect the implicit dependencies
        for calibration in self.calibrations.values():
            full_deps = full_past_dependencies[calibration.name]
            explicit_deps = [cal.name for cal in calibration.dependencies]
            for dep in full_deps:
                if dep not in explicit_deps:
                    calibration.dependencies.append(self.calibrations[dep])
            # At this point the calibration has its full dependencies but they aren't in topological
            # sort order. Correct that here. NOTE: this must happen inside the loop so that EVERY
            # calibration's dependency list is re-sorted, not just the last one iterated.
            ordered_dependency_list = []
            for ordered_calibration_name in order:
                if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
                    ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
            calibration.dependencies = ordered_dependency_list
        order = ordered_full_dependencies
        # We should also patch in all of the implicit dependencies for the calibrations
        return order

    def _check_backend(self):
        """
        Makes sure that the CAF has a valid backend setup. If one isn't set by the user
        (or if the one that is stored isn't a valid Backend object) we should create a
        default Local backend.
        """
        if not isinstance(self._backend, caf.backends.Backend):
            # Fall back to running collector jobs on the local machine
            self.backend = caf.backends.Local()

    def _prune_invalid_collections(self):
        """
        Checks all current calibrations and removes any invalid Collections from their collections list.
        """
        B2INFO("Checking for any invalid Collections in Calibrations.")
        for calibration in self.calibrations.values():
            valid_collections = {}
            for name, collection in calibration.collections.items():
                if collection.is_valid():
                    valid_collections[name] = collection
                else:
                    B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
            calibration.collections = valid_collections

    def run(self, iov=None):
        """
        Keyword Arguments:
            iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                this IoV will be used in the collection step.

        This function runs the overall calibration job, saves the outputs to the output_dir directory,
        and creates database payloads.

        Upload of final databases is not done here. This simply creates the local databases in
        the output directory. You should check the validity of your new local database before uploading
        to the conditions DB via the basf2 tools/interface to the DB.
        """
        if not self.calibrations:
            B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
        # Checks whether the dependencies we've added will give a valid order
        order = self._order_calibrations()
        if not order:
            B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

        # Check that a backend has been set and use default Local() one if not
        self._check_backend()

        self._prune_invalid_collections()

        # Creates the overall output directory and reset the attribute to use an absolute path to it.
        self.output_dir = self._make_output_dir()

        # Creates a SQLite DB to save the status of the various calibrations
        self._make_database()

        # Enter the overall output dir during processing and open a connection to the DB
        with temporary_workdir(self.output_dir):
            db = CAFDB(self._db_path)
            db.open()
            db_initial_calibrations = db.query("select * from calibrations").fetchall()
            for calibration in self.calibrations.values():
                # Apply defaults given to the `CAF` to the calibrations if they aren't set
                calibration._apply_calibration_defaults(self.calibration_defaults)
                calibration._db_path = self._db_path
                calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
                calibration.iov = iov
                if not calibration.backend:
                    calibration.backend = self.backend
                # Do some checking of the db to see if we need to add an entry for this calibration
                if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
                    db.insert_calibration(calibration.name)
                    db.commit()
                else:
                    for cal_info in db_initial_calibrations:
                        if cal_info[0] == calibration.name:
                            cal_initial_state = cal_info[2]
                            cal_initial_iteration = cal_info[3]
                    B2INFO(f"Previous entry in database found for {calibration.name}.")
                    B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                    calibration.state = cal_initial_state
                    B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                    calibration.iteration = cal_initial_iteration
                # Daemonize so that it exits if the main program exits
                calibration.daemon = True

            db.close()

            # Is it possible to keep going?
            keep_running = True
            while keep_running:
                keep_running = False
                # Do we have calibrations that may yet complete?
                remaining_calibrations = []

                for calibration in self.calibrations.values():
                    # Find the currently ended calibrations (may not be joined yet)
                    if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
                        # Search for any alive Calibrations and join them
                        if calibration.is_alive():
                            B2DEBUG(29, f"Joining {calibration.name}.")
                            calibration.join()
                    else:
                        if calibration.dependencies_met():
                            if not calibration.is_alive():
                                B2DEBUG(29, f"Starting {calibration.name}.")
                                try:
                                    calibration.start()
                                except RuntimeError:
                                    # Catch the case when the calibration just finished so it ended up here
                                    # in the "else" and not above where it should have been joined.
                                    B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                            remaining_calibrations.append(calibration)
                        else:
                            if not calibration.failed_dependencies():
                                remaining_calibrations.append(calibration)
                if remaining_calibrations:
                    keep_running = True
                    # Loop over jobs that the calibrations want submitted and submit them.
                    # We do this here because some backends don't like us submitting in parallel from multiple CalibrationThreads
                    # So this is like a mini job queue without getting too clever with it
                    for calibration in remaining_calibrations:
                        for job in calibration.jobs_to_submit[:]:
                            calibration.backend.submit(job)
                            calibration.jobs_to_submit.remove(job)
                sleep(self.heartbeat)

            B2INFO("Printing summary of final CAF status.")
            with CAFDB(self._db_path, read_only=True) as db:
                print(db.output_calibration_table())

    @property
    def backend(self):
        """
        The `backend <backends.Backend>` that runs the collector job.
        When set, this is checked that a `backends.Backend` class instance was passed in.
        """
        return self._backend

    @backend.setter
    def backend(self, backend):
        """
        Validating setter: rejects anything that isn't a `backends.Backend` instance.
        """
        if isinstance(backend, caf.backends.Backend):
            self._backend = backend
        else:
            B2ERROR('Backend property must inherit from Backend class.')

    def _make_output_dir(self):
        """
        Creates the output directory. If it already exists we are now going to try and
        restart the program from the last state.

        Returns:
            str: The absolute path of the new output_dir
        """
        p = Path(self.output_dir).resolve()
        if p.is_dir():
            B2INFO(f"{p.as_posix()} output directory already exists. "
                   "We will try to restart from the previous finishing state.")
            return p.as_posix()
        else:
            p.mkdir(parents=True)
            if p.is_dir():
                return p.as_posix()
            else:
                raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")

    def _make_database(self):
        """
        Creates the CAF status database. If it already exists we don't overwrite it.
        """
        self._db_path = Path(self.output_dir, self._db_name).absolute()
        if self._db_path.exists():
            B2INFO(f"Previous CAF database found {self._db_path}")
        # Will create a new database + tables, or do nothing but checks we can connect to existing one
        with CAFDB(self._db_path):
            pass
1471# @endcond
1472