Belle II Software development
CAF Class Reference

Public Member Functions

 __init__ (self, calibration_defaults=None)
 
 add_calibration (self, calibration)
 
 run (self, iov=None)
 
 backend (self)
 
 backend (self, backend)
 

Public Attributes

dict calibrations = {}
 Dictionary of calibrations for this CAF instance.
 
dict future_dependencies = {}
 Dictionary of future dependencies of Calibration objects, where the value is all calibrations that will depend on the key, filled during self.run()
 
dict dependencies = {}
 Dictionary of dependencies of Calibration objects, where value is the list of Calibration objects that the key depends on.
 
str output_dir = "calibration_results"
 Output path to store results of calibration and bookkeeping information.
 
 order = None
 The ordering and explicit future dependencies of calibrations.
 
int heartbeat = 5
 The heartbeat (seconds) between polling for Calibrations that are finished.
 
dict calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
 Default options applied to each calibration known to the CAF. If the user has already defined an option on a Calibration, the default is not applied.
 
 backend = caf.backends.Local()
 backend property
 

Static Public Attributes

dict default_calibration_config
 The defaults for Calibrations.
 

Protected Member Functions

 _remove_missing_dependencies (self)
 
 _order_calibrations (self)
 
 _check_backend (self)
 
 _prune_invalid_collections (self)
 
 _make_output_dir (self)
 
 _make_database (self)
 

Protected Attributes

 _backend = None
 Private backend attribute.
 
 _db_path = None
 The path of the SQLite DB.
 

Static Protected Attributes

str _db_name = "caf_state.db"
 The name of the SQLite DB that gets created.
 

Detailed Description

Parameters:
  calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                               >>> calibration_defaults={"max_iterations":2}

This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
for the calibrations, but most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances and calls
`CalibrationBase.start` once their dependencies are met.

Much of the checking for consistency is done in this class so that no processing is done with an invalid
setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
`CalibrationBase` instances.

Definition at line 1194 of file framework.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
calibration_defaults = None )
 

Definition at line 1220 of file framework.py.

def __init__(self, calibration_defaults=None):
    """
    """

    self.calibrations = {}

    self.future_dependencies = {}

    self.dependencies = {}

    self.output_dir = "calibration_results"

    self.order = None

    self._backend = None

    self.heartbeat = 5

    if not calibration_defaults:
        calibration_defaults = {}

    self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}

    self._db_path = None

Member Function Documentation

◆ _check_backend()

_check_backend ( self)
protected
Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
one that is stored isn't a valid Backend object) we should create a default Local backend.

Definition at line 1335 of file framework.py.

def _check_backend(self):
    """
    Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
    one that is stored isn't a valid Backend object) we should create a default Local backend.
    """
    if not isinstance(self._backend, caf.backends.Backend):
        self.backend = caf.backends.Local()

◆ _make_database()

_make_database ( self)
protected
Creates the CAF status database. If it already exists we don't overwrite it.

Definition at line 1500 of file framework.py.

def _make_database(self):
    """
    Creates the CAF status database. If it already exists we don't overwrite it.
    """
    self._db_path = Path(self.output_dir, self._db_name).absolute()
    if self._db_path.exists():
        B2INFO(f"Previous CAF database found {self._db_path}")
    # Will create a new database + tables, or do nothing but checks we can connect to existing one
    with CAFDB(self._db_path):
        pass
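
The "create it, or leave it alone if it already exists" behaviour can be sketched with the standard `sqlite3` module. This is only an illustration: `CAFDB` is not used here, and the `make_database` helper and the three-column table layout are assumptions made up for the example, not the schema the CAF actually creates.

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path


def make_database(db_path):
    """Hypothetical helper: create the table if needed, never overwrite existing data."""
    # sqlite3.connect creates the file when it does not exist yet.
    with closing(sqlite3.connect(db_path)) as connection:
        # Assumed schema for illustration only.
        connection.execute(
            "CREATE TABLE IF NOT EXISTS calibrations "
            "(name TEXT PRIMARY KEY, state TEXT, iteration INTEGER)"
        )
        connection.commit()


def demo():
    """Create a database, store one row, then 'create' it again and read the row back."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = Path(tmp, "caf_state.db")
        make_database(db_path)
        with closing(sqlite3.connect(db_path)) as connection:
            connection.execute("INSERT INTO calibrations VALUES ('cal_a', 'init', 0)")
            connection.commit()
        # A second call must leave the stored row in place.
        make_database(db_path)
        with closing(sqlite3.connect(db_path)) as connection:
            return connection.execute("SELECT * FROM calibrations").fetchall()


rows = demo()
```

Because the table is created with `IF NOT EXISTS`, a restarted job can reuse the state stored by an earlier run.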

◆ _make_output_dir()

_make_output_dir ( self)
protected
Creates the output directory. If it already exists, the CAF will try to restart from the state reached by the previous run.

Returns:
    str: The absolute path of the new output_dir

Definition at line 1481 of file framework.py.

def _make_output_dir(self):
    """
    Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

    Returns:
        str: The absolute path of the new output_dir
    """
    p = Path(self.output_dir).resolve()
    if p.is_dir():
        B2INFO(f"{p.as_posix()} output directory already exists. "
               "We will try to restart from the previous finishing state.")
        return p.as_posix()
    else:
        p.mkdir(parents=True)
        if p.is_dir():
            return p.as_posix()
        else:
            raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")

◆ _order_calibrations()

_order_calibrations ( self)
protected
- Uses the dependency attributes of the calibrations to create a dependency dictionary and passes it
to a sorting algorithm.
- Returns a valid OrderedDict if the sort was successful, or False if it failed (most likely a cyclic dependency)

Definition at line 1291 of file framework.py.

def _order_calibrations(self):
    """
    - Uses the dependency attributes of the calibrations to create a dependency dictionary and passes it
    to a sorting algorithm.
    - Returns a valid OrderedDict if the sort was successful, or False if it failed (most likely a cyclic dependency)
    """
    # First remove any dependencies on calibrations not added to the CAF
    self._remove_missing_dependencies()
    # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
    # Note that they currently use the names not the calibration objects.
    for calibration in self.calibrations.values():
        future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
        past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

        self.future_dependencies[calibration.name] = future_dependencies_names
        self.dependencies[calibration.name] = past_dependencies_names
    # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
    order = topological_sort(self.future_dependencies)
    if not order:
        return False

    # Get an ordered dictionary of the sort order but including all implicit dependencies.
    ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

    # Return all the implicit+explicit past dependencies
    full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
    # Correct each calibration's dependency list to reflect the implicit dependencies
    for calibration in self.calibrations.values():
        full_deps = full_past_dependencies[calibration.name]
        explicit_deps = [cal.name for cal in calibration.dependencies]
        for dep in full_deps:
            if dep not in explicit_deps:
                calibration.dependencies.append(self.calibrations[dep])
        # At this point the calibrations have their full dependencies but they aren't in topological
        # sort order. Correct that here
        ordered_dependency_list = []
        for ordered_calibration_name in order:
            if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
                ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
        calibration.dependencies = ordered_dependency_list
    order = ordered_full_dependencies
    # We should also patch in all of the implicit dependencies for the calibrations
    return order
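
To make the ordering step concrete, here is a minimal sketch of a topological sort over a dictionary shaped like `future_dependencies` (key: calibration name, value: names that depend on it). It uses Kahn's algorithm as a stand-in; it is not the implementation of the `topological_sort` function called above, and `topological_sort_sketch` is a hypothetical name. It assumes every dependent also appears as a key, which is the situation after missing dependencies have been removed.

```python
from collections import deque


def topological_sort_sketch(future_dependencies):
    """Return one valid ordering of the keys, or [] if a cycle is found."""
    # Count how many prerequisites each name still has outstanding.
    in_degree = {name: 0 for name in future_dependencies}
    for dependents in future_dependencies.values():
        for dependent in dependents:
            in_degree[dependent] += 1

    # Names without prerequisites can be scheduled straight away.
    ready = deque(name for name, count in in_degree.items() if count == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for dependent in future_dependencies[name]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                ready.append(dependent)

    # Any name never scheduled must be part of a cycle.
    return order if len(order) == len(future_dependencies) else []


chain = {"A": ["B"], "B": ["C"], "C": []}
cyclic = {"A": ["B"], "B": ["A"]}
chain_order = topological_sort_sketch(chain)
cyclic_order = topological_sort_sketch(cyclic)
```

An empty result is falsy, which is the kind of value the `if not order` check in the listing reacts to.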

◆ _prune_invalid_collections()

_prune_invalid_collections ( self)
protected
Checks all current calibrations and removes any invalid Collections from their collections list.

Definition at line 1344 of file framework.py.

def _prune_invalid_collections(self):
    """
    Checks all current calibrations and removes any invalid Collections from their collections list.
    """
    B2INFO("Checking for any invalid Collections in Calibrations.")
    for calibration in self.calibrations.values():
        valid_collections = {}
        for name, collection in calibration.collections.items():
            if collection.is_valid():
                valid_collections[name] = collection
            else:
                B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
        calibration.collections = valid_collections

◆ _remove_missing_dependencies()

_remove_missing_dependencies ( self)
protected
This checks the future and past dependencies of each `Calibration` in the `CAF`.
If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
object directly.

Definition at line 1264 of file framework.py.

def _remove_missing_dependencies(self):
    """
    This checks the future and past dependencies of each `Calibration` in the `CAF`.
    If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
    object directly.
    """
    calibration_names = [calibration.name for calibration in self.calibrations.values()]

    def is_dependency_in_caf(dependency):
        """
        Quick function to use with filter() and check dependencies against calibrations known to `CAF`
        """
        dependency_in_caf = dependency.name in calibration_names
        if not dependency_in_caf:
            B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                      " It has been removed as a dependency.")
        return dependency_in_caf

    # Check that there aren't dependencies on calibrations not added to the framework
    # Remove them from the calibration objects if there are.
    for calibration in self.calibrations.values():
        filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
        calibration.future_dependencies = filtered_future_dependencies

        filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
        calibration.dependencies = filtered_dependencies
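
The effect of this filtering can be shown on plain stand-in objects. The sketch below is an assumption-laden illustration: `SimpleNamespace` instances play the role of `Calibration` objects, and `remove_missing_dependencies` is a hypothetical free function that returns the removed names instead of logging a warning.

```python
from types import SimpleNamespace


def remove_missing_dependencies(calibrations):
    """Drop dependencies whose names are not keys of `calibrations`; return the dropped names."""
    known_names = set(calibrations)
    removed = []
    for calibration in calibrations.values():
        for attribute in ("dependencies", "future_dependencies"):
            current = getattr(calibration, attribute)
            kept = [dep for dep in current if dep.name in known_names]
            removed.extend(dep.name for dep in current if dep.name not in known_names)
            setattr(calibration, attribute, kept)
    return removed


# Hypothetical stand-ins for Calibration objects; "orphan" is never registered.
orphan = SimpleNamespace(name="orphan", dependencies=[], future_dependencies=[])
first = SimpleNamespace(name="first", dependencies=[], future_dependencies=[])
second = SimpleNamespace(name="second", dependencies=[first, orphan], future_dependencies=[])

removed = remove_missing_dependencies({"first": first, "second": second})
remaining = [dep.name for dep in second.dependencies]
```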

◆ add_calibration()

add_calibration ( self,
calibration )
Adds a `Calibration` that is to be used in this program to the list.
Also adds an empty dependency list to the overall dictionary.
You should not directly alter a `Calibration` object after it has been
added here.

Definition at line 1248 of file framework.py.

def add_calibration(self, calibration):
    """
    Adds a `Calibration` that is to be used in this program to the list.
    Also adds an empty dependency list to the overall dictionary.
    You should not directly alter a `Calibration` object after it has been
    added here.
    """
    if calibration.is_valid():
        if calibration.name not in self.calibrations:
            self.calibrations[calibration.name] = calibration
        else:
            B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
    else:
        # Note the leading space so the two string parts don't run together.
        B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework."
                  " It was not added and will not be part of the final process.")

◆ backend() [1/2]

backend ( self)
The `backend <backends.Backend>` that runs the collector job.
When set, the value is checked to be an instance of a `backends.Backend` class.

Definition at line 1465 of file framework.py.

def backend(self):
    """
    The `backend <backends.Backend>` that runs the collector job.
    When set, this is checked that a `backends.Backend` class instance was passed in.
    """
    return self._backend

◆ backend() [2/2]

backend ( self,
backend )
 

Definition at line 1473 of file framework.py.

def backend(self, backend):
    """
    """
    if isinstance(backend, caf.backends.Backend):
        self._backend = backend
    else:
        B2ERROR('Backend property must inherit from Backend class.')
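
The getter/setter pair forms a validated property. A minimal, self-contained sketch of that pattern follows; `Backend`, `Local` and `Holder` are hypothetical stand-ins defined only for this example, and the standard `logging` module replaces `B2ERROR`.

```python
import logging


class Backend:
    """Hypothetical stand-in for caf.backends.Backend."""


class Local(Backend):
    """Hypothetical stand-in for caf.backends.Local."""


class Holder:
    """Hypothetical class exposing a type-checked `backend` property."""

    def __init__(self):
        self._backend = None

    @property
    def backend(self):
        return self._backend

    @backend.setter
    def backend(self, backend):
        if isinstance(backend, Backend):
            self._backend = backend
        else:
            # Invalid values are reported and ignored; the stored backend is unchanged.
            logging.error("Backend property must inherit from Backend class.")


holder = Holder()
holder.backend = Local()
accepted = holder.backend
holder.backend = "not a backend"
```

After the rejected assignment the property still returns the `Local` instance set earlier.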

◆ run()

run ( self,
iov = None )
Keyword Arguments:
    iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                          this IoV will be used in the collection step.

This function runs the overall calibration job, saves the outputs to the output_dir directory,
and creates database payloads.

Upload of final databases is not done here. This simply creates the local databases in
the output directory. You should check the validity of your new local database before uploading
to the conditions DB via the basf2 tools/interface to the DB.

Definition at line 1358 of file framework.py.

def run(self, iov=None):
    """
    Keyword Arguments:
        iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                              this IoV will be used in the collection step.

    This function runs the overall calibration job, saves the outputs to the output_dir directory,
    and creates database payloads.

    Upload of final databases is not done here. This simply creates the local databases in
    the output directory. You should check the validity of your new local database before uploading
    to the conditions DB via the basf2 tools/interface to the DB.
    """
    if not self.calibrations:
        B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
    # Checks whether the dependencies we've added will give a valid order
    order = self._order_calibrations()
    if not order:
        B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

    # Check that a backend has been set and use default Local() one if not
    self._check_backend()

    self._prune_invalid_collections()

    # Creates the overall output directory and reset the attribute to use an absolute path to it.
    self.output_dir = self._make_output_dir()

    # Creates a SQLite DB to save the status of the various calibrations
    self._make_database()

    # Enter the overall output dir during processing and open a connection to the DB
    with temporary_workdir(self.output_dir):
        db = CAFDB(self._db_path)
        db.open()
        db_initial_calibrations = db.query("select * from calibrations").fetchall()
        for calibration in self.calibrations.values():
            # Apply defaults given to the `CAF` to the calibrations if they aren't set
            calibration._apply_calibration_defaults(self.calibration_defaults)
            calibration._db_path = self._db_path
            calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
            calibration.iov = iov
            if not calibration.backend:
                calibration.backend = self.backend
            # Do some checking of the db to see if we need to add an entry for this calibration
            if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
                db.insert_calibration(calibration.name)
                db.commit()
            else:
                for cal_info in db_initial_calibrations:
                    if cal_info[0] == calibration.name:
                        cal_initial_state = cal_info[2]
                        cal_initial_iteration = cal_info[3]
                B2INFO(f"Previous entry in database found for {calibration.name}.")
                B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                calibration.state = cal_initial_state
                B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                calibration.iteration = cal_initial_iteration
            # Daemonize so that it exits if the main program exits
            calibration.daemon = True

        db.close()

        # Is it possible to keep going?
        keep_running = True
        while keep_running:
            keep_running = False
            # Do we have calibrations that may yet complete?
            remaining_calibrations = []

            for calibration in self.calibrations.values():
                # Find the currently ended calibrations (may not be joined yet)
                if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
                    # Search for any alive Calibrations and join them
                    if calibration.is_alive():
                        B2DEBUG(29, f"Joining {calibration.name}.")
                        calibration.join()
                else:
                    if calibration.dependencies_met():
                        if not calibration.is_alive():
                            B2DEBUG(29, f"Starting {calibration.name}.")
                            try:
                                calibration.start()
                            except RuntimeError:
                                # Catch the case when the calibration just finished so it ended up here
                                # in the "else" and not above where it should have been joined.
                                B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                        remaining_calibrations.append(calibration)
                    else:
                        if not calibration.failed_dependencies():
                            remaining_calibrations.append(calibration)
            if remaining_calibrations:
                keep_running = True
                # Loop over jobs that the calibrations want submitted and submit them.
                # We do this here because some backends don't like us submitting in parallel from multiple CalibrationThreads
                # So this is like a mini job queue without getting too clever with it
                for calibration in remaining_calibrations:
                    for job in calibration.jobs_to_submit[:]:
                        calibration.backend.submit(job)
                        calibration.jobs_to_submit.remove(job)
            sleep(self.heartbeat)

    B2INFO("Printing summary of final CAF status.")
    with CAFDB(self._db_path, read_only=True) as db:
        print(db.output_calibration_table())
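
The polling loop at the heart of `run()` can be reduced to a small, self-contained sketch. Everything here is a simplification under stated assumptions: `Task` is a hypothetical thread class standing in for a calibration, the state names are invented, job submission and the database are left out, and `run_all` additionally records the order in which tasks are seen to finish.

```python
import threading
import time


class Task(threading.Thread):
    """Hypothetical stand-in for a calibration thread with a `state` attribute."""

    end_state = "completed"
    fail_state = "failed"

    def __init__(self, name, dependencies=()):
        super().__init__(name=name, daemon=True)
        self.dependencies = list(dependencies)
        self.state = "init"

    def run(self):
        time.sleep(0.01)  # pretend to do some work
        self.state = self.end_state

    def dependencies_met(self):
        return all(dep.state == self.end_state for dep in self.dependencies)

    def failed_dependencies(self):
        return [dep for dep in self.dependencies if dep.state == self.fail_state]


def run_all(tasks, heartbeat=0.01):
    """Start each task once its dependencies have completed; poll every `heartbeat` seconds."""
    finished_order = []
    keep_running = True
    while keep_running:
        keep_running = False
        for task in tasks:
            if task.state in (Task.end_state, Task.fail_state):
                if task.is_alive():
                    task.join()
                if task.name not in finished_order:
                    finished_order.append(task.name)
            elif task.dependencies_met():
                if not task.is_alive():
                    try:
                        task.start()
                    except RuntimeError:
                        pass  # already started once; it will be joined on a later pass
                keep_running = True
            elif not task.failed_dependencies():
                keep_running = True  # still waiting on dependencies that may yet complete
        time.sleep(heartbeat)
    return finished_order


first = Task("first")
second = Task("second", dependencies=[first])
finished = run_all([second, first])
```

A task whose dependency has failed is neither started nor counted as remaining, so the loop ends instead of waiting forever.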

Member Data Documentation

◆ _backend

_backend = None
protected

Private backend attribute.

Definition at line 1236 of file framework.py.

◆ _db_name

str _db_name = "caf_state.db"
staticprotected

The name of the SQLite DB that gets created.

Definition at line 1213 of file framework.py.

◆ _db_path

_db_path = None
protected

The path of the SQLite DB.

Definition at line 1246 of file framework.py.

◆ backend

backend = caf.backends.Local()

backend property

Definition at line 1342 of file framework.py.

◆ calibration_defaults

calibration_defaults = {**self.default_calibration_config, **calibration_defaults}

Default options applied to each calibration known to the CAF. If the user has already defined an option on a Calibration, the default is not applied.

A simple way to define the same configuration to all calibrations in the CAF.

Definition at line 1244 of file framework.py.
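
The dictionary merge that builds this attribute can be tried in isolation. A minimal sketch, assuming only the two keys listed under `default_calibration_config`; `merge_defaults` is a hypothetical helper and not part of the CAF.

```python
# Values copied from the documented `default_calibration_config`.
default_calibration_config = {"max_iterations": 5, "ignored_runs": []}


def merge_defaults(calibration_defaults=None):
    """Hypothetical helper mirroring the merge performed in CAF.__init__."""
    if not calibration_defaults:
        calibration_defaults = {}
    # Later keys win, so user-supplied values override the class-level defaults.
    return {**default_calibration_config, **calibration_defaults}


merged = merge_defaults({"max_iterations": 2})
untouched = merge_defaults()
```

Keys that the user does not supply, such as `ignored_runs` here, keep their class-level default.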

◆ calibrations

dict calibrations = {}

Dictionary of calibrations for this CAF instance.

You should use add_calibration to add to this.

Definition at line 1224 of file framework.py.

◆ default_calibration_config

dict default_calibration_config
static
Initial value:
= {
"max_iterations": 5,
"ignored_runs": []
}

The defaults for Calibrations.

Definition at line 1215 of file framework.py.

◆ dependencies

dict dependencies = {}

Dictionary of dependencies of Calibration objects, where value is the list of Calibration objects that the key depends on.

This attribute is filled during self.run()

Definition at line 1230 of file framework.py.

◆ future_dependencies

dict future_dependencies = {}

Dictionary of future dependencies of Calibration objects, where the value is all calibrations that will depend on the key, filled during self.run()

Definition at line 1227 of file framework.py.

◆ heartbeat

heartbeat = 5

The heartbeat (seconds) between polling for Calibrations that are finished.

Definition at line 1238 of file framework.py.

◆ order

order = None

The ordering and explicit future dependencies of calibrations.

Will be filled during CAF.run() for you.

Definition at line 1234 of file framework.py.

◆ output_dir

output_dir = "calibration_results"

Output path to store results of calibration and bookkeeping information.

Definition at line 1232 of file framework.py.


The documentation for this class was generated from the following file: