Belle II Software  release-05-01-25
processing.py
1 import argparse
2 import multiprocessing
3 import sys
4 
5 import basf2
6 import ROOT
7 
8 from softwaretrigger import constants
9 from pxd import add_roi_payload_assembler, add_roi_finder
10 
11 from reconstruction import add_reconstruction, add_cosmics_reconstruction
12 from softwaretrigger import path_utils
13 import reconstruction
14 from geometry import check_components
15 from rawdata import add_unpackers
16 
17 
def setup_basf2_and_db(zmq=False):
    """
    Parse the command line of an online basf2 script and configure basf2 accordingly.

    Besides the argument parsing, this configures the conditions database
    (the requested central globaltags, or otherwise the "online" globaltag
    read from the local payload location), the number of processes and the logging.

    :param zmq: If true, the ZMQ addresses ``--input``, ``--output`` and ``--dqm``
        are required options. Otherwise the ring buffer names and the histogram
        port are positional arguments, and file based input/output can be
        selected with optional arguments.
    :return: The parsed arguments as returned by ``ArgumentParser.parse_args``.
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if zmq:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")
    else:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)

    # The default is the CPU count minus five. Clamp it at zero so that hosts with
    # fewer than five cores never request a negative number of processes.
    # NOTE(review): 0 is presumably basf2's "no parallel processing" setting - confirm.
    default_number_processes = max(0, multiprocessing.cpu_count() - 5)
    parser.add_argument('--number-processes', type=int, default=default_number_processes,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    # NOTE(review): with nargs="*" and the default "store" action all tags have to follow a
    # single occurrence of the option; a repeated option replaces the earlier list.
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")

    args = parser.parse_args()

    # Conditions database: start from a clean state without the default globaltags
    basf2.reset_database()
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        # Each tag is prepended in the order given on the command line
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        # No central tag requested: use the "online" globaltag from the local payload location
        basf2.conditions.globaltags = ["online"]
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # Logging
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # Every log message has to be a single line,
    # otherwise the LogFilter in daq_slc throws away the other lines
    basf2.logging.enable_escape_newlines = True

    return args
79 
80 
def start_path(args, location):
    """
    Build the first part of the path used for HLT and ExpressReco running.

    The returned path contains the event input module followed by the
    histogram handling module.

    :param args: Parsed command line arguments; ``input_file``, ``input_buffer_name``,
        ``histo_output_file`` and ``histo_port`` are read.
    :param location: Member of ``constants.Location`` selecting the ring buffer input module.
    :return: The newly created path.
    """
    path = basf2.create_path()

    # Pick the ring buffer reader for the location, any other location is fatal
    ring_buffer_reader = ""
    if location == constants.Location.expressreco:
        ring_buffer_reader = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        ring_buffer_reader = "Raw2Ds"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Event input: a given file takes precedence over the ring buffer
    if args.input_file:
        file_reader = 'SeqRootInput' if args.input_file.endswith(".sroot") else 'RootInput'
        path.add_module(file_reader, inputFileName=args.input_file)
    else:
        path.add_module(ring_buffer_reader, RingBufferName=args.input_buffer_name)

    # Histograms: write to the given file, otherwise hand them to the DQM histogram manager
    if args.histo_output_file:
        path.add_module('HistoManager', histoFileName=args.histo_output_file)
    else:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName="/tmp/")

    return path
111 
112 
def start_zmq_path(args, location):
    """
    Create the two paths used when running with ZMQ input.

    The first path starts with the ``HLTZMQ2Ds`` input module. The second path is
    attached to that module as a condition path for the return value ``==0``
    (continuing with the first path afterwards) and starts with the
    ``HLTDQM2ZMQ`` module.

    :param args: Parsed command line arguments; ``input`` and ``dqm`` are read.
    :param location: Member of ``constants.Location``; for ExpressReco the input module
        is additionally configured with ``addExpressRecoObjects=True``.
    :return: Tuple of the main path and the conditional reconstruction path.
    """
    main_path = basf2.Path()
    conditional_path = basf2.Path()

    # Only the options of the input module depend on the location
    if location == constants.Location.expressreco:
        location_options = {"addExpressRecoObjects": True}
    elif location == constants.Location.hlt:
        location_options = {}
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    zmq_input = main_path.add_module("HLTZMQ2Ds", input=args.input, **location_options)

    zmq_input.if_value("==0", conditional_path, basf2.AfterConditionPath.CONTINUE)
    conditional_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)

    return main_path, conditional_path
128 
129 
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines.

    :param path: The path the modules are added to.
    :param run_type: Run type handed to the reconstruction and DQM helpers.
    :param softwaretrigger_mode: ``filter`` branches the path on the HLT decision,
        ``monitor`` always runs the part for accepted events; anything else is fatal.
    :param prune_input: If true, restrict the DataStore to ``constants.HLT_INPUT_OBJECTS`` first.
    :param prune_output: If true, add the "store only raw data" path at the end.
    :param unpacker_components: Components to unpack, defaults to ``constants.DEFAULT_HLT_COMPONENTS``.
    :param reco_components: Components to reconstruct, defaults to ``constants.DEFAULT_HLT_COMPONENTS``.
    :param create_hlt_unit_histograms: Forwarded to every ``path_utils.add_hlt_dqm`` call.
    :param kwargs: Forwarded to ``path_utils.add_filter_reconstruction``.
    """
    def add_summary(target_path, label):
        # Add a StatisticsSummary module with the given name to the given path
        target_path.add_module('StatisticsSummary').set_name(label)

    add_summary(path, 'Sum_Wait')

    unpacker_components = constants.DEFAULT_HLT_COMPONENTS if unpacker_components is None else unpacker_components
    reco_components = constants.DEFAULT_HLT_COMPONENTS if reco_components is None else reco_components

    for component_list in (unpacker_components, reco_components):
        check_components(component_list)

    # Options shared by all calls adding HLT DQM modules
    dqm_options = dict(run_type=run_type, components=reco_components,
                       create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Make sure the DataStore only contains what is expected in the HLT
    # configuration. ROI payloads or tracks already present in the input
    # file can be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # The geometry is only added if it is not present already
    path_utils.add_geometry_if_not_present(path)
    add_summary(path, 'Sum_Initialization')

    # Unpacking of the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    add_summary(path, 'Sum_Unpackers')

    # All accepted events are processed on one common path
    accepted_events_path = basf2.Path()

    # Reconstruction needed for the HLT decision
    path_utils.add_filter_reconstruction(path, run_type=run_type, components=reco_components, **kwargs)

    # DQM modules which should run after every reconstruction
    path_utils.add_hlt_dqm(path, dqm_mode=constants.DQMModes.before_filter, **dqm_options)

    # The path is only branched if the filtering is turned on
    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        filter_module = path_utils.add_filter_module(path)

        # Two outcomes of the filter module are handled:
        # (1) the event is dismissed -> only store the metadata
        path_utils.hlt_event_abort(filter_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # (2) the event is accepted -> go on with the hlt reconstruction
        filter_module.if_value("==1", accepted_events_path, basf2.AfterConditionPath.CONTINUE)
        add_summary(accepted_events_path, 'Sum_HLT_Discard')
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # Without filtering every event takes the path for accepted events
        path.add_path(accepted_events_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # Accepted events continue with the remaining reconstruction
    path_utils.add_post_filter_reconstruction(accepted_events_path, run_type=run_type, components=reco_components)

    # ROIs are only created for accepted events
    add_roi_finder(accepted_events_path)
    add_summary(accepted_events_path, 'Sum_ROI_Finder')

    # DQM modules which should only see accepted events
    path_utils.add_hlt_dqm(accepted_events_path, dqm_mode=constants.DQMModes.filtered, **dqm_options)

    # The ROI payloads sent to ONSEN are created for all events.
    # Normally the payload assembler marks the event with the software trigger
    # decision to inform the hardware to drop the data if the decision is "no".
    # In monitoring mode the decision is ignored.
    ignore_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
    add_roi_payload_assembler(path, ignore_hlt_decision=ignore_decision)
    add_summary(path, 'Sum_ROI_Payload_Assembler')

    # DQM modules which should run on all events, not only on the accepted ones
    path_utils.add_hlt_dqm(path, dqm_mode=constants.DQMModes.all_events, **dqm_options)

    if prune_output:
        # Finally remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    add_summary(path, 'Sum_Close_Event')
225 
226 
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines.

    :param path: The path the modules are added to.
    :param run_type: ``beam`` or ``cosmic`` select the reconstruction; any other value is
        fatal when the reconstruction is requested.
    :param select_only_accepted_events: If true, end the processing of events for which the
        ``TriggerSkim`` module on the total software trigger result returns 0.
    :param prune_input: If true, restrict the DataStore to ``constants.EXPRESSRECO_INPUT_OBJECTS`` first.
    :param prune_output: If true, restrict the DataStore to the always saved, raw data and
        processed objects at the end.
    :param unpacker_components: Components to unpack, defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``.
    :param reco_components: Components to reconstruct, defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``.
    :param do_reconstruction: If false, no reconstruction modules are added.
    :param kwargs: Forwarded to the reconstruction function.
    """
    unpacker_components = (constants.DEFAULT_EXPRESSRECO_COMPONENTS
                           if unpacker_components is None else unpacker_components)
    reco_components = (constants.DEFAULT_EXPRESSRECO_COMPONENTS
                       if reco_components is None else reco_components)

    for component_list in (unpacker_components, reco_components):
        check_components(component_list)

    # By default also the events not selected by the HLT are passed to ereco,
    # however they are empty. If requested, stop processing them right here.
    if select_only_accepted_events:
        trigger_skim = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"],
                                       resultOnMissing=0)
        empty_path = basf2.Path()
        trigger_skim.if_value("==0", empty_path, basf2.AfterConditionPath.END)

    # Make sure the DataStore only contains what is expected in the ExpressReco
    # configuration. Tracks already present in the input file can be a problem
    # and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path, components=reco_components, pruneTracks=False,
                               skipGeometryAdding=True, add_trigger_calculation=False, **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL("Run Type {} not supported.".format(run_type))

    path_utils.add_expressreco_dqm(path, run_type, components=reco_components)

    if prune_output:
        objects_to_keep = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS + constants.PROCESSED_OBJECTS
        path.add_module("PruneDataStore", matchEntries=objects_to_keep)
278 
279 
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the output modules required for ExpressReco/HLT to the path.

    :param path: The path the modules are added to.
    :param args: Parsed command line arguments; ``no_output``, ``output_file`` and
        ``output_buffer_name`` are read.
    :param location: Member of ``constants.Location`` selecting the ring buffer output
        module and whether the processed objects are saved as well.
    :param show_progress_bar: If true, add the ``Progress`` module.
    """
    objects_to_save = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        objects_to_save += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Only these objects are streamed between the processes in parallel processing
    basf2.set_streamobjs(objects_to_save)

    if args.no_output:
        return

    # Pick the ring buffer writer for the location, any other location is fatal
    ring_buffer_writer = ""
    if location == constants.Location.expressreco:
        ring_buffer_writer = "Ds2Sample"
    elif location == constants.Location.hlt:
        ring_buffer_writer = "Ds2Rbuf"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Without an output file the events go to the ring buffer
    if not args.output_file:
        path.add_module(ring_buffer_writer, RingBufferName=args.output_buffer_name,
                        saveObjs=objects_to_save)
        return

    if args.output_file.endswith(".sroot"):
        path.add_module("SeqRootOutput", saveObjs=objects_to_save, outputFileName=args.output_file)
    else:
        # Everything is stored on purpose, so no list of objects is given here
        path.add_module("RootOutput", outputFileName=args.output_file)
314 
315 
def finalize_zmq_path(path, args, location):
    """
    Add the ZMQ output module required for ExpressReco/HLT to the path.

    :param path: The path the ``HLTDs2ZMQ`` module is added to.
    :param args: Parsed command line arguments; ``output`` is read.
    :param location: Member of ``constants.Location``. On HLT the module is added with
        ``raw=True``, on ExpressReco with ``raw=False`` and the processed objects are
        streamed as well. Any other location is fatal.
    """
    objects_to_stream = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        objects_to_stream += constants.PROCESSED_OBJECTS

    # Only these objects are streamed between the processes in parallel processing
    basf2.set_streamobjs(objects_to_stream)

    # Only the "raw" flag of the output module depends on the location
    if location == constants.Location.expressreco:
        send_raw = False
    elif location == constants.Location.hlt:
        send_raw = True
    else:
        basf2.B2FATAL(f"Does not know location {location}")
        return

    path.add_module("HLTDs2ZMQ", output=args.output, raw=send_raw)