Belle II Software  release-06-01-15
beast_tuple_producer.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# This steering script steers the production of ROOT tuples for beast.
#
# Before running, you have to put a file-to-IoV mapping called 'file_iov_map.pkl' in your
# working directory. You can create such a file using the script 'create_file_to_iov_map.py'
# in the basf2 folder calibration/examples.
#
# basf2 beast_tuple_producer.py -- --runLow=5613 --runHigh=5613 --expNo=3
#
# This will compute tuples run-by-run for all runs between runLow and runHigh for the
# given experiment number.
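#
# A minimal sketch of the expected 'file_iov_map.pkl' content (an assumption based on how
# the mapping is consumed below): a pickled dict from input file path to the single-run IoV
# of that file. The path and run numbers here are made-up placeholders:
#
#   import pickle
#   from caf.utils import IoV
#
#   example_map = {'/some/path/to/raw_data_file.root': IoV(exp_low=3, run_low=5613,
#                                                          exp_high=3, run_high=5613)}
#   with open('file_iov_map.pkl', 'bw') as f:
#       pickle.dump(example_map, f)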

import multiprocessing
import SetMetaTimeModule
from caf.utils import IoV
import basf2 as b2

NUMBER_OF_PROCESSES = 20

b2.set_log_level(b2.LogLevel.ERROR)

b2.reset_database()
b2.use_central_database("Calibration_Offline_Development")


# Some ROOT tools


class CalculationProcess(multiprocessing.Process):
    """ Main class to steer the production of ROOT tuples for beast """

    def __init__(self, iov, file_paths, output_dir):
        """ Constructor """
        super(CalculationProcess, self).__init__()

        #: single-run IoV covered by this job
        self.iov = iov
        #: list of input file paths belonging to this IoV
        self.file_paths = file_paths
        #: directory where the output tuple is written
        self.output_dir = output_dir

    def run(self):
        """ Run """
        # Register modules
        rootinput = b2.register_module('RootInput')
        rootinput.param('inputFileNames', self.file_paths)
        rootinput.param('branchNames', ['EventMetaData', 'RawPXDs', 'RawSVDs', 'RawCDCs'])
        gearbox = b2.register_module('Gearbox')
        gearbox.param('fileName', 'geometry/Beast2_phase2.xml')
        geometry = b2.register_module('Geometry')
        geometry.param('components', ['PXD', 'SVD', 'CDC'])
        pxdclusterizer = b2.register_module('PXDClusterizer')
        pxdclusterizer.param('ElectronicNoise', 1.0)
        pxdclusterizer.param('SeedSN', 9.0)
        pxdtupleproducer = b2.register_module('PXDBgTupleProducer')
        pxdtupleproducer.param(
            'outputFileName',
            '{}/pxd_beast_tuple_exp_{}_run_{}.root'.format(
                self.output_dir, self.iov.exp_low, self.iov.run_low))

        # Create the path
        main = b2.create_path()
        main.add_module(rootinput)
        main.add_module(SetMetaTimeModule.SetMetaTimeModule())
        main.add_module(gearbox)
        main.add_module(geometry)
        main.add_module('PXDUnpacker')
        main.add_module("ActivatePXDPixelMasker")
        main.add_module("ActivatePXDGainCalibrator")
        main.add_module("PXDRawHitSorter")
        main.add_module(pxdclusterizer)
        main.add_module(pxdtupleproducer)
        main.add_module(b2.register_module('Progress'))

        # Process the run
        b2.process(main)


#
# Function run by worker processes
#

def worker(task_q, done_q):
    """ Pull (iov, file_paths, output_dir) tasks from the queue until the 'STOP' sentinel
    arrives; each task is handed to a fresh CalculationProcess so every basf2 job runs in
    its own subprocess. """
    for iov, file_paths, output_dir in iter(task_q.get, 'STOP'):
        print("Start processing IoV={}".format(str(iov)))
        p = CalculationProcess(iov, file_paths, output_dir)
        p.start()
        p.join()
        done_q.put(iov)


if __name__ == "__main__":

    import pickle
    import argparse
    parser = argparse.ArgumentParser(
        description="Produce pxd tuples and histofiles from ROOT formatted raw data")
    parser.add_argument('--runLow', default=0, type=int,
                        help='Lowest run number of the IoV to process')
    parser.add_argument('--runHigh', default=-1, type=int,
                        help='Highest run number of the IoV to process')
    parser.add_argument('--expNo', default=3, type=int,
                        help='Experiment number of the IoV to process')
    parser.add_argument('--outputDir', default='./', type=str,
                        help='Name of output directory for tuples')
    args = parser.parse_args()

    # Set the IoV range for this calibration
    iov_to_calibrate = IoV(
        exp_low=args.expNo,
        run_low=args.runLow,
        exp_high=args.expNo,
        run_high=args.runHigh)

    map_file_path = "file_iov_map.pkl"
    with open(map_file_path, 'br') as map_file:
        files_to_iovs = pickle.load(map_file)

    # Set of all currently known single run iovs
    iov_set = set(files_to_iovs.values())

    # Dict mapping single run iovs to their input files
    iovs_to_files = {}
    for iov in iov_set:
        if iov_to_calibrate.contains(iov):
            file_paths = [k for k, v in files_to_iovs.items() if v == iov]
            iovs_to_files[iov] = file_paths

    # Create queues
    task_queue = multiprocessing.Queue()
    done_queue = multiprocessing.Queue()

    # Submit tasks
    for iov, file_paths in iovs_to_files.items():
        task_queue.put((iov, file_paths, args.outputDir))

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        multiprocessing.Process(
            target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(iovs_to_files)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')