Belle II Software  release-05-02-19
processing.py
1 import argparse
2 import multiprocessing
3 import sys
4 
5 import basf2
6 import ROOT
7 
8 from softwaretrigger import constants
9 from pxd import add_roi_payload_assembler, add_roi_finder
10 
11 from reconstruction import add_reconstruction, add_cosmics_reconstruction
12 from softwaretrigger import path_utils
13 import reconstruction
14 from geometry import check_components
15 from rawdata import add_unpackers
16 
17 
def setup_basf2_and_db(zmq=False):
    """
    Parse the command line of an online (HLT/ExpressReco) basf2 script and set up basf2
    accordingly: conditions database, number of processes and logging.

    :param zmq: if True, the input/output/DQM endpoints are given as ZMQ addresses
        (``--input``, ``--output``, ``--dqm``); otherwise ring buffer names, the HistoManager
        port and the optional file-based input/output options are expected.
    :return: the parsed ``argparse.Namespace``. ``central_db_tag`` is either None or a flat
        list of tags in command-line order.
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if zmq:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")
    else:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)

    # Keep 5 cores free (presumably for the other processes on the online machines), but never
    # default to a negative number of processes on machines with only a few cores.
    default_number_processes = max(multiprocessing.cpu_count() - 5, 0)
    parser.add_argument('--number-processes', type=int, default=default_number_processes,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    # With the default "store" action a repeated option would silently keep only its last
    # occurrence. "append" collects every occurrence, as the help text promises; the resulting
    # list of lists is flattened after parsing.
    parser.add_argument('--central-db-tag', type=str, nargs="*", action="append",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")

    args = parser.parse_args()
    if args.central_db_tag is not None:
        # Flatten [[tag, ...], ...] into the flat, order-preserving list used below and by callers
        args.central_db_tag = [tag for tags in args.central_db_tag for tag in tags]

    # Conditions database: either the given central global tags, or the local "online" payloads
    basf2.reset_database()
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        # Each tag is prepended, so the tag given last ends up first in the global tag list
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        basf2.conditions.globaltags = ["online"]
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # Logging
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # Every log message has to fit on a single line,
    # otherwise the LogFilter in daq_slc throws away the other lines
    basf2.logging.enable_escape_newlines = True

    return args
79 
80 
def start_path(args, location):
    """
    Create the input part of a ring-buffer/file based HLT or ExpressReco path.

    The returned path starts with one input module and then one histogram manager:

    * input: a ring buffer reader (``Rbuf2Ds`` for ExpressReco, ``Raw2Ds`` for HLT) when
      ``args.input_file`` is empty, otherwise ``SeqRootInput`` for ``.sroot`` files and
      ``RootInput`` for anything else;
    * histograms: ``DqmHistoManager`` connected to ``args.histo_port`` when
      ``args.histo_output_file`` is empty, otherwise a file-based ``HistoManager``.

    :param args: parsed arguments from ``setup_basf2_and_db`` (non-ZMQ mode)
    :param location: ``constants.Location.expressreco`` or ``constants.Location.hlt``;
        anything else is fatal
    :return: the new basf2 path
    """
    path = basf2.create_path()

    if location == constants.Location.expressreco:
        ring_buffer_reader = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        ring_buffer_reader = "Raw2Ds"
    else:
        ring_buffer_reader = ""
        basf2.B2FATAL(f"Does not know location {location}")

    # Input: ring buffer unless an input file was requested
    input_file = args.input_file
    if not input_file:
        path.add_module(ring_buffer_reader, RingBufferName=args.input_buffer_name)
    elif input_file.endswith(".sroot"):
        path.add_module('SeqRootInput', inputFileName=input_file)
    else:
        path.add_module('RootInput', inputFileName=input_file)

    # Histograms: written to a file if requested, otherwise sent to the DQM histogram server
    histo_output_file = args.histo_output_file
    if histo_output_file:
        path.add_module('HistoManager', histoFileName=histo_output_file)
    else:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName="/tmp/")

    return path
111 
112 
def start_zmq_path(args, location):
    """
    Create the input part of a ZMQ based HLT or ExpressReco path.

    :param args: parsed arguments from ``setup_basf2_and_db(zmq=True)``
    :param location: ``constants.Location.expressreco`` (the input module is asked to add the
        ExpressReco objects) or ``constants.Location.hlt``; anything else is fatal
    :return: tuple ``(path, reco_path)``. ``path`` starts with the ``HLTZMQ2Ds`` input module
        reading from ``args.input``. ``reco_path`` is executed only when that module returns 0,
        after which processing continues on ``path``. ``reco_path`` already holds an
        ``HLTDQM2ZMQ`` module sending histograms to ``args.dqm``; the caller appends the
        processing modules to it.
    """
    main_path = basf2.Path()
    reco_path = basf2.Path()

    if location == constants.Location.expressreco:
        zmq_input = main_path.add_module("HLTZMQ2Ds", input=args.input, addExpressRecoObjects=True)
    elif location == constants.Location.hlt:
        zmq_input = main_path.add_module("HLTZMQ2Ds", input=args.input)
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Branch into the reconstruction path only for return value 0 of the input module
    zmq_input.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
    reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)

    return main_path, reco_path
128 
129 
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines

    Modules are appended to ``path`` in this order:

    1. input pruning (optional), geometry and unpackers;
    2. the reconstruction needed for the software trigger decision and the "before_filter" DQM;
    3. the post-filter reconstruction, the ROI finder and the "filtered" DQM. These go on a
       separate ``accept_path``, which runs only for accepted events in filter mode and for
       every event in monitor mode;
    4. for all events: the ROI payload assembler, the "all_events" DQM and output pruning
       (optional).

    ``StatisticsSummary`` modules named ``Sum_*`` are interleaved between the stages,
    presumably to get per-stage statistics.

    :param path: basf2 path the modules are appended to
    :param run_type: run type forwarded to the reconstruction and DQM helpers
    :param softwaretrigger_mode: ``constants.SoftwareTriggerModes.filter`` aborts events with a
        negative software trigger decision (marked ``c_HLTDiscard``);
        ``constants.SoftwareTriggerModes.monitor`` processes every event regardless of the
        decision. Any other value is fatal.
    :param prune_input: if True, remove all DataStore content not matching ``constants.HLT_INPUT_OBJECTS``
    :param prune_output: if True, add ``path_utils.add_store_only_rawdata_path`` at the end
    :param unpacker_components: components passed to ``add_unpackers``;
        defaults to ``constants.DEFAULT_HLT_COMPONENTS``
    :param reco_components: components used for reconstruction and DQM;
        defaults to ``constants.DEFAULT_HLT_COMPONENTS``
    :param create_hlt_unit_histograms: forwarded to every ``path_utils.add_hlt_dqm`` call
    :param kwargs: forwarded to ``path_utils.add_filter_reconstruction`` only
    """
    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_HLT_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_HLT_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # Ensure that only the DataStore content we expect in the HLT configuration is present.
    # If ROI payloads or tracks are present in the input file, this can be a problem and
    # lead to crashes.
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Add the geometry (if not already present)
    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')

    # Unpack the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    # Build one path for all accepted events...
    accept_path = basf2.Path()

    # Do the reconstruction needed for the HLT decision
    path_utils.add_filter_reconstruction(path, run_type=run_type, components=reco_components, **kwargs)

    # Add the part of the DQM modules which should run after every reconstruction
    path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.before_filter,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Only turn on the filtering (by branching the path) if filtering is turned on
    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        # Now split up the path according to the HLT decision
        hlt_filter_module = path_utils.add_filter_module(path)

        # There are two possibilities for the output of this module
        # (1) the event is dismissed -> only store the metadata
        path_utils.hlt_event_abort(hlt_filter_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # (2) the event is accepted -> go on with the hlt reconstruction
        hlt_filter_module.if_value("==1", accept_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # Otherwise just always go with the accept path.
        # NOTE(review): accept_path is still empty here; this relies on basf2 keeping a reference
        # to the path so that the modules added to it below are still executed. Confirm.
        path.add_path(accept_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # For accepted events we continue the reconstruction
    path_utils.add_post_filter_reconstruction(accept_path, run_type=run_type, components=reco_components)

    # Only create the ROIs for accepted events
    add_roi_finder(accept_path)
    accept_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')

    # Add the HLT DQM modules only in case the event is accepted
    path_utils.add_hlt_dqm(
        accept_path,
        run_type=run_type,
        components=reco_components,
        dqm_mode=constants.DQMModes.filtered,
        create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Make sure to create ROI payloads for sending them to ONSEN.
    # Do this for all events.
    # Normally, the payload assembler marks the event with the software trigger decision to inform the hardware to
    # drop the data for the event in case the decision is "no".
    # However, if we are running in monitoring mode, we ignore the decision.
    pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
    add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # Add the part of the DQM modules which should run on all events, not only on the accepted ones
    path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.all_events,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if prune_output:
        # And in the end remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')
224 
225 
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines

    :param path: basf2 path the modules are appended to
    :param run_type: ``constants.RunTypes.beam`` or ``constants.RunTypes.cosmic``; selects the
        reconstruction that is added. Any other value is fatal when ``do_reconstruction`` is True.
    :param select_only_accepted_events: if True, processing of an event ends right away when the
        trigger line ``software_trigger_cut&all&total_result`` is 0 (or missing)
    :param prune_input: if True, remove all DataStore content not matching
        ``constants.EXPRESSRECO_INPUT_OBJECTS``
    :param prune_output: if True, keep only the always-saved, raw-data and processed objects at the end
    :param unpacker_components: components passed to ``add_unpackers``;
        defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param reco_components: components used for reconstruction and DQM;
        defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param do_reconstruction: if False, only unpack (no reconstruction and no ExpressReco DQM)
    :param kwargs: forwarded to ``add_reconstruction`` / ``add_cosmics_reconstruction``
    """
    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # If turned on, only events selected by the HLT will go to ExpressReco.
    # This is needed because by default also un-selected events are passed to ExpressReco,
    # although they are empty.
    if select_only_accepted_events:
        skim_module = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"], resultOnMissing=0)
        skim_module.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Ensure that only the DataStore content we expect in the ExpressReco configuration is present.
    # If tracks are present in the input file, this can be a problem and lead to crashes.
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Don't filter/prune PXD data for partly broken events, as we would lose diagnostics in DQM.
    # basf2.set_module_parameters raises if no module with the given name is in the path
    # (e.g. when PXD is not among the unpacked components), so only configure the checker
    # if the unpackers actually added it.
    if any(module.name() == "PXDPostErrorChecker" for module in path.modules()):
        basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path, components=reco_components, pruneTracks=False,
                               skipGeometryAdding=True, add_trigger_calculation=False, **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        path_utils.add_expressreco_dqm(path, run_type, components=reco_components)
        # NOTE: a dedicated DQM branch for events from the L1 passthrough filter line
        # ("software_trigger_cut&filter&L1_trigger") used to be sketched here as commented-out
        # code; it is not active. Recover it from version control if it is needed again.

    if prune_output:
        path.add_module("PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
                        constants.PROCESSED_OBJECTS)
297 
298 
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for a ring-buffer/file based HLT or ExpressReco path.

    The objects streamed between the parallel processes are always restricted to the
    always-saved and raw-data objects, plus the processed objects for ExpressReco. Unless
    ``args.no_output`` is set, one output module is then appended:

    * a ring buffer writer (``Ds2Sample`` for ExpressReco, ``Ds2Rbuf`` for HLT) when
      ``args.output_file`` is empty,
    * ``SeqRootOutput`` for ``.sroot`` files,
    * ``RootOutput`` (writing the full DataStore) for any other file name.

    :param path: basf2 path to append to
    :param args: parsed arguments from ``setup_basf2_and_db`` (non-ZMQ mode)
    :param location: ``constants.Location.expressreco`` or ``constants.Location.hlt``;
        anything else is fatal when output is written
    :param show_progress_bar: if True, add a ``Progress`` module
    """
    kept_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        kept_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Only these objects are streamed between the parallel processes
    basf2.set_streamobjs(kept_objects)

    if args.no_output:
        return

    if location == constants.Location.expressreco:
        ring_buffer_writer = "Ds2Sample"
    elif location == constants.Location.hlt:
        ring_buffer_writer = "Ds2Rbuf"
    else:
        ring_buffer_writer = ""
        basf2.B2FATAL(f"Does not know location {location}")

    output_file = args.output_file
    if not output_file:
        path.add_module(ring_buffer_writer, RingBufferName=args.output_buffer_name,
                        saveObjs=kept_objects)
    elif output_file.endswith(".sroot"):
        path.add_module("SeqRootOutput", saveObjs=kept_objects, outputFileName=output_file)
    else:
        # No saveObjs on purpose: the whole DataStore is written
        path.add_module("RootOutput", outputFileName=output_file)
333 
334 
def finalize_zmq_path(path, args, location):
    """
    Add the required output modules for a ZMQ based HLT or ExpressReco path.

    Restricts the objects streamed between the parallel processes to the always-saved and
    raw-data objects (plus the processed objects for ExpressReco), then appends an
    ``HLTDs2ZMQ`` module sending events to ``args.output``. The module uses ``raw=True`` on
    the HLT and ``raw=False`` on ExpressReco.

    :param path: basf2 path to append to
    :param args: parsed arguments from ``setup_basf2_and_db(zmq=True)``
    :param location: ``constants.Location.expressreco`` or ``constants.Location.hlt``;
        anything else is fatal
    """
    is_expressreco = location == constants.Location.expressreco

    kept_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if is_expressreco:
        kept_objects += constants.PROCESSED_OBJECTS

    # Only these objects are streamed between the parallel processes
    basf2.set_streamobjs(kept_objects)

    if is_expressreco:
        send_raw = False
    elif location == constants.Location.hlt:
        send_raw = True
    else:
        basf2.B2FATAL(f"Does not know location {location}")
        return

    path.add_module("HLTDs2ZMQ", output=args.output, raw=send_raw)
350  else:
351  basf2.B2FATAL(f"Does not know location {location}")