Belle II Software  release-08-01-10
beast_tuple_producer.py
1 #!/usr/bin/env python3
2 
3 
10 
11 
12 # This steering script steers the production of ROOT tuples for beast.
13 #
14 # Before running, you have to put a file to IoV mapping called 'file_iov_map.pkl' in your
15 # working directory. You can create such a file using the script 'create_file_to_iov_map.py'
16 # in basf2 folder calibration/examples.
17 #
18 # basf2 beast_tuple_producer.py -- --runLow=5613 --runHigh=5613 --expNo=3
19 #
20 # This will compute tuples run-by-run for all runs between runLow and runHigh for the
21 # given experiment number.
22 
23 import multiprocessing
24 import SetMetaTimeModule
25 from caf.utils import IoV
26 import basf2 as b2
27 NUMBER_OF_PROCESSES = 20
28 
29 b2.set_log_level(b2.LogLevel.ERROR)
30 
31 b2.reset_database()
32 b2.use_central_database("Calibration_Offline_Development")
33 
34 
35 # Some ROOT tools
36 
37 
38 class CalculationProcess(multiprocessing.Process):
39  """ Main class to steer the production of ROOT tuples for beast """
40 
41  def __init__(self, iov, file_paths, output_dir):
42  """ Constructor """
43  super().__init__()
44 
45  self.ioviov = iov
46 
47  self.file_pathsfile_paths = file_paths
48 
49  self.output_diroutput_dir = output_dir
50 
51  def run(self):
52  """ Run """
53  # Register modules
54  rootinput = b2.register_module('RootInput')
55  rootinput.param('inputFileNames', self.file_pathsfile_paths)
56  rootinput.param(
57  'branchNames', [
58  'EventMetaData', 'RawPXDs', 'RawSVDs', 'RawCDCs'])
59  gearbox = b2.register_module('Gearbox')
60  gearbox.param('fileName', 'geometry/Beast2_phase2.xml')
61  geometry = b2.register_module('Geometry')
62  geometry.param('components', ['PXD', 'SVD', 'CDC'])
63  pxdclusterizer = b2.register_module('PXDClusterizer')
64  pxdclusterizer.param('ElectronicNoise', 1.0)
65  pxdclusterizer.param('SeedSN', 9.0)
66  pxdtupleproducer = b2.register_module('PXDBgTupleProducer')
67  pxdtupleproducer.param(
68  'outputFileName',
69  f'{self.output_dir}/pxd_beast_tuple_exp_{self.iov.exp_low}_run_{self.iov.run_low}.root')
70 
71  # Create the path
72  main = b2.create_path()
73  main.add_module(rootinput)
74  main.add_module(SetMetaTimeModule.SetMetaTimeModule())
75  main.add_module(gearbox)
76  main.add_module(geometry)
77  main.add_module('PXDUnpacker')
78  main.add_module("ActivatePXDPixelMasker")
79  main.add_module("ActivatePXDGainCalibrator")
80  main.add_module("PXDRawHitSorter")
81  main.add_module(pxdclusterizer)
82  main.add_module(pxdtupleproducer)
83  main.add_module(b2.register_module('Progress'))
84 
85  # Process the run
86  b2.process(main)
87 
88 
89 #
90 # Function run by worker processes
91 #
92 
93 def worker(task_q, done_q):
94  for iov, file_paths, output_dir in iter(task_q.get, 'STOP'):
95  print(f"Start processing IoV={str(iov)}")
96  p = CalculationProcess(iov, file_paths, output_dir)
97  p.start()
98  p.join()
99  done_q.put(iov)
100 
101 
102 if __name__ == "__main__":
103 
104  import pickle
105  import argparse
106  parser = argparse.ArgumentParser(
107  description="Produce pxd tuples and histofiles from ROOT formatted raw data")
108  parser.add_argument(
109  '--runLow',
110  default=0,
111  type=int,
112  help='Compute mask for specific IoV')
113  parser.add_argument('--runHigh', default=-1, type=int,
114  help='Compute mask for specific IoV')
115  parser.add_argument(
116  '--expNo',
117  default=3,
118  type=int,
119  help='Compute mask for specific IoV')
120  parser.add_argument(
121  '--outputDir',
122  default='./',
123  type=str,
124  help='Name of output directory for tuples')
125  args = parser.parse_args()
126 
127  # Set the IoV range for this calibration
128  iov_to_calibrate = IoV(
129  exp_low=args.expNo,
130  run_low=args.runLow,
131  exp_high=args.expNo,
132  run_high=args.runHigh)
133 
134  map_file_path = "file_iov_map.pkl"
135  with open(map_file_path, 'br') as map_file:
136  files_to_iovs = pickle.load(map_file)
137 
138  # Set of all currently known single run iovs
139  iov_set = set(files_to_iovs.values())
140 
141  # Dict mapping single run iovs to their input files
142  iovs_to_files = {}
143  for iov in iov_set:
144  if iov_to_calibrate.contains(iov):
145  file_paths = [k for k, v in files_to_iovs.items() if v == iov]
146  iovs_to_files[iov] = file_paths
147 
148  # Create queues
149  task_queue = multiprocessing.Queue()
150  done_queue = multiprocessing.Queue()
151 
152  # Submit tasks
153  for iov, file_paths in iovs_to_files.items():
154  task_queue.put((iov, file_paths, args.outputDir))
155 
156  # Start worker processes
157  for i in range(NUMBER_OF_PROCESSES):
158  multiprocessing.Process(
159  target=worker, args=(
160  task_queue, done_queue)).start()
161 
162  # Get and print results
163  print('Unordered results:')
164  for i in range(len(iovs_to_files)):
165  print('\t', done_queue.get())
166 
167  # Tell child processes to stop
168  for i in range(NUMBER_OF_PROCESSES):
169  task_queue.put('STOP')
def __init__(self, iov, file_paths, output_dir)