Belle II Software development
caf_algorithmdb_access.py
1
18
19import basf2 as b2
20
21import os
22import sys
23
24from caf.framework import Calibration, CAF
25from caf import backends
26
27
28def main(argv):
29 if len(argv) == 1:
30 data_dir = argv[0]
31 else:
32 print("Usage: python3 caf_multiple_options.py <data directory>")
33 sys.exit(1)
34
35
39 input_files = [os.path.join(os.path.abspath(data_dir), '*.root')]
40
41
43 dep_col = b2.register_module('CaTest')
44 dep_col.set_name('Dependent') # Sets the prefix of the collected data in the datastore.
45 # Allows us to execute algorithm over input data in a run-by-run way so we can see DB constants from each run.
46 dep_col.param('granularity', 'run')
47 # Specific parameter to our test collector, proportional to the probability of algorithm requesting iteration.
48 dep_col.param('spread', 15)
49
50 from ROOT import Belle2 # noqa: make the Belle2 namespace available
51 from ROOT.Belle2 import TestDBAccessAlgorithm
52 dep_alg = TestDBAccessAlgorithm()
53 # Since we're using several instances of the same test algorithm here, we still want the database entries to have
54 # different names. TestCalibrationAlgorithm outputs to the database using the prefix name so we change it
55 # for each calibration. Not something you'd usually have to do. And you could instead use whatever name you want,
56 # we just use the prefix as a handy string.
57 dep_alg.setPrefix('Dependent') # Must be the same as colllector prefix
58
59 # Create our Calibration object
60 dep_cal = Calibration(name='Dependent_Calibration',
61 collector=dep_col,
62 algorithms=dep_alg,
63 input_files=input_files)
64
65 # Let's split up the collector jobs by files
66 dep_cal.max_files_per_collector_job = 1
67 dep_cal.max_iterations = 5
68
69 # This one runs last and picks up the final DB constants from the first one
70 last_col = b2.register_module('CaTest')
71 last_col.set_name('Last') # Sets the prefix of the collected data in the datastore.
72 # Allows us to execute algorithm over input data in a run-by-run way so we can see DB constants from each run.
73 last_col.param('granularity', 'run')
74 # Specific parameter to our test collector, proportional to the probability of algorithm requesting iteration.
75 last_col.param('spread', 15)
76
77 last_alg = TestDBAccessAlgorithm()
78 last_alg.setGeneratePayloads(False)
79 last_alg.setPrefix('Last') # Must be the same as colllector prefix
80
81 # Create our Calibration object
82 last_cal = Calibration(name='Last_Calibration',
83 collector=last_col,
84 algorithms=last_alg,
85 input_files=input_files)
86
87 # Let's split up the collector jobs by files
88 last_cal.max_files_per_collector_job = 1
89 last_cal.max_iterations = 5
90
91 # Define dependencies
92 last_cal.depends_on(dep_cal)
93
94
96 cal_fw = CAF()
97 cal_fw.add_calibration(dep_cal)
98 cal_fw.add_calibration(last_cal)
99 # Subjobs from collector jobs being split over input files can be parallelized.
100 # Also Calibrations 1 and 2, can be run at the same time.
101 # If you have 4 cores this backend will run them whenever one of the 4 processes becomes available
102 # For larger data or more calibrations, consider using the LSF or PBS batch system backends at your site
103 cal_fw.backend = backends.Local(max_processes=4)
104 # Start her up!
105 cal_fw.run()
106 print("End of CAF processing.")
107
108
109if __name__ == "__main__":
110 main(sys.argv[1:])
Definition: main.py:1