Belle II Software  release-08-01-10
processing.py
1 
8 import os
9 import argparse
10 import multiprocessing
11 import tempfile
12 
13 import basf2
14 
15 from softwaretrigger import constants
16 from pxd import add_roi_payload_assembler, add_roi_finder
17 
18 from reconstruction import add_reconstruction, add_cosmics_reconstruction
19 from softwaretrigger import path_utils
20 from geometry import check_components
21 from rawdata import add_unpackers
22 
23 
def setup_basf2_and_db(zmq=False):
    """
    Parse the command line and set up basf2 (database, processes, logging) for online usage

    The conditions database is configured from the central global tags given
    with ``--central-db-tag``; if none are given, the "online" global tag is
    read from the local database path instead. In addition the number of
    processes, the log level, single-line log messages, the optional UDP
    logging and the "online" realm are set.

    Parameters:
        zmq (bool): if True, the ZMQ addresses ``--input``, ``--output`` and ``--dqm``
            are required. Otherwise the ring buffer names and the histogram port are
            positional arguments and the file input/output options are available.

    Returns:
        argparse.Namespace: the parsed command line arguments
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if zmq:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")
    else:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)

    # The default is five less than the number of cores. Clamp it at zero so
    # that hosts with fewer than five cores never get a negative process count.
    default_number_processes = max(multiprocessing.cpu_count() - 5, 0)
    parser.add_argument('--number-processes', type=int, default=default_number_processes,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="set hostname for UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="set port number for UDP logging connection", default=None)

    args = parser.parse_args()

    # Conditions database: central global tags if requested, local payloads otherwise
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        basf2.conditions.globaltags = ["online"]
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # basf2 logging setup
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # Every log message has to be a single line,
    # otherwise the LogFilter in daq_slc throws away the other lines
    basf2.logging.enable_escape_newlines = True

    # UDP logging is only enabled if both the hostname and the port are given
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    return args
95 
96 
def start_path(args, location):
    """
    Create and return a path used for HLT and ExpressReco running

    The returned path contains the input module (ring buffer or file input)
    followed by the histogram manager.

    Parameters:
        args: parsed command line arguments; ``input_file``, ``input_buffer_name``,
            ``histo_output_file`` and ``histo_port`` are read
        location (constants.Location): ``expressreco`` or ``hlt``, selects the
            ring buffer input module; any other value is a fatal error

    Returns:
        the newly created path
    """
    path = basf2.create_path()

    if location == constants.Location.expressreco:
        input_buffer_module_name = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        input_buffer_module_name = "Raw2Ds"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Input: a given input file replaces the ring buffer
    if not args.input_file:
        path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)
    elif args.input_file.endswith(".sroot"):
        path.add_module('SeqRootInput', inputFileName=args.input_file)
    else:
        path.add_module('RootInput', inputFileName=args.input_file)

    # Histogram Handling: a given histogram file replaces the DQM connection
    if not args.histo_output_file:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName=tempfile.gettempdir()+"/")
    else:
        workdir, filename = os.path.split(args.histo_output_file)
        # The directory part is empty for a bare file name;
        # pass the current directory explicitly instead of an empty string
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir or ".")

    return path
129 
130 
def start_zmq_path(args, location):
    """
    Create and return the two paths used for ZMQ based HLT and ExpressReco running

    The main path starts with the ``HLTZMQ2Ds`` input module. The second path is
    attached to this module as conditional path for the return value "==0"
    (the main path is continued afterwards) and starts with ``HLTDQM2ZMQ``.

    Parameters:
        args: parsed command line arguments; ``input`` and ``dqm`` are read
        location (constants.Location): ``expressreco`` or ``hlt``;
            any other value is a fatal error

    Returns:
        tuple: the main path and the conditional (reconstruction) path
    """
    main_path = basf2.Path()
    conditional_path = basf2.Path()

    # Only the expressreco input needs the additional option
    if location == constants.Location.expressreco:
        location_options = {"addExpressRecoObjects": True}
    elif location == constants.Location.hlt:
        location_options = {}
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    zmq_input = main_path.add_module("HLTZMQ2Ds", input=args.input, **location_options)
    zmq_input.if_value("==0", conditional_path, basf2.AfterConditionPath.CONTINUE)

    conditional_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)

    return main_path, conditional_path
146 
147 
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines

    Parameters:
        path: the path the modules are added to
        run_type (constants.RunTypes): handed to the reconstruction and DQM helpers
        softwaretrigger_mode (constants.SoftwareTriggerModes): ``filter`` branches the
            path on the HLT decision, ``monitor`` always runs the part for accepted
            events; any other value is a fatal error
        prune_input (bool): restrict the DataStore to ``constants.HLT_INPUT_OBJECTS``
            before anything else is done
        prune_output (bool): call ``path_utils.add_store_only_rawdata_path`` at the end
        unpacker_components (list): components to unpack,
            defaults to ``constants.DEFAULT_HLT_COMPONENTS``
        reco_components (list): components to reconstruct,
            defaults to ``constants.DEFAULT_HLT_COMPONENTS``
        create_hlt_unit_histograms (bool): handed to ``path_utils.add_hlt_dqm``
        **kwargs: handed to ``path_utils.add_pre_filter_reconstruction``
    """

    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    def add_statistics_summary(target_path, name):
        """Append a StatisticsSummary module called ``name`` to ``target_path``"""
        target_path.add_module('StatisticsSummary').set_name(name)

    add_statistics_summary(path, 'Sum_Wait')

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_HLT_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_HLT_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # All three DQM blocks below share these settings, only the DQM mode differs
    common_dqm_options = {
        "run_type": run_type,
        "components": reco_components,
        "create_hlt_unit_histograms": create_hlt_unit_histograms,
    }

    # Only the DataStore content expected in the HLT configuration may be present.
    # ROI payloads or tracks in the input file can be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Geometry (if not already present)
    path_utils.add_geometry_if_not_present(path)
    add_statistics_summary(path, 'Sum_Initialization')

    # Unpacking of the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    add_statistics_summary(path, 'Sum_Unpackers')

    # One path for everything that is only done for accepted events
    accept_path = basf2.Path()

    # Reconstruction needed for the HLT decision
    path_utils.add_pre_filter_reconstruction(path, run_type=run_type, components=reco_components, **kwargs)

    # HLT filter calculation
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)

    # DQM modules which run after every reconstruction
    path_utils.add_hlt_dqm(path, dqm_mode=constants.DQMModes.before_filter, **common_dqm_options)

    # The path is only branched if the filtering is turned on
    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        decision_module = path_utils.add_filter_module(path)

        # (1) the event is dismissed -> only store the metadata
        path_utils.hlt_event_abort(decision_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # (2) the event is accepted -> go on with the hlt reconstruction
        decision_module.if_value("==1", accept_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # Without filtering every event goes through the accept path
        path.add_path(accept_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # Accepted events: continue the reconstruction
    path_utils.add_post_filter_reconstruction(accept_path, run_type=run_type, components=reco_components)

    # Accepted events: create the ROIs
    add_roi_finder(accept_path)
    add_statistics_summary(accept_path, 'Sum_ROI_Finder')

    # Accepted events: HLT DQM modules
    path_utils.add_hlt_dqm(accept_path, dqm_mode=constants.DQMModes.filtered, **common_dqm_options)

    # All events: create the ROI payloads which are sent to ONSEN.
    # Normally the payload assembler marks the event with the software trigger decision
    # to inform the hardware to drop the data of the event if the decision is "no".
    # In monitoring mode the decision is ignored.
    add_roi_payload_assembler(
        path,
        ignore_hlt_decision=(softwaretrigger_mode == constants.SoftwareTriggerModes.monitor))
    add_statistics_summary(path, 'Sum_ROI_Payload_Assembler')

    # All events (not only the accepted ones): remaining DQM modules
    path_utils.add_hlt_dqm(path, dqm_mode=constants.DQMModes.all_events, **common_dqm_options)

    if prune_output:
        # In the end remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    add_statistics_summary(path, 'Sum_Close_Event')
249 
250 
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines

    Parameters:
        path: the path the modules are added to
        run_type (constants.RunTypes): ``beam`` or ``cosmic`` select the reconstruction,
            any other value is a fatal error if the reconstruction is turned on
        select_only_accepted_events (bool): end the processing of events which
            do not pass the ``TriggerSkim`` on the total software trigger result
        prune_input (bool): restrict the DataStore to
            ``constants.EXPRESSRECO_INPUT_OBJECTS`` before unpacking
        prune_output (bool): restrict the DataStore to the always saved, raw data
            and processed objects at the end
        unpacker_components (list): components to unpack,
            defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
        reco_components (list): components to reconstruct,
            defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
        do_reconstruction (bool): add the reconstruction to the path
        **kwargs: handed to ``add_reconstruction`` or ``add_cosmics_reconstruction``
    """
    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    for component_list in (unpacker_components, reco_components):
        check_components(component_list)

    # By default also events not selected by the HLT are passed to ereco,
    # however they are empty. If turned on, they are dropped right away.
    if select_only_accepted_events:
        trigger_skim = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"],
                                       resultOnMissing=0)
        trigger_skim.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Only the DataStore content expected in the ExpressReco configuration may be
    # present. Tracks in the input file can be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Do not filter/prune PXD for partly broken events, as we lose diagnostics in DQM
    basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        # Options which are the same for the beam and the cosmics reconstruction
        shared_reco_options = {
            "components": reco_components,
            "pruneTracks": False,
            "skipGeometryAdding": True,
        }

        if run_type == constants.RunTypes.beam:
            add_reconstruction(path, add_trigger_calculation=False, **shared_reco_options, **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, **shared_reco_options, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        # NOTE(review): presumably this module is only in the path after the
        # reconstruction was added, so it is configured inside this branch - confirm
        basf2.set_module_parameters(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                    isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, run_type, components=reco_components)

    if prune_output:
        objects_to_keep = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS + constants.PROCESSED_OBJECTS
        path.add_module("PruneDataStore", matchEntries=objects_to_keep)
308 
309 
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for expressreco/HLT

    The objects streamed between the processes are limited to the objects which
    are saved. No output module is added if ``args.no_output`` is set.

    Parameters:
        path: the path the modules are added to
        args: parsed command line arguments; ``no_output``, ``output_file`` and
            ``output_buffer_name`` are read
        location (constants.Location): ``expressreco`` or ``hlt``, selects the saved
            objects and the ring buffer output module; any other value is a
            fatal error if an output is requested
        show_progress_bar (bool): add the ``Progress`` module to the path
    """
    objects_to_save = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        objects_to_save += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Limit streaming objects for parallel processing
    basf2.set_streamobjs(objects_to_save)

    if args.no_output:
        return

    if location == constants.Location.expressreco:
        ring_buffer_output = "Ds2Sample"
    elif location == constants.Location.hlt:
        ring_buffer_output = "Ds2Rbuf"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # A given output file replaces the ring buffer output
    if args.output_file:
        if args.output_file.endswith(".sroot"):
            path.add_module("SeqRootOutput", saveObjs=objects_to_save, outputFileName=args.output_file)
        else:
            # We are storing everything on purpose!
            path.add_module("RootOutput", outputFileName=args.output_file)
    else:
        path.add_module(ring_buffer_output, RingBufferName=args.output_buffer_name,
                        saveObjs=objects_to_save)
344 
345 
def finalize_zmq_path(path, args, location):
    """
    Add the required ZMQ output module for expressreco/HLT

    The objects streamed between the processes are limited to the always saved
    and raw data objects, plus the processed objects on expressreco.

    Parameters:
        path: the path the ``HLTDs2ZMQ`` module is added to
        args: parsed command line arguments; ``output`` is read
        location (constants.Location): ``expressreco`` or ``hlt``;
            any other value is a fatal error
    """
    streamed_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        streamed_objects += constants.PROCESSED_OBJECTS

    # Limit streaming objects for parallel processing
    basf2.set_streamobjs(streamed_objects)

    # On HLT the raw format and the output confirmation are both turned on,
    # on expressreco both are turned off
    if location == constants.Location.expressreco:
        raw_with_confirmation = False
    elif location == constants.Location.hlt:
        raw_with_confirmation = True
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    path.add_module("HLTDs2ZMQ", output=args.output, raw=raw_with_confirmation,
                    outputConfirmation=raw_with_confirmation)