Belle II Software development
CAF Class Reference

Public Member Functions

 __init__ (self, calibration_defaults=None)
 
 add_calibration (self, calibration)
 
 run (self, iov=None)
 
 backend (self)
 
 backend (self, backend)
 

Public Attributes

dict calibrations = {}
 Dictionary of calibrations for this CAF instance.
 
dict future_dependencies = {}
 Dictionary of future dependencies of Calibration objects, where the value is all calibrations that will depend on the key, filled during self.run()
 
dict dependencies = {}
 Dictionary of dependencies of Calibration objects, where the value is the list of Calibration objects that the key depends on.
 
str output_dir = "calibration_results"
 Output path to store results of calibration and bookkeeping information.
 
 order = None
 The ordering and explicit future dependencies of calibrations.
 
int heartbeat = 5
 The heartbeat (seconds) between polling for Calibrations that are finished.
 
dict calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
 Default options applied to each calibration known to the CAF. If the user has already defined an option on a Calibration, the default for that option is not applied.
 
 backend = caf.backends.Local()
 backend property
 

Static Public Attributes

dict default_calibration_config
 The defaults for Calibrations.
 

Protected Member Functions

 _remove_missing_dependencies (self)
 
 _order_calibrations (self)
 
 _check_backend (self)
 
 _prune_invalid_collections (self)
 
 _make_output_dir (self)
 
 _make_database (self)
 

Protected Attributes

 _backend = None
 Private backend attribute.
 
 _db_path = None
 The path of the SQLite DB.
 

Static Protected Attributes

str _db_name = "caf_state.db"
 The name of the SQLite DB that gets created.
 

Detailed Description

Parameters:
  calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                               >>> calibration_defaults={"max_iterations":2}

This class holds `Calibration` objects and processes them. It defines the initial configuration and setup
for the calibrations, but most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

The `CAF` class performs the initial setup, holds the `CalibrationBase` instances, and calls
`CalibrationBase.start` once their dependencies are met.

Most consistency checks are done in this class so that no processing starts with an invalid
setup. Choosing which input files to use should be done from outside, during the setup of the `CAF` and
`CalibrationBase` instances.

Definition at line 1214 of file framework.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
calibration_defaults = None )
 

Definition at line 1240 of file framework.py.

    def __init__(self, calibration_defaults=None):
        """
        """

        self.calibrations = {}

        self.future_dependencies = {}

        self.dependencies = {}

        self.output_dir = "calibration_results"

        self.order = None

        self._backend = None

        self.heartbeat = 5

        if not calibration_defaults:
            calibration_defaults = {}

        self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}

        self._db_path = None
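
The last assignment in the constructor relies on dictionary unpacking, where keys from the right-hand dictionary win. A minimal standalone sketch of that behaviour follows, using the `default_calibration_config` values documented for this class. The `merge_defaults` helper is hypothetical and exists only for this illustration; it is not part of the CAF.

```python
# Standalone illustration of the {**defaults, **overrides} merge used in CAF.__init__.
# `merge_defaults` is a hypothetical helper, not part of the CAF API.
default_calibration_config = {"max_iterations": 5, "ignored_runs": []}


def merge_defaults(calibration_defaults=None):
    """Return the class defaults overlaid with any user-supplied values."""
    if not calibration_defaults:
        calibration_defaults = {}
    # Keys from the right-hand dictionary replace keys from the left-hand one.
    return {**default_calibration_config, **calibration_defaults}


merged = merge_defaults({"max_iterations": 2})   # user value overrides the default
untouched = merge_defaults()                     # no user values: a copy of the defaults
```

The merge is shallow: the result is a new dictionary, but the `ignored_runs` list object inside it is the same one held by the defaults.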

Member Function Documentation

◆ _check_backend()

_check_backend ( self)
protected
Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
one that is stored isn't a valid Backend object) we should create a default Local backend.

Definition at line 1355 of file framework.py.

    def _check_backend(self):
        """
        Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
        one that is stored isn't a valid Backend object) we should create a default Local backend.
        """
        if not isinstance(self._backend, caf.backends.Backend):
            self.backend = caf.backends.Local()

◆ _make_database()

_make_database ( self)
protected
Creates the CAF status database. If it already exists we don't overwrite it.

Definition at line 1520 of file framework.py.

    def _make_database(self):
        """
        Creates the CAF status database. If it already exists we don't overwrite it.
        """
        self._db_path = Path(self.output_dir, self._db_name).absolute()
        if self._db_path.exists():
            B2INFO(f"Previous CAF database found {self._db_path}")
        # Will create a new database + tables, or do nothing but checks we can connect to existing one
        with CAFDB(self._db_path):
            pass
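
The "create if missing, otherwise leave alone" behaviour can be sketched with the standard `sqlite3` module. This is only an illustration of the idea: the `make_database` function and the three-column table layout are assumptions made for the example, since the real `CAFDB` class and its schema are not shown on this page.

```python
import sqlite3
import tempfile
from pathlib import Path


def make_database(db_path):
    """Create the state table if it is missing; leave existing rows alone."""
    conn = sqlite3.connect(db_path)
    try:
        # Hypothetical schema; the real CAFDB table layout is not shown here.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS calibrations "
            "(name TEXT PRIMARY KEY, state TEXT, iteration INTEGER)"
        )
        conn.commit()
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp, "caf_state.db")
    make_database(db_path)                      # first call creates file and table

    conn = sqlite3.connect(db_path)
    conn.execute("INSERT INTO calibrations VALUES ('example', 'init', 0)")
    conn.commit()
    conn.close()

    make_database(db_path)                      # second call must not wipe the row

    conn = sqlite3.connect(db_path)
    rows = conn.execute("SELECT * FROM calibrations").fetchall()
    conn.close()
```

Calling the function twice leaves the previously inserted row in place, which is the property a restart relies on.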

◆ _make_output_dir()

_make_output_dir ( self)
protected
Creates the output directory. If it already exists, the CAF will try to restart from the last saved state.

Returns:
    str: The absolute path of the new output_dir

Definition at line 1501 of file framework.py.

    def _make_output_dir(self):
        """
        Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

        Returns:
            str: The absolute path of the new output_dir
        """
        p = Path(self.output_dir).resolve()
        if p.is_dir():
            B2INFO(f"{p.as_posix()} output directory already exists. "
                   "We will try to restart from the previous finishing state.")
            return p.as_posix()
        else:
            p.mkdir(parents=True)
            if p.is_dir():
                return p.as_posix()
            else:
                raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
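
For readers who want to try the directory handling outside of basf2, a condensed variant can be written with `pathlib` alone. This sketch is not the CAF code: it drops the log message and uses `exist_ok=True` in place of the explicit `is_dir()` branches, and the `make_output_dir` name is chosen only for this example.

```python
import tempfile
from pathlib import Path


def make_output_dir(output_dir):
    """Create output_dir if needed and return its absolute POSIX path."""
    p = Path(output_dir).resolve()
    # exist_ok=True makes the call idempotent, which covers the restart case.
    p.mkdir(parents=True, exist_ok=True)
    return p.as_posix()


with tempfile.TemporaryDirectory() as tmp:
    first = make_output_dir(Path(tmp, "calibration_results"))    # creates the directory
    second = make_output_dir(Path(tmp, "calibration_results"))   # finds it already there
    created = Path(first).is_dir()
```

Both calls return the same absolute path, so the caller can store the result in `output_dir` regardless of whether this is a fresh run or a restart.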

◆ _order_calibrations()

_order_calibrations ( self)
protected
- Uses the dependency attributes of the calibrations to create a dependency dictionary and passes it
to a sorting algorithm.
- Returns a valid OrderedDict if the sort was successful, or False if it failed (most likely because of a cyclic dependency).

Definition at line 1311 of file framework.py.

    def _order_calibrations(self):
        """
        - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
        to a sorting algorithm.
        - Returns valid OrderedDict if sort was successful, False if it failed (most likely a cyclic dependency)
        """
        # First remove any dependencies on calibrations not added to the CAF
        self._remove_missing_dependencies()
        # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
        # Note that they currently use the names not the calibration objects.
        for calibration in self.calibrations.values():
            future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
            past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

            self.future_dependencies[calibration.name] = future_dependencies_names
            self.dependencies[calibration.name] = past_dependencies_names
        # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
        order = topological_sort(self.future_dependencies)
        if not order:
            return False

        # Get an ordered dictionary of the sort order but including all implicit dependencies.
        ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

        # Return all the implicit+explicit past dependencies
        full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
        # Correct each calibration's dependency list to reflect the implicit dependencies
        for calibration in self.calibrations.values():
            full_deps = full_past_dependencies[calibration.name]
            explicit_deps = [cal.name for cal in calibration.dependencies]
            for dep in full_deps:
                if dep not in explicit_deps:
                    calibration.dependencies.append(self.calibrations[dep])
            # At this point the calibrations have their full dependencies but they aren't in topological
            # sort order. Correct that here
            ordered_dependency_list = []
            for ordered_calibration_name in order:
                if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
                    ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
            calibration.dependencies = ordered_dependency_list
        order = ordered_full_dependencies
        # We should also patch in all of the implicit dependencies for the calibrations
        return order
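
The `topological_sort` helper called above is not shown on this page. To illustrate the idea of ordering calibrations by their dependencies and detecting cycles, the sketch below uses the standard library `graphlib` module (Python 3.9 or later) as a stand-in. The `order_calibrations` function and the calibration names are hypothetical, and the input here maps each name to the names it depends on, i.e. the shape of the `dependencies` attribute rather than `future_dependencies`.

```python
from graphlib import CycleError, TopologicalSorter


def order_calibrations(dependencies):
    """Return one valid run order, or an empty list if there is a cycle.

    `dependencies` maps each calibration name to the names it depends on.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError:
        # A cyclic dependency means no valid ordering exists.
        return []


# Hypothetical calibration names, chosen only for this example.
deps = {
    "alignment": [],
    "timing": ["alignment"],
    "energy": ["alignment", "timing"],
}
order = order_calibrations(deps)
cyclic = order_calibrations({"a": ["b"], "b": ["a"]})
```

An empty result plays the same role as the falsy return value that `run()` checks before reporting a possible cyclic dependency.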

◆ _prune_invalid_collections()

_prune_invalid_collections ( self)
protected
Checks all current calibrations and removes any invalid Collections from their collections list.

Definition at line 1364 of file framework.py.

    def _prune_invalid_collections(self):
        """
        Checks all current calibrations and removes any invalid Collections from their collections list.
        """
        B2INFO("Checking for any invalid Collections in Calibrations.")
        for calibration in self.calibrations.values():
            valid_collections = {}
            for name, collection in calibration.collections.items():
                if collection.is_valid():
                    valid_collections[name] = collection
                else:
                    B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
            calibration.collections = valid_collections

◆ _remove_missing_dependencies()

_remove_missing_dependencies ( self)
protected
This checks the future and past dependencies of each `Calibration` in the `CAF`.
If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
object directly.

Definition at line 1284 of file framework.py.

    def _remove_missing_dependencies(self):
        """
        This checks the future and past dependencies of each `Calibration` in the `CAF`.
        If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
        object directly.
        """
        calibration_names = [calibration.name for calibration in self.calibrations.values()]

        def is_dependency_in_caf(dependency):
            """
            Quick function to use with filter() and check dependencies against calibrations known to `CAF`
            """
            dependency_in_caf = dependency.name in calibration_names
            if not dependency_in_caf:
                B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                          " It has been removed as a dependency.")
            return dependency_in_caf

        # Check that there aren't dependencies on calibrations not added to the framework
        # Remove them from the calibration objects if there are.
        for calibration in self.calibrations.values():
            filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
            calibration.future_dependencies = filtered_future_dependencies

            filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
            calibration.dependencies = filtered_dependencies
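
The filtering step can be reproduced without basf2 to see its effect on a small example. In the sketch below, `Cal` is a hypothetical stand-in that carries only a name and a dependency list, and `remove_missing` returns the dropped pairs instead of emitting a warning. Only past dependencies are handled; the same loop would apply to future dependencies.

```python
from dataclasses import dataclass, field


@dataclass
class Cal:
    """Hypothetical stand-in for a Calibration with only the fields used here."""
    name: str
    dependencies: list = field(default_factory=list)


def remove_missing(calibrations):
    """Drop dependencies whose names are not keys of `calibrations`.

    Returns a list of (calibration name, removed dependency name) pairs.
    """
    removed = []
    for cal in calibrations.values():
        kept = []
        for dep in cal.dependencies:
            if dep.name in calibrations:
                kept.append(dep)
            else:
                # The CAF issues a B2WARNING at this point.
                removed.append((cal.name, dep.name))
        cal.dependencies = kept
    return removed


a, b, ghost = Cal("a"), Cal("b"), Cal("ghost")
b.dependencies = [a, ghost]            # "ghost" is never registered
removed = remove_missing({"a": a, "b": b})
```

After the call, `b` depends only on `a`, because `ghost` was never added to the dictionary of known calibrations.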

◆ add_calibration()

add_calibration ( self,
calibration )
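Adds a `Calibration` that is to be used in this program to the dictionary of calibrations.
A calibration that is invalid, or whose name has already been added, is skipped with a warning.
The dependency dictionaries are filled later, during run().
You should not directly alter a `Calibration` object after it has been
added here.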

Definition at line 1268 of file framework.py.

    def add_calibration(self, calibration):
        """
        Adds a `Calibration` that is to be used in this program to the dictionary of calibrations.
        The dependency dictionaries are filled later, during run().
        You should not directly alter a `Calibration` object after it has been
        added here.
        """
        if calibration.is_valid():
            if calibration.name not in self.calibrations:
                self.calibrations[calibration.name] = calibration
            else:
                B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
        else:
            B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework. "
                      "It was not added and will not be part of the final process.")

◆ backend() [1/2]

backend ( self)
The `backend <backends.Backend>` that runs the collector job.
When set, the value is checked to be a `backends.Backend` instance.

Definition at line 1485 of file framework.py.

    def backend(self):
        """
        The `backend <backends.Backend>` that runs the collector job.
        When set, this is checked that a `backends.Backend` class instance was passed in.
        """
        return self._backend

◆ backend() [2/2]

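backend ( self,
backend )
Sets the backend. The value is stored only if it is a `caf.backends.Backend` instance; otherwise an error is reported with B2ERROR and the current backend is left unchanged.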

Definition at line 1493 of file framework.py.

    def backend(self, backend):
        """
        """
        if isinstance(backend, caf.backends.Backend):
            self._backend = backend
        else:
            B2ERROR('Backend property must inherit from Backend class.')
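
The getter, the setter and `_check_backend` together form a type-checked property with a default fallback. The sketch below reproduces that pattern outside of basf2. The `Backend`, `Local` and `Holder` classes are stand-ins written for this example, the error is collected in a list instead of being sent to B2ERROR, and the `@property` decorators are assumed since the listings on this page do not show them.

```python
class Backend:
    """Stand-in for caf.backends.Backend (assumption for this sketch)."""


class Local(Backend):
    """Stand-in for caf.backends.Local."""


class Holder:
    """Hypothetical class that mirrors the CAF backend property pattern."""

    def __init__(self):
        self._backend = None
        self.errors = []  # collects messages instead of calling B2ERROR

    @property
    def backend(self):
        return self._backend

    @backend.setter
    def backend(self, backend):
        if isinstance(backend, Backend):
            self._backend = backend
        else:
            # The real setter reports this through B2ERROR.
            self.errors.append("Backend property must inherit from Backend class.")

    def check_backend(self):
        # Fall back to a Local backend when nothing valid has been stored.
        if not isinstance(self._backend, Backend):
            self.backend = Local()


holder = Holder()
holder.backend = "not a backend"   # rejected, _backend stays None
rejected = holder.backend
holder.check_backend()             # installs the default Local backend
```

Because an invalid assignment leaves `_backend` unchanged, the later check still finds no valid backend and installs the default.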

◆ run()

run ( self,
iov = None )
Keyword Arguments:
    iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                          this IoV will be used in the collection step.

This function runs the overall calibration job, saves the outputs to the output_dir directory,
and creates database payloads.

Upload of final databases is not done here. This simply creates the local databases in
the output directory. You should check the validity of your new local database before uploading
to the conditions DB via the basf2 tools/interface to the DB.

Definition at line 1378 of file framework.py.

    def run(self, iov=None):
        """
        Keyword Arguments:
            iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                                  this IoV will be used in the collection step.

        This function runs the overall calibration job, saves the outputs to the output_dir directory,
        and creates database payloads.

        Upload of final databases is not done here. This simply creates the local databases in
        the output directory. You should check the validity of your new local database before uploading
        to the conditions DB via the basf2 tools/interface to the DB.
        """
        if not self.calibrations:
            B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
        # Checks whether the dependencies we've added will give a valid order
        order = self._order_calibrations()
        if not order:
            B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

        # Check that a backend has been set and use default Local() one if not
        self._check_backend()

        self._prune_invalid_collections()

        # Creates the overall output directory and reset the attribute to use an absolute path to it.
        self.output_dir = self._make_output_dir()

        # Creates a SQLite DB to save the status of the various calibrations
        self._make_database()

        # Enter the overall output dir during processing and open a connection to the DB
        with temporary_workdir(self.output_dir):
            db = CAFDB(self._db_path)
            db.open()
            db_initial_calibrations = db.query("select * from calibrations").fetchall()
            for calibration in self.calibrations.values():
                # Apply defaults given to the `CAF` to the calibrations if they aren't set
                calibration._apply_calibration_defaults(self.calibration_defaults)
                calibration._db_path = self._db_path
                calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
                calibration.iov = iov
                if not calibration.backend:
                    calibration.backend = self.backend
                # Do some checking of the db to see if we need to add an entry for this calibration
                if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
                    db.insert_calibration(calibration.name)
                    db.commit()
                else:
                    for cal_info in db_initial_calibrations:
                        if cal_info[0] == calibration.name:
                            cal_initial_state = cal_info[2]
                            cal_initial_iteration = cal_info[3]
                    B2INFO(f"Previous entry in database found for {calibration.name}.")
                    B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                    calibration.state = cal_initial_state
                    B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                    calibration.iteration = cal_initial_iteration
                # Daemonize so that it exits if the main program exits
                calibration.daemon = True

            db.close()

            # Is it possible to keep going?
            keep_running = True
            while keep_running:
                keep_running = False
                # Do we have calibrations that may yet complete?
                remaining_calibrations = []

                for calibration in self.calibrations.values():
                    # Find the currently ended calibrations (may not be joined yet)
                    if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
                        # Search for any alive Calibrations and join them
                        if calibration.is_alive():
                            B2DEBUG(29, f"Joining {calibration.name}.")
                            calibration.join()
                    else:
                        if calibration.dependencies_met():
                            if not calibration.is_alive():
                                B2DEBUG(29, f"Starting {calibration.name}.")
                                try:
                                    calibration.start()
                                except RuntimeError:
                                    # Catch the case when the calibration just finished so it ended up here
                                    # in the "else" and not above where it should have been joined.
                                    B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                            remaining_calibrations.append(calibration)
                        else:
                            if not calibration.failed_dependencies():
                                remaining_calibrations.append(calibration)
                if remaining_calibrations:
                    keep_running = True
                    # Loop over jobs that the calibrations want submitted and submit them.
                    # We do this here because some backends don't like us submitting in parallel from multiple CalibrationThreads
                    # So this is like a mini job queue without getting too clever with it
                    for calibration in remaining_calibrations:
                        for job in calibration.jobs_to_submit[:]:
                            calibration.backend.submit(job)
                            calibration.jobs_to_submit.remove(job)
                sleep(self.heartbeat)

            B2INFO("Printing summary of final CAF status.")
            with CAFDB(self._db_path, read_only=True) as db:
                print(db.output_calibration_table())
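
The core of `run()` is a polling loop: on every heartbeat it joins finished calibrations and starts those whose dependencies are met. The sketch below shows that pattern with plain `threading.Thread` objects. It is a simplified illustration under several assumptions: `Task` and `run_all` are hypothetical names, there is no failure state, no job submission and no database, and the state strings are invented for the example.

```python
import threading
import time


class Task(threading.Thread):
    """Hypothetical stand-in for a Calibration thread."""

    def __init__(self, name, dependencies=()):
        super().__init__(name=name, daemon=True)
        self.dependencies = list(dependencies)
        self.state = "init"

    def run(self):
        time.sleep(0.01)          # pretend to do some work
        self.state = "completed"

    def dependencies_met(self):
        return all(dep.state == "completed" for dep in self.dependencies)


def run_all(tasks, heartbeat=0.02):
    """Poll until every task has completed; return names in join order."""
    finished_order = []
    started = set()
    keep_running = True
    while keep_running:
        keep_running = False
        for task in tasks:
            if task.state == "completed":
                if task.name not in finished_order:
                    task.join()
                    finished_order.append(task.name)
            else:
                # Something is still pending, so another pass is needed.
                keep_running = True
                if task.name not in started and task.dependencies_met():
                    started.add(task.name)
                    task.start()
        time.sleep(heartbeat)
    return finished_order


first = Task("first")
second = Task("second", dependencies=[first])
finished = run_all([second, first])   # listed out of order on purpose
```

Even though `second` is listed before `first`, it is only started once `first` has completed, which is the same guarantee that `dependencies_met()` provides in the loop above.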

Member Data Documentation

◆ _backend

_backend = None
protected

Private backend attribute.

Definition at line 1256 of file framework.py.

◆ _db_name

str _db_name = "caf_state.db"
staticprotected

The name of the SQLite DB that gets created.

Definition at line 1233 of file framework.py.

◆ _db_path

_db_path = None
protected

The path of the SQLite DB.

Definition at line 1266 of file framework.py.

◆ backend

backend = caf.backends.Local()

backend property

Definition at line 1362 of file framework.py.

◆ calibration_defaults

calibration_defaults = {**self.default_calibration_config, **calibration_defaults}

Default options applied to each calibration known to the CAF. If the user has already defined an option on a Calibration, the default for that option is not applied.

A simple way to define the same configuration to all calibrations in the CAF.

Definition at line 1264 of file framework.py.

◆ calibrations

dict calibrations = {}

Dictionary of calibrations for this CAF instance.

You should use add_calibration to add to this.

Definition at line 1244 of file framework.py.

◆ default_calibration_config

dict default_calibration_config
static
Initial value:
= {
"max_iterations": 5,
"ignored_runs": []
}

The defaults for Calibrations.

Definition at line 1235 of file framework.py.

◆ dependencies

dict dependencies = {}

Dictionary of dependencies of Calibration objects, where the value is the list of Calibration objects that the key depends on.

This attribute is filled during self.run()

Definition at line 1250 of file framework.py.

◆ future_dependencies

dict future_dependencies = {}

Dictionary of future dependencies of Calibration objects, where the value is all calibrations that will depend on the key, filled during self.run()

Definition at line 1247 of file framework.py.

◆ heartbeat

heartbeat = 5

The heartbeat (seconds) between polling for Calibrations that are finished.

Definition at line 1258 of file framework.py.

◆ order

order = None

The ordering and explicit future dependencies of calibrations.

Will be filled during CAF.run() for you.

Definition at line 1254 of file framework.py.

◆ output_dir

output_dir = "calibration_results"

Output path to store results of calibration and bookkeeping information.

Definition at line 1252 of file framework.py.


The documentation for this class was generated from the following file: