Belle II Software development
processing.py
1
8
9import os
10import argparse
11import multiprocessing
12import tempfile
13
14import basf2
15from softwaretrigger import constants, path_utils
16from geometry import check_components
17from pxd import add_pxd_percentframe
18from rawdata import add_unpackers
19from reconstruction import add_reconstruction, add_cosmics_reconstruction
20from tracking import add_roiFinder, add_roi_payload_assembler
21
22
def setup_basf2_and_db(event_distribution_mode=constants.EventDistributionModes.ringbuffer):
    """
    Parse the command line for online running and set up basf2 and the conditions database.

    The accepted arguments depend on ``event_distribution_mode``:

    * ``ringbuffer``: positional input/output ring buffer names and the HistoManager port,
      plus the optional ``--input-file``, ``--output-file``, ``--histo-output-file`` and
      ``--no-output``.
    * any other mode: the ZMQ addresses ``--input``, ``--output`` and ``--dqm``. Only the
      ``zmqbasf2`` mode accepts ``--input-file``, ``--output-file`` and ``--histo-output-file``
      on the command line; in the other modes the corresponding attributes are still present
      on the returned namespace, set to None.

    Afterwards the conditions database is configured (central tags if given, otherwise the
    local database with the given local tags or the "online" tag), the number of processes is
    set, logging is reduced to errors with single-line messages (plus optional UDP logging),
    and the "online" realm is selected.

    :param event_distribution_mode: one of ``constants.EventDistributionModes``
    :return: the parsed command line arguments (``argparse.Namespace``)
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if event_distribution_mode == constants.EventDistributionModes.ringbuffer:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)
    else:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")
        if event_distribution_mode == constants.EventDistributionModes.zmqbasf2:
            parser.add_argument('--input-file', type=str,
                                help="Input (s)root file, ZMQ input address is ignored if this is set",
                                default=None)
            parser.add_argument('--output-file', type=str,
                                help="Filename for (s)root output, ZMQ output address is ignored if this is set",
                                default=None)
            parser.add_argument('--histo-output-file', type=str,
                                help="Filename for histogram output, ZMQ histoserver address is ignored if this is set",
                                default=None)
        else:
            # The file options are not offered in this mode, but the ZMQ path helpers
            # (start_zmq_path / finalize_zmq_path) read these attributes unconditionally:
            # define them as "not set" instead of letting the access raise AttributeError.
            parser.set_defaults(input_file=None, output_file=None, histo_output_file=None)

    parser.add_argument('--number-processes', type=int, default=multiprocessing.cpu_count() - 5,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    # action="append" so that repeating the option accumulates the tags as promised by the
    # help text; with the default "store" action only the last occurrence would survive.
    # The per-occurrence lists are flattened after parsing.
    parser.add_argument('--local-db-tag', type=str, nargs="*", action="append",
                        help="Use the local db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--central-db-tag', type=str, nargs="*", action="append",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="set hostname for UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="set port number for UDP logging connection", default=None)

    args = parser.parse_args()

    # Flatten [[tag, ...], [tag, ...]] (one list per occurrence) into a single ordered list
    for tag_option in ("local_db_tag", "central_db_tag"):
        tag_groups = getattr(args, tag_option)
        if tag_groups is not None:
            setattr(args, tag_option, [tag for group in tag_groups for tag in group])

    # Conditions database: central tags if given, otherwise the local database
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        if args.local_db_tag:
            for local_tag in args.local_db_tag:
                basf2.conditions.prepend_globaltag(local_tag)
        else:
            basf2.conditions.globaltags = ["online"]
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # basf2 logging setup
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # Every log message has to be a single line, otherwise the LogFilter in daq_slc
    # throws away the other lines
    basf2.logging.enable_escape_newlines = True

    # UDP logging, only if both hostname and port are given
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    return args
110
111
def start_path(args, location):
    """
    Create and return the initial path used for HLT and ExpressReco running with ring buffers.

    The path begins with the event input and is followed by the histogram handling:

    * input: the location-specific ring buffer reader (``Rbuf2Ds`` for ExpressReco,
      ``Raw2Ds`` for HLT), or ``SeqRootInput``/``RootInput`` when ``args.input_file`` is set;
    * histograms: ``DqmHistoManager`` on ``args.histo_port``, or a ``HistoManager`` writing
      to ``args.histo_output_file`` when that is set.

    :param args: parsed command line arguments of the ring buffer mode
    :param location: ``constants.Location.expressreco`` or ``constants.Location.hlt``
    :return: the newly created path
    """
    path = basf2.create_path()

    # Each location reads its events with a different ring buffer module
    if location == constants.Location.expressreco:
        ring_buffer_reader = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        ring_buffer_reader = "Raw2Ds"
    else:
        ring_buffer_reader = ""
        basf2.B2FATAL(f"Does not know location {location}")

    input_file = args.input_file
    if input_file:
        # A file given on the command line replaces the ring buffer input
        file_reader = 'SeqRootInput' if input_file.endswith(".sroot") else 'RootInput'
        path.add_module(file_reader, inputFileName=input_file)
    else:
        path.add_module(ring_buffer_reader, RingBufferName=args.input_buffer_name)

    histo_file = args.histo_output_file
    if histo_file:
        path.add_module('HistoManager',
                        histoFileName=os.path.basename(histo_file),
                        workDirName=os.path.dirname(histo_file))
    else:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000,
                        workDirName=tempfile.gettempdir() + "/")

    return path
144
145
def start_zmq_path(args, location, event_distribution_mode):
    """
    Create the initial paths for HLT and ExpressReco running with ZMQ event distribution.

    The main path begins with the event input: ``HLTZMQ2Ds`` reading from ``args.input``
    (with ``addExpressRecoObjects=True`` for ExpressReco), or ``SeqRootInput``/``RootInput``
    when an input file is given. It is followed by the histogram handling:

    * ``zmq`` mode: an ``HLTDQM2ZMQ`` module is put on ``reco_path``, which is attached to the
      ZMQ input module as condition path for return value 0 (processing continues afterwards);
    * ``zmqbasf2`` mode: ``HLTDQM2ZMQ`` is added directly to the main path;
    * with a histogram output file: a ``HistoManager`` writing to that file.

    :param args: parsed command line arguments; ``input_file`` and ``histo_output_file``
        may be absent (they are only offered in the zmqbasf2 mode) and count as not set then
    :param location: ``constants.Location.expressreco`` or ``constants.Location.hlt``
    :param event_distribution_mode: ``constants.EventDistributionModes.zmq`` or ``.zmqbasf2``
    :return: tuple ``(path, reco_path)``; ``reco_path`` stays empty unless used as described above
    """
    path = basf2.Path()
    reco_path = basf2.Path()

    # The file options are only registered in the zmqbasf2 mode, so they may be missing
    input_file = getattr(args, "input_file", None)
    histo_output_file = getattr(args, "histo_output_file", None)

    input_module = None
    if not input_file:
        if location == constants.Location.expressreco:
            input_module = path.add_module("HLTZMQ2Ds", input=args.input, addExpressRecoObjects=True)
        elif location == constants.Location.hlt:
            input_module = path.add_module("HLTZMQ2Ds", input=args.input)
        else:
            basf2.B2FATAL(f"Does not know location {location}")
    else:
        if input_file.endswith(".sroot"):
            path.add_module('SeqRootInput', inputFileName=input_file)
        else:
            path.add_module('RootInput', inputFileName=input_file)

    if not histo_output_file:
        if event_distribution_mode == constants.EventDistributionModes.zmq:
            # zmq needs to discard the first event from HLTZMQ2Ds level, which is only
            # possible when the events actually come from the HLTZMQ2Ds module
            if input_module is None:
                basf2.B2FATAL("The zmq event distribution mode needs the HLTZMQ2Ds input module, "
                              "input files are only supported in the zmqbasf2 mode")
            input_module.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
            reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
        elif event_distribution_mode == constants.EventDistributionModes.zmqbasf2:
            # zmqbasf2 needs to pass the first event to the remaining path
            path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
        else:
            basf2.B2FATAL(f"Does not know event_distribution_mode {event_distribution_mode}")
    else:
        workdir = os.path.dirname(histo_output_file)
        filename = os.path.basename(histo_output_file)
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir)

    return path, reco_path
179
180
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       switch_off_slow_modules_for_online=True,
                       hlt_prefilter_mode=constants.HLTPrefilterModes.monitor,
                       dqm_run_type=None,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines.

    Every event is unpacked, passed through the HLT prefilter module and the reconstruction
    needed for the software trigger decision, which is then calculated; the "before filter"
    DQM follows. The remaining reconstruction, the ROI finder and the "filtered" DQM live on
    a separate path: in ``filter`` mode it only runs for accepted events (rejected ones are
    aborted with ``EventMetaData.c_HLTDiscard``), in ``monitor`` mode it runs for all events.
    Afterwards the ROI payload assembler and the "all events" DQM run for every event,
    optionally followed by pruning everything but the raw data.

    :param path: path to add the modules to
    :param run_type: run type; beam and cosmic runs also declare the matching basf2 environment
    :param softwaretrigger_mode: ``constants.SoftwareTriggerModes.filter`` or ``.monitor``
    :param prune_input: prune the DataStore to ``constants.HLT_INPUT_OBJECTS`` at the beginning
    :param prune_output: keep only the raw data at the end of the path
    :param unpacker_components: components to unpack, defaults to ``constants.DEFAULT_HLT_COMPONENTS``
    :param reco_components: components to reconstruct, defaults to ``constants.DEFAULT_HLT_COMPONENTS``
    :param create_hlt_unit_histograms: forwarded to the HLT DQM setup
    :param switch_off_slow_modules_for_online: forwarded to the pre- and post-filter reconstruction
    :param hlt_prefilter_mode: mode of the HLT prefilter module
    :param dqm_run_type: run type used for the DQM modules, defaults to ``run_type``
    :param kwargs: forwarded to the pre-filter reconstruction only
    """
    # Declare the environment matching the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()
    elif run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if dqm_run_type is None:
        dqm_run_type = run_type

    # Imported here on purpose: a top-level 'import ROOT' is always avoided
    import ROOT  # noqa

    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    unpacker_components = constants.DEFAULT_HLT_COMPONENTS if unpacker_components is None else unpacker_components
    reco_components = constants.DEFAULT_HLT_COMPONENTS if reco_components is None else reco_components
    for components in (unpacker_components, reco_components):
        check_components(components)

    # Keep only the DataStore content expected in the HLT configuration: ROI payloads or
    # tracks already present in the input file can lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')

    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    path_utils.add_prefilter_module(path, mode=hlt_prefilter_mode)

    # Path for the events passing the software trigger (all events in monitor mode)
    accepted_events_path = basf2.Path()

    # Reconstruction required for the HLT decision, done for every event
    path_utils.add_pre_filter_reconstruction(
        path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
        **kwargs
    )
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)

    # DQM running after every reconstruction, before any filtering
    path_utils.add_hlt_dqm(path, run_type=dqm_run_type, components=reco_components,
                           dqm_mode=constants.DQMModes.before_filter,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        decision_module = path_utils.add_filter_module(path)
        # dismissed (0): abort the event, only the metadata is stored
        path_utils.hlt_event_abort(decision_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # accepted (1): continue with the HLT reconstruction
        decision_module.if_value("==1", accepted_events_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # No filtering: every event goes through the accepted-events path
        path.add_path(accepted_events_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    path_utils.add_post_filter_reconstruction(
        accepted_events_path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online
    )

    # ROIs are only created for accepted events
    add_roiFinder(accepted_events_path)
    accepted_events_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')

    path_utils.add_hlt_dqm(
        accepted_events_path,
        run_type=dqm_run_type,
        components=reco_components,
        dqm_mode=constants.DQMModes.filtered,
        create_hlt_unit_histograms=create_hlt_unit_histograms)

    # ROI payloads for ONSEN are assembled for all events. Normally the assembler marks the
    # event with the software trigger decision so the hardware drops the data of rejected
    # events; in monitor mode that decision is ignored.
    monitor_only = softwaretrigger_mode == constants.SoftwareTriggerModes.monitor
    add_roi_payload_assembler(path, ignore_hlt_decision=monitor_only)
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # DQM running on all events, not only on the accepted ones
    path_utils.add_hlt_dqm(path, run_type=dqm_run_type, components=reco_components,
                           dqm_mode=constants.DQMModes.all_events,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if prune_output:
        # Remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')
310
311
def add_hlt_passthrough(path):
    """
    Add all modules for operating HLT machines in passthrough mode.

    Adds the PXD percent-frame selection (fraction 0.1, random position), followed by the
    ROI payload assembler configured to ignore the HLT decision.

    :param path: path to add the modules to
    """
    percent_frame_fraction = 0.1
    add_pxd_percentframe(path, fraction=percent_frame_fraction, random_position=True)

    # Nothing is filtered in passthrough mode, hence the HLT decision is ignored
    add_roi_payload_assembler(path, ignore_hlt_decision=True)
318
319
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               switch_off_slow_modules_for_online=True,
                               dqm_run_type=None,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines.

    Optionally skips events not accepted by the HLT, prunes the input, unpacks the data,
    runs the beam or cosmics reconstruction, adds the ExpressReco DQM and finally optionally
    prunes the DataStore to the objects that are saved.

    :param path: path to add the modules to
    :param run_type: ``constants.RunTypes.beam`` or ``constants.RunTypes.cosmic``
    :param select_only_accepted_events: only process events accepted by the software trigger
    :param prune_input: prune the DataStore to ``constants.EXPRESSRECO_INPUT_OBJECTS`` first
    :param prune_output: keep only always-saved, raw data and processed objects at the end
    :param unpacker_components: components to unpack, defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param reco_components: components to reconstruct, defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param do_reconstruction: add the reconstruction modules
    :param switch_off_slow_modules_for_online: forwarded to the beam reconstruction
    :param dqm_run_type: run type used for the DQM modules, defaults to ``run_type``
    :param kwargs: forwarded to the (beam or cosmics) reconstruction
    """
    # Declare the environment matching the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()
    elif run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    dqm_run_type = run_type if dqm_run_type is None else dqm_run_type

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    for components in (unpacker_components, reco_components):
        check_components(components)

    # By default also events not selected by the HLT reach ExpressReco (they are empty);
    # when requested, events without a positive software trigger result end here
    if select_only_accepted_events:
        trigger_skim = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"], resultOnMissing=0)
        trigger_skim.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Keep only the DataStore content expected in the ExpressReco configuration:
    # tracks already present in the input file can lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Do not filter/prune PXD data of partly broken events, otherwise the DQM loses diagnostics
    basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path,
                               components=reco_components,
                               pruneTracks=False,
                               skipGeometryAdding=True,
                               add_trigger_calculation=False,
                               switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
                               **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        basf2.set_module_parameters(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                    isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, dqm_run_type, components=reco_components)

    if prune_output:
        objects_to_keep = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS + constants.PROCESSED_OBJECTS
        path.add_module("PruneDataStore", matchEntries=objects_to_keep)
396
397
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for expressreco/HLT with ring buffer event distribution.

    The streamed objects are limited to the always-saved and raw data objects (plus the
    processed objects for ExpressReco). Unless ``args.no_output`` is set, the events are then
    written to the location-specific ring buffer (``Ds2Sample`` for ExpressReco, ``Ds2Rbuf``
    for HLT) or, if ``args.output_file`` is given, to a SeqRoot/ROOT file.

    :param path: path to add the modules to
    :param args: parsed command line arguments of the ring buffer mode
    :param location: ``constants.Location.expressreco`` or ``constants.Location.hlt``
    :param show_progress_bar: add a ``Progress`` module
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Limit the streamed objects for parallel processing
    basf2.set_streamobjs(save_objects)

    if args.no_output:
        return

    # Each location writes its events with a different ring buffer module
    if location == constants.Location.expressreco:
        ring_buffer_writer = "Ds2Sample"
    elif location == constants.Location.hlt:
        ring_buffer_writer = "Ds2Rbuf"
    else:
        ring_buffer_writer = ""
        basf2.B2FATAL(f"Does not know location {location}")

    output_file = args.output_file
    if not output_file:
        path.add_module(ring_buffer_writer, RingBufferName=args.output_buffer_name,
                        saveObjs=save_objects)
    elif output_file.endswith(".sroot"):
        path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=output_file)
    else:
        # No saveObjs here: everything is stored on purpose
        path.add_module("RootOutput", outputFileName=output_file)
432
433
434def finalize_zmq_path(path, args, location):
435 """
436 Add the required output modules for expressreco/HLT
437 """
438 save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
439 if location == constants.Location.expressreco:
440 save_objects += constants.PROCESSED_OBJECTS
441
442 # Limit streaming objects for parallel processing
443 basf2.set_streamobjs(save_objects)
444
445 if not args.output_file:
446 if location == constants.Location.expressreco:
447 path.add_module("HLTDs2ZMQ", output=args.output, raw=False, outputConfirmation=False)
448 elif location == constants.Location.hlt:
449 path.add_module("HLTDs2ZMQ", output=args.output, raw=True, outputConfirmation=True)
450 else:
451 basf2.B2FATAL(f"Does not know location {location}")
452 else:
453 if args.output_file.endswith(".sroot"):
454 path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
455 else:
456 # We are storing everything on purpose!
457 path.add_module("RootOutput", outputFileName=args.output_file)