Belle II Software development
beast_tuple_producer.py
1#!/usr/bin/env python3
2
3
10
11
# This steering script steers the production of ROOT tuples for beast.
#
# Before running, you have to put a file-to-IoV mapping called 'file_iov_map.pkl' in your
# working directory. You can create such a file using the script 'create_file_to_iov_map.py'
# in the basf2 folder calibration/examples.
#
# basf2 beast_tuple_producer.py -- --runLow=5613 --runHigh=5613 --expNo=3
#
# This will compute tuples run-by-run for all runs between runLow and runHigh for the
# given experiment number.
22
import multiprocessing
import SetMetaTimeModule
from caf.utils import IoV
import basf2 as b2

# Number of worker processes started by the __main__ section of this script.
NUMBER_OF_PROCESSES = 20

# Only log messages at ERROR level.
b2.set_log_level(b2.LogLevel.ERROR)

# Reset the database configuration, then use the named central database tag.
b2.reset_database()
b2.use_central_database("Calibration_Offline_Development")
33
34
# Some ROOT tools
36
37
class CalculationProcess(multiprocessing.Process):
    """ Main class to steer the production of ROOT tuples for beast

    Each instance is a separate process that runs one basf2 path over the
    input files of a single IoV and writes one PXD tuple file.
    """

    def __init__(self, iov, file_paths, output_dir):
        """ Constructor

        Args:
            iov: IoV to process; its exp_low and run_low name the output file.
            file_paths: Input file names handed to the RootInput module.
            output_dir: Directory in which the output ROOT file is written.
        """
        super().__init__()

        # IoV processed by this instance
        self.iov = iov

        # Input files belonging to the IoV
        self.file_paths = file_paths

        # Directory receiving the output tuple file
        self.output_dir = output_dir

    def run(self):
        """ Run

        Builds the basf2 path (input, geometry, PXD unpacking, clustering and
        tuple production) and processes it. Executed in the child process
        after start() is called.
        """
        # Register modules
        rootinput = b2.register_module('RootInput')
        rootinput.param('inputFileNames', self.file_paths)
        # Restrict the branches that are read from the input files
        rootinput.param(
            'branchNames', [
                'EventMetaData', 'RawPXDs', 'RawSVDs', 'RawCDCs'])

        gearbox = b2.register_module('Gearbox')
        gearbox.param('fileName', 'geometry/Beast2_phase2.xml')

        geometry = b2.register_module('Geometry')
        geometry.param('components', ['PXD', 'SVD', 'CDC'])

        pxdclusterizer = b2.register_module('PXDClusterizer')
        pxdclusterizer.param('ElectronicNoise', 1.0)
        pxdclusterizer.param('SeedSN', 9.0)

        # One output file per IoV, named after its experiment and run number
        pxdtupleproducer = b2.register_module('PXDBgTupleProducer')
        pxdtupleproducer.param(
            'outputFileName',
            f'{self.output_dir}/pxd_beast_tuple_exp_{self.iov.exp_low}_run_{self.iov.run_low}.root')

        # Create the path
        main = b2.create_path()
        main.add_module(rootinput)
        # NOTE(review): SetMetaTimeModule is imported at file level but never
        # added to this path - confirm whether a module is missing here.
        main.add_module(gearbox)
        main.add_module(geometry)
        main.add_module('PXDUnpacker')
        main.add_module("ActivatePXDPixelMasker")
        main.add_module("ActivatePXDGainCalibrator")
        main.add_module("PXDRawHitSorter")
        main.add_module(pxdclusterizer)
        main.add_module(pxdtupleproducer)
        main.add_module(b2.register_module('Progress'))

        # Process the run
        b2.process(main)
87
88
#
# Function run by worker processes
#
92
def worker(task_q, done_q):
    """Run queued tuple-production tasks until the 'STOP' sentinel is read.

    Each task is a tuple ``(iov, file_paths, output_dir)``. A task is executed
    in its own CalculationProcess, and the worker waits for that process to
    exit before fetching the next task.

    Args:
        task_q: Queue delivering task tuples, terminated by the string 'STOP'.
        done_q: Queue that receives the iov of every task whose process has
            exited, whether or not it succeeded.
    """
    for iov, file_paths, output_dir in iter(task_q.get, 'STOP'):
        print(f"Start processing IoV={str(iov)}")
        p = CalculationProcess(iov, file_paths, output_dir)
        p.start()
        p.join()
        # Report a failed child instead of silently counting it as done. The
        # iov is still put on done_q so the consumer never waits forever.
        if p.exitcode != 0:
            print(f"Processing IoV={str(iov)} failed with exit code {p.exitcode}")
        done_q.put(iov)
100
101
if __name__ == "__main__":
    # Script entry point: parse the run range, look up the input files of each
    # run in 'file_iov_map.pkl', and produce one tuple file per run using a
    # pool of worker processes.

    import pickle
    import argparse
    parser = argparse.ArgumentParser(
        description="Produce pxd tuples and histofiles from ROOT formatted raw data")
    parser.add_argument(
        '--runLow',
        default=0,
        type=int,
        help='Lowest run number to process')
    parser.add_argument(
        '--runHigh',
        default=-1,
        type=int,
        help='Highest run number to process')
    parser.add_argument(
        '--expNo',
        default=3,
        type=int,
        help='Experiment number of the runs to process')
    parser.add_argument(
        '--outputDir',
        default='./',
        type=str,
        help='Name of output directory for tuples')
    args = parser.parse_args()

    # Set the IoV range for this calibration
    iov_to_calibrate = IoV(
        exp_low=args.expNo,
        run_low=args.runLow,
        exp_high=args.expNo,
        run_high=args.runHigh)

    map_file_path = "file_iov_map.pkl"
    # NOTE: unpickling can execute arbitrary code; only load a map file that
    # comes from a trusted source.
    with open(map_file_path, 'br') as map_file:
        files_to_iovs = pickle.load(map_file)

    # Group the input files by their single run iov in one pass
    files_by_iov = {}
    for file_path, iov in files_to_iovs.items():
        files_by_iov.setdefault(iov, []).append(file_path)

    # Dict mapping single run iovs inside the requested range to their input files
    iovs_to_files = {
        iov: file_paths
        for iov, file_paths in files_by_iov.items()
        if iov_to_calibrate.contains(iov)
    }

    # Create queues
    task_queue = multiprocessing.Queue()
    done_queue = multiprocessing.Queue()

    # Submit tasks
    for iov, file_paths in iovs_to_files.items():
        task_queue.put((iov, file_paths, args.outputDir))

    # Start worker processes
    worker_processes = [
        multiprocessing.Process(target=worker, args=(task_queue, done_queue))
        for _ in range(NUMBER_OF_PROCESSES)
    ]
    for worker_process in worker_processes:
        worker_process.start()

    # Get and print results; exactly one result arrives per submitted task
    print('Unordered results:')
    for _ in iovs_to_files:
        print('\t', done_queue.get())

    # Tell child processes to stop; one sentinel per worker
    for _ in worker_processes:
        task_queue.put('STOP')

    # Wait for all workers to exit before the script ends
    for worker_process in worker_processes:
        worker_process.join()
def __init__(self, iov, file_paths, output_dir)