Belle II Software development
CAF Class Reference

Public Member Functions

 __init__ (self, calibration_defaults=None)
 
 add_calibration (self, calibration)
 
 run (self, iov=None)
 
 backend (self)
 
 backend (self, backend)
 

Public Attributes

dict calibrations = {}
 Dictionary of calibrations for this CAF instance.
 
dict future_dependencies = {}
 Dictionary of future dependencies of Calibration objects: each value lists all calibrations that will depend on the key. Filled during self.run().
 
dict dependencies = {}
 Dictionary of dependencies of Calibration objects, where the value is the list of Calibration objects that the key depends on.
 
str output_dir = "calibration_results"
 Output path to store results of calibration and bookkeeping information.
 
 order = None
 The ordering and explicit future dependencies of calibrations.
 
int heartbeat = 5
 The heartbeat (seconds) between polling for Calibrations that are finished.
 
dict calibration_defaults = {**self.default_calibration_config, **calibration_defaults}
 Default options applied to each calibration known to the CAF. If the user has already defined one of these options on a Calibration, the default for it isn't applied.
 
 backend = caf.backends.Local()
 backend property
 

Static Public Attributes

dict default_calibration_config
 The defaults for Calibrations.
 

Protected Member Functions

 _remove_missing_dependencies (self)
 
 _order_calibrations (self)
 
 _check_backend (self)
 
 _prune_invalid_collections (self)
 
 _make_output_dir (self)
 
 _make_database (self)
 

Protected Attributes

 _backend = None
 Private backend attribute.
 
 _db_path = None
 The path of the SQLite DB.
 

Static Protected Attributes

str _db_name = "caf_state.db"
 The name of the SQLite DB that gets created.
 

Detailed Description

Parameters:
  calibration_defaults (dict): A dictionary of default options for calibrations run by this `CAF` instance e.g.

                               >>> calibration_defaults={"max_iterations":2}

This class holds `Calibration` objects and processes them. It defines the initial configuration/setup
for the calibrations, but most of the real processing is done through the `caf.state_machines.CalibrationMachine`.

The `CAF` class essentially does some initial setup, holds the `CalibrationBase` instances, and calls
`CalibrationBase.start` on each one once its dependencies are met.

Much of the checking for consistency is done in this class so that no processing is done with an invalid
setup. Choosing which files to use as input should be done from outside during the setup of the `CAF` and
`CalibrationBase` instances.

Definition at line 1183 of file framework.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
calibration_defaults = None )
 

Definition at line 1209 of file framework.py.

def __init__(self, calibration_defaults=None):
    """
    """

    self.calibrations = {}

    self.future_dependencies = {}

    self.dependencies = {}

    self.output_dir = "calibration_results"

    self.order = None

    self._backend = None

    self.heartbeat = 5

    if not calibration_defaults:
        calibration_defaults = {}

    self.calibration_defaults = {**self.default_calibration_config, **calibration_defaults}

    self._db_path = None

Member Function Documentation

◆ _check_backend()

_check_backend ( self)
protected
Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
one that is stored isn't a valid Backend object), a default Local backend is created.

Definition at line 1324 of file framework.py.

def _check_backend(self):
    """
    Makes sure that the CAF has a valid backend setup. If one isn't set by the user (or if the
    one that is stored isn't a valid Backend object) we should create a default Local backend.
    """
    if not isinstance(self._backend, caf.backends.Backend):

        self.backend = caf.backends.Local()

◆ _make_database()

_make_database ( self)
protected
Creates the CAF status database. If it already exists we don't overwrite it.

Definition at line 1489 of file framework.py.

def _make_database(self):
    """
    Creates the CAF status database. If it already exists we don't overwrite it.
    """
    self._db_path = Path(self.output_dir, self._db_name).absolute()
    if self._db_path.exists():
        B2INFO(f"Previous CAF database found {self._db_path}")
    # Will create a new database + tables, or do nothing but checks we can connect to existing one
    with CAFDB(self._db_path):
        pass
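
The "create if missing, otherwise leave alone" behaviour described above can be illustrated with the standard `sqlite3` module. This is only a sketch: the `CAFDB` class is not shown on this page, so the `make_database` helper and the three-column table layout below are hypothetical stand-ins, not the real schema.

```python
import sqlite3
import tempfile
from pathlib import Path


def make_database(db_path):
    """Create the state table if needed; an existing database is left untouched."""
    with sqlite3.connect(db_path) as connection:
        # Hypothetical schema: the real CAFDB table layout is not shown on this page.
        connection.execute(
            "CREATE TABLE IF NOT EXISTS calibrations "
            "(name TEXT PRIMARY KEY, state TEXT, iteration INTEGER)"
        )
    # The context manager only commits; the connection is closed explicitly.
    connection.close()


with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp, "caf_state.db")
    make_database(db_path)
    with sqlite3.connect(db_path) as connection:
        connection.execute("INSERT INTO calibrations VALUES ('cal_a', 'running', 1)")
    connection.close()
    make_database(db_path)  # a second call must not wipe the stored row
    connection = sqlite3.connect(db_path)
    rows = connection.execute("SELECT * FROM calibrations").fetchall()
    connection.close()
```

Because the table is created with `IF NOT EXISTS`, calling the helper again on a previous run's database keeps the stored state, which is what allows a restart to pick up where it left off.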

◆ _make_output_dir()

_make_output_dir ( self)
protected
Creates the output directory. If it already exists, the CAF tries to restart from the last saved state.

Returns:
    str: The absolute path of the new output_dir

Definition at line 1470 of file framework.py.

def _make_output_dir(self):
    """
    Creates the output directory. If it already exists we are now going to try and restart the program from the last state.

    Returns:
        str: The absolute path of the new output_dir
    """
    p = Path(self.output_dir).resolve()
    if p.is_dir():
        B2INFO(f"{p.as_posix()} output directory already exists. "
               "We will try to restart from the previous finishing state.")
        return p.as_posix()
    else:
        p.mkdir(parents=True)
        if p.is_dir():
            return p.as_posix()
        else:
            raise FileNotFoundError(f"Attempted to create output_dir {p.as_posix()}, but it didn't work.")
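
For comparison, a shorter variant of the same idea is sketched below using `Path.mkdir(parents=True, exist_ok=True)`. The `make_output_dir` function and its extra `existed` return value are assumptions made for illustration; the method documented above returns only the path string.

```python
import tempfile
from pathlib import Path


def make_output_dir(output_dir):
    """Return the absolute POSIX path of output_dir and whether it already existed."""
    path = Path(output_dir).resolve()
    existed = path.is_dir()
    # exist_ok=True makes the call safe whether or not the directory is already there.
    path.mkdir(parents=True, exist_ok=True)
    return path.as_posix(), existed


with tempfile.TemporaryDirectory() as tmp:
    first_path, first_existed = make_output_dir(Path(tmp, "calibration_results"))
    second_path, second_existed = make_output_dir(Path(tmp, "calibration_results"))
```

The second call finds the directory already in place, which corresponds to the restart case in the listing above.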

◆ _order_calibrations()

_order_calibrations ( self)
protected
- Uses dependency attributes of calibrations to create a dependency dictionary and passes it
to a sorting algorithm.
- Returns a valid OrderedDict if the sort was successful. If it failed (most likely because of a cyclic dependency), the listing below returns False rather than an empty OrderedDict.

Definition at line 1280 of file framework.py.

def _order_calibrations(self):
    """
    - Uses dependency attributes of calibrations to create a dependency dictionary and passes it
    to a sorting algorithm.
    - Returns valid OrderedDict if sort was successful, empty one if it failed (most likely a cyclic dependency)
    """
    # First remove any dependencies on calibrations not added to the CAF
    self._remove_missing_dependencies()
    # Filling dependencies dictionaries of CAF for sorting, only explicit dependencies for now
    # Note that they currently use the names not the calibration objects.
    for calibration in self.calibrations.values():
        future_dependencies_names = [dependency.name for dependency in calibration.future_dependencies]
        past_dependencies_names = [dependency.name for dependency in calibration.dependencies]

        self.future_dependencies[calibration.name] = future_dependencies_names
        self.dependencies[calibration.name] = past_dependencies_names
    # Gives us a list of A (not THE) valid ordering and checks for cyclic dependencies
    order = topological_sort(self.future_dependencies)
    if not order:
        return False

    # Get an ordered dictionary of the sort order but including all implicit dependencies.
    ordered_full_dependencies = all_dependencies(self.future_dependencies, order)

    # Return all the implicit+explicit past dependencies
    full_past_dependencies = past_from_future_dependencies(ordered_full_dependencies)
    # Correct each calibration's dependency list to reflect the implicit dependencies
    for calibration in self.calibrations.values():
        full_deps = full_past_dependencies[calibration.name]
        explicit_deps = [cal.name for cal in calibration.dependencies]
        for dep in full_deps:
            if dep not in explicit_deps:
                calibration.dependencies.append(self.calibrations[dep])
        # At this point the calibrations have their full dependencies but they aren't in topological
        # sort order. Correct that here
        ordered_dependency_list = []
        for ordered_calibration_name in order:
            if ordered_calibration_name in [dep.name for dep in calibration.dependencies]:
                ordered_dependency_list.append(self.calibrations[ordered_calibration_name])
        calibration.dependencies = ordered_dependency_list
    order = ordered_full_dependencies
    # We should also patch in all of the implicit dependencies for the calibrations
    return order
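
To make the ordering step concrete, here is a minimal sketch of sorting a "future dependencies" dictionary with Kahn's algorithm and then inverting it into past dependencies. The helpers `topological_sort`, `all_dependencies` and `past_from_future_dependencies` used above are not shown on this page, so the two functions below are hypothetical stand-ins and may differ from them in algorithm and return types. The calibration names are invented.

```python
from collections import deque


def sort_by_future_dependencies(future_dependencies):
    """Kahn's algorithm: return one valid run order, or [] if there is a cycle.

    Assumes every dependent also appears as a key, which holds once
    missing dependencies have been removed.
    """
    # Count how many calibrations each one has to wait for.
    waiting_on = {name: 0 for name in future_dependencies}
    for dependents in future_dependencies.values():
        for dependent in dependents:
            waiting_on[dependent] += 1

    ready = deque(name for name, count in waiting_on.items() if count == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for dependent in future_dependencies[name]:
            waiting_on[dependent] -= 1
            if waiting_on[dependent] == 0:
                ready.append(dependent)
    # Anything left unsorted is part of a cycle.
    return order if len(order) == len(future_dependencies) else []


def past_dependencies(future_dependencies):
    """Invert the mapping: for each calibration, list what it directly depends on."""
    past = {name: [] for name in future_dependencies}
    for name, dependents in future_dependencies.items():
        for dependent in dependents:
            past[dependent].append(name)
    return past


# Hypothetical calibration names: "alignment" must run before the other two.
future = {"alignment": ["tracking", "timing"], "tracking": ["timing"], "timing": []}
order = sort_by_future_dependencies(future)
past = past_dependencies(future)
cyclic = sort_by_future_dependencies({"a": ["b"], "b": ["a"]})
```

An empty result for the cyclic input plays the same role as the falsy `order` check in the listing above, which is what makes `run()` abort with a fatal error.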

◆ _prune_invalid_collections()

_prune_invalid_collections ( self)
protected
Checks all current calibrations and removes any invalid Collections from their collections list.

Definition at line 1333 of file framework.py.

def _prune_invalid_collections(self):
    """
    Checks all current calibrations and removes any invalid Collections from their collections list.
    """
    B2INFO("Checking for any invalid Collections in Calibrations.")
    for calibration in self.calibrations.values():
        valid_collections = {}
        for name, collection in calibration.collections.items():
            if collection.is_valid():
                valid_collections[name] = collection
            else:
                B2WARNING(f"Removing invalid Collection '{name}' from Calibration '{calibration.name}'.")
        calibration.collections = valid_collections

◆ _remove_missing_dependencies()

_remove_missing_dependencies ( self)
protected
This checks the future and past dependencies of each `Calibration` in the `CAF`.
If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
object directly.

Definition at line 1253 of file framework.py.

def _remove_missing_dependencies(self):
    """
    This checks the future and past dependencies of each `Calibration` in the `CAF`.
    If any dependencies are not known to the `CAF` then they are removed from the `Calibration`
    object directly.
    """
    calibration_names = [calibration.name for calibration in self.calibrations.values()]

    def is_dependency_in_caf(dependency):
        """
        Quick function to use with filter() and check dependencies against calibrations known to `CAF`
        """
        dependency_in_caf = dependency.name in calibration_names
        if not dependency_in_caf:
            B2WARNING(f"The calibration {dependency.name} is a required dependency but is not in the CAF."
                      " It has been removed as a dependency.")
        return dependency_in_caf

    # Check that there aren't dependencies on calibrations not added to the framework
    # Remove them from the calibration objects if there are.
    for calibration in self.calibrations.values():
        filtered_future_dependencies = list(filter(is_dependency_in_caf, calibration.future_dependencies))
        calibration.future_dependencies = filtered_future_dependencies

        filtered_dependencies = list(filter(is_dependency_in_caf, calibration.dependencies))
        calibration.dependencies = filtered_dependencies
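
The effect of this pruning can be seen on a small stand-in. The `Cal` dataclass and the `remove_missing_dependencies` function below are hypothetical simplifications (only past dependencies are handled, and dropped names are returned instead of being reported through `B2WARNING`).

```python
from dataclasses import dataclass, field


@dataclass
class Cal:
    """Hypothetical stand-in for a Calibration: a name plus a list of dependencies."""
    name: str
    dependencies: list = field(default_factory=list)


def remove_missing_dependencies(calibrations):
    """Drop dependencies whose names are not keys of the calibrations dictionary."""
    dropped = []
    for calibration in calibrations.values():
        kept = []
        for dependency in calibration.dependencies:
            if dependency.name in calibrations:
                kept.append(dependency)
            else:
                dropped.append((calibration.name, dependency.name))
        calibration.dependencies = kept
    return dropped


known = Cal("alignment")
unknown = Cal("never_added")  # never registered with the framework stand-in
tracking = Cal("tracking", dependencies=[known, unknown])
registered = {"alignment": known, "tracking": tracking}
dropped = remove_missing_dependencies(registered)
```

After the call, `tracking` depends only on `alignment`. A dependency that was never added is dropped with a warning instead of blocking the run.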

◆ add_calibration()

add_calibration ( self,
calibration )
Adds a `Calibration` to be used in this program to the `calibrations` dictionary, keyed by its name.
Invalid calibrations and names that were already added are skipped with a warning.
You should not directly alter a `Calibration` object after it has been
added here.

Definition at line 1237 of file framework.py.

def add_calibration(self, calibration):
    """
    Adds a `Calibration` that is to be used in this program to the list.
    Also adds an empty dependency list to the overall dictionary.
    You should not directly alter a `Calibration` object after it has been
    added here.
    """
    if calibration.is_valid():
        if calibration.name not in self.calibrations:
            self.calibrations[calibration.name] = calibration
        else:
            B2WARNING(f"Tried to add a calibration with the name {calibration.name} twice.")
    else:
        B2WARNING(f"Tried to add incomplete/invalid calibration ({calibration.name}) to the framework."
                  " It was not added and will not be part of the final process.")

◆ backend() [1/2]

backend ( self)
The `backend <backends.Backend>` that runs the collector job.
When set, the value is checked to be a `backends.Backend` instance; anything else is rejected with an error and the stored backend is left unchanged.

Definition at line 1454 of file framework.py.

def backend(self):
    """
    The `backend <backends.Backend>` that runs the collector job.
    When set, this is checked that a `backends.Backend` class instance was passed in.
    """
    return self._backend

◆ backend() [2/2]

backend ( self,
backend )
 

Definition at line 1462 of file framework.py.

def backend(self, backend):
    """
    """
    if isinstance(backend, caf.backends.Backend):
        self._backend = backend
    else:
        B2ERROR('Backend property must inherit from Backend class.')
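
The two `backend` entries above read like a property getter and setter, and the attribute summary calls `backend` a property, although the decorator lines are not part of the listings. Under that assumption, the validation and the default fallback from `_check_backend` can be sketched as follows. `Backend`, `Local` and `Framework` are hypothetical stand-ins, and `print` replaces `B2ERROR`.

```python
class Backend:
    """Hypothetical stand-in for caf.backends.Backend."""


class Local(Backend):
    """Hypothetical stand-in for caf.backends.Local."""


class Framework:
    """Minimal stand-in for CAF that keeps only the backend handling."""

    def __init__(self):
        self._backend = None

    @property
    def backend(self):
        return self._backend

    @backend.setter
    def backend(self, backend):
        if isinstance(backend, Backend):
            self._backend = backend
        else:
            # The documented setter reports this through B2ERROR; here we only print.
            print("Backend property must inherit from Backend class.")

    def check_backend(self):
        # Fall back to a default Local backend when nothing valid is stored.
        if not isinstance(self._backend, Backend):
            self.backend = Local()


fw = Framework()
fw.backend = "not a backend"  # rejected, so _backend stays None
rejected = fw.backend
fw.check_backend()            # installs the default Local backend
```

Note that an invalid assignment does not raise in this sketch. The stored value is left as it was, so the later check is what guarantees a usable backend.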

◆ run()

run ( self,
iov = None )
Keyword Arguments:
    iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                          this IoV will be used in the collection step.

This function runs the overall calibration job, saves the outputs to the output_dir directory,
and creates database payloads.

Upload of final databases is not done here. This simply creates the local databases in
the output directory. You should check the validity of your new local database before uploading
to the conditions DB via the basf2 tools/interface to the DB.

Definition at line 1347 of file framework.py.

def run(self, iov=None):
    """
    Keyword Arguments:
        iov(`caf.utils.IoV`): IoV to calibrate for this processing run. Only the input files necessary to calibrate
                              this IoV will be used in the collection step.

    This function runs the overall calibration job, saves the outputs to the output_dir directory,
    and creates database payloads.

    Upload of final databases is not done here. This simply creates the local databases in
    the output directory. You should check the validity of your new local database before uploading
    to the conditions DB via the basf2 tools/interface to the DB.
    """
    if not self.calibrations:
        B2FATAL("There were no Calibration objects to run. Maybe you tried to add invalid ones?")
    # Checks whether the dependencies we've added will give a valid order
    order = self._order_calibrations()
    if not order:
        B2FATAL("Couldn't order the calibrations properly. Could be a cyclic dependency.")

    # Check that a backend has been set and use default Local() one if not
    self._check_backend()

    self._prune_invalid_collections()

    # Creates the overall output directory and reset the attribute to use an absolute path to it.
    self.output_dir = self._make_output_dir()

    # Creates a SQLite DB to save the status of the various calibrations
    self._make_database()

    # Enter the overall output dir during processing and open a connection to the DB
    with temporary_workdir(self.output_dir):
        db = CAFDB(self._db_path)
        db.open()
        db_initial_calibrations = db.query("select * from calibrations").fetchall()
        for calibration in self.calibrations.values():
            # Apply defaults given to the `CAF` to the calibrations if they aren't set
            calibration._apply_calibration_defaults(self.calibration_defaults)
            calibration._db_path = self._db_path
            calibration.output_database_dir = Path(self.output_dir, calibration.name, "outputdb").as_posix()
            calibration.iov = iov
            if not calibration.backend:
                calibration.backend = self.backend
            # Do some checking of the db to see if we need to add an entry for this calibration
            if calibration.name not in [db_cal[0] for db_cal in db_initial_calibrations]:
                db.insert_calibration(calibration.name)
                db.commit()
            else:
                for cal_info in db_initial_calibrations:
                    if cal_info[0] == calibration.name:
                        cal_initial_state = cal_info[2]
                        cal_initial_iteration = cal_info[3]
                B2INFO(f"Previous entry in database found for {calibration.name}.")
                B2INFO(f"Setting {calibration.name} state to checkpoint state '{cal_initial_state}'.")
                calibration.state = cal_initial_state
                B2INFO(f"Setting {calibration.name} iteration to '{cal_initial_iteration}'.")
                calibration.iteration = cal_initial_iteration
            # Daemonize so that it exits if the main program exits
            calibration.daemon = True

        db.close()

        # Is it possible to keep going?
        keep_running = True
        while keep_running:
            keep_running = False
            # Do we have calibrations that may yet complete?
            remaining_calibrations = []

            for calibration in self.calibrations.values():
                # Find the currently ended calibrations (may not be joined yet)
                if (calibration.state == CalibrationBase.end_state or calibration.state == CalibrationBase.fail_state):
                    # Search for any alive Calibrations and join them
                    if calibration.is_alive():
                        B2DEBUG(29, f"Joining {calibration.name}.")
                        calibration.join()
                else:
                    if calibration.dependencies_met():
                        if not calibration.is_alive():
                            B2DEBUG(29, f"Starting {calibration.name}.")
                            try:
                                calibration.start()
                            except RuntimeError:
                                # Catch the case when the calibration just finished so it ended up here
                                # in the "else" and not above where it should have been joined.
                                B2DEBUG(29, f"{calibration.name} probably just finished, join it later.")
                        remaining_calibrations.append(calibration)
                    else:
                        if not calibration.failed_dependencies():
                            remaining_calibrations.append(calibration)
            if remaining_calibrations:
                keep_running = True
                # Loop over jobs that the calibrations want submitted and submit them.
                # We do this here because some backends don't like us submitting in parallel from multiple CalibrationThreads
                # So this is like a mini job queue without getting too clever with it
                for calibration in remaining_calibrations:
                    for job in calibration.jobs_to_submit[:]:
                        calibration.backend.submit(job)
                        calibration.jobs_to_submit.remove(job)
            sleep(self.heartbeat)

        B2INFO("Printing summary of final CAF status.")
        with CAFDB(self._db_path, read_only=True) as db:
            print(db.output_calibration_table())
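
The polling loop at the heart of `run()` can be reduced to a self-contained sketch with plain `threading.Thread` objects. Everything below is a hypothetical stand-in: the `Worker` class, its string states and the `poll` function are invented for illustration, and the job submission queue and database bookkeeping are left out. The sketch also assumes each worker lists all of its dependencies, including implicit ones, as `_order_calibrations` arranges for real calibrations.

```python
import threading
import time


class Worker(threading.Thread):
    """Hypothetical stand-in for a Calibration thread with a state and dependencies."""

    def __init__(self, name, dependencies=(), fail=False):
        super().__init__(name=name, daemon=True)
        self.dependencies = list(dependencies)
        self.fail = fail
        self.state = "init"

    def run(self):
        self.state = "running"
        time.sleep(0.01)  # placeholder for the real work
        self.state = "failed" if self.fail else "completed"

    def dependencies_met(self):
        return all(dep.state == "completed" for dep in self.dependencies)

    def failed_dependencies(self):
        return [dep for dep in self.dependencies if dep.state == "failed"]


def poll(workers, heartbeat=0.02):
    """Start workers whose dependencies are met and join finished ones until none remain."""
    keep_running = True
    while keep_running:
        remaining = []
        for worker in workers:
            if worker.state in ("completed", "failed"):
                if worker.is_alive():
                    worker.join()
            elif worker.dependencies_met():
                if not worker.is_alive():
                    try:
                        worker.start()
                    except RuntimeError:
                        # Already started and just finished; it is handled on a later pass.
                        pass
                remaining.append(worker)
            elif not worker.failed_dependencies():
                # Still waiting on dependencies that may yet complete.
                remaining.append(worker)
        keep_running = bool(remaining)
        time.sleep(heartbeat)


a = Worker("a")
b = Worker("b", dependencies=[a], fail=True)
c = Worker("c", dependencies=[b])  # never starts because b fails
d = Worker("d", dependencies=[a])
workers = [a, b, c, d]
poll(workers)
states = {worker.name: worker.state for worker in workers}
```

A worker whose dependency has failed is dropped from the remaining list without being started, so the loop ends once every other worker has finished. The heartbeat only controls how often the states are re-checked.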

Member Data Documentation

◆ _backend

_backend = None
protected

Private backend attribute.

Definition at line 1225 of file framework.py.

◆ _db_name

str _db_name = "caf_state.db"
staticprotected

The name of the SQLite DB that gets created.

Definition at line 1202 of file framework.py.

◆ _db_path

_db_path = None
protected

The path of the SQLite DB.

Definition at line 1235 of file framework.py.

◆ backend

backend = caf.backends.Local()

backend property

Definition at line 1331 of file framework.py.

◆ calibration_defaults

calibration_defaults = {**self.default_calibration_config, **calibration_defaults}

Default options applied to each calibration known to the CAF. If the user has already defined one of these options on a Calibration, the default for it isn't applied.

A simple way to define the same configuration to all calibrations in the CAF.

Definition at line 1233 of file framework.py.
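
The merge on the right-hand side above relies on dictionary unpacking, where later entries win. A minimal sketch, using the documented initial value of `default_calibration_config` and a hypothetical `merge_defaults` helper in place of the constructor:

```python
# Class-level defaults, copied from the documented default_calibration_config.
default_calibration_config = {"max_iterations": 5, "ignored_runs": []}


def merge_defaults(calibration_defaults=None):
    """Mirror the constructor's merge: user-supplied keys override the class defaults."""
    if not calibration_defaults:
        calibration_defaults = {}
    # Later unpacking wins, so user-supplied values replace the class-level ones.
    return {**default_calibration_config, **calibration_defaults}


merged = merge_defaults({"max_iterations": 2})
untouched = merge_defaults()
```

Here `merged` keeps the default `ignored_runs` but takes `max_iterations` from the user. The merge is shallow, so the `ignored_runs` list object is shared with the class-level dictionary unless the user supplies their own.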

◆ calibrations

dict calibrations = {}

Dictionary of calibrations for this CAF instance.

You should use add_calibration to add to this.

Definition at line 1213 of file framework.py.

◆ default_calibration_config

dict default_calibration_config
static
Initial value:
= {
"max_iterations": 5,
"ignored_runs": []
}

The defaults for Calibrations.

Definition at line 1204 of file framework.py.

◆ dependencies

dict dependencies = {}

Dictionary of dependencies of Calibration objects, where the value is the list of Calibration objects that the key depends on.

This attribute is filled during self.run()

Definition at line 1219 of file framework.py.

◆ future_dependencies

dict future_dependencies = {}

Dictionary of future dependencies of Calibration objects: each value lists all calibrations that will depend on the key. Filled during self.run().

Definition at line 1216 of file framework.py.

◆ heartbeat

heartbeat = 5

The heartbeat (seconds) between polling for Calibrations that are finished.

Definition at line 1227 of file framework.py.

◆ order

order = None

The ordering and explicit future dependencies of calibrations.

Will be filled during CAF.run() for you.

Definition at line 1223 of file framework.py.

◆ output_dir

output_dir = "calibration_results"

Output path to store results of calibration and bookkeeping information.

Definition at line 1221 of file framework.py.


The documentation for this class was generated from the following file: