Belle II Software development
processing.py
1
8
9import os
10import argparse
11import multiprocessing
12import tempfile
13
14import basf2
15from softwaretrigger import constants, path_utils
16from geometry import check_components
17from pxd import add_pxd_percentframe
18from rawdata import add_unpackers
19from reconstruction import add_reconstruction, add_cosmics_reconstruction
20from tracking import add_roiFinder, add_roi_payload_assembler
21
22
def setup_basf2_and_db(event_distribution_mode=constants.EventDistributionModes.ringbuffer):
    """
    Parse the command line of an online (HLT/ExpressReco) process and set up basf2 accordingly.

    The expected arguments depend on ``event_distribution_mode``:

    * ``ringbuffer``: input/output ring buffer names and the HistoManager port, plus optional
      file based input/output (``--input-file``, ``--output-file``, ``--histo-output-file``,
      ``--no-output``);
    * any other mode: the required ZMQ addresses ``--input``, ``--output`` and ``--dqm``.

    After parsing, these global basf2 settings are applied:

    * conditions database: the ``--central-db-tag`` global tags if given; otherwise the
      ``--local-db-tag`` tags (or ``online`` if none are given), served from the local payload
      directory ``--local-db-path`` and its ``metadata.sqlite``,
    * number of parallel processes (``--number-processes``),
    * logging: level ERROR, escaped newlines, optional UDP logging,
    * realm ``online``.

    :param event_distribution_mode: a ``constants.EventDistributionModes`` value
    :return: the parsed ``argparse.Namespace``
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if event_distribution_mode == constants.EventDistributionModes.ringbuffer:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)
    else:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")

    # Default: all but 5 cores. Clamp at 0 (= no parallel processing) so that machines with
    # 5 or fewer cores do not get a zero/negative default, which is not a valid worker count.
    default_number_processes = max(multiprocessing.cpu_count() - 5, 0)
    parser.add_argument('--number-processes', type=int, default=default_number_processes,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    parser.add_argument('--local-db-tag', type=str, nargs="*",
                        help="Use the local db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="set hostname for UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="set port number for UDP logging connection", default=None)

    args = parser.parse_args()

    # Conditions database: start from an empty global tag list
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        # Central tags take precedence; the local payload directory is not used in this case
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        if args.local_db_tag:
            for local_tag in args.local_db_tag:
                basf2.conditions.prepend_globaltag(local_tag)
        else:
            basf2.conditions.globaltags = ["online"]
        # Local tags (or "online") are resolved against the local payload directory
        metadata_file = basf2.find_file(os.path.join(args.local_db_path, "metadata.sqlite"))
        basf2.conditions.metadata_providers = ["file://" + metadata_file]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # basf2 logging setup
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # Every log message should be only one line,
    # otherwise the LogFilter in daq_slc throws away the other lines
    basf2.logging.enable_escape_newlines = True

    # UDP logging, only if both host and port are given
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    return args
100
101
def start_path(args, location):
    """
    Create the beginning of a ring-buffer based HLT or ExpressReco path.

    The new path starts with the input module followed by the histogram manager:

    * input: the location's ring buffer reader, or ``SeqRootInput``/``RootInput`` when
      ``args.input_file`` is set (chosen by the ``.sroot`` extension);
    * histograms: ``DqmHistoManager`` connected to ``args.histo_port``, or a ``HistoManager``
      writing to ``args.histo_output_file`` when that is set.

    :param args: parsed arguments from ``setup_basf2_and_db`` in ringbuffer mode
    :param location: ``constants.Location.hlt`` or ``constants.Location.expressreco``
    :return: the newly created path
    """
    path = basf2.create_path()

    # Each location reads its ring buffer with a dedicated module
    if location == constants.Location.hlt:
        ring_buffer_reader = "Raw2Ds"
    elif location == constants.Location.expressreco:
        ring_buffer_reader = "Rbuf2Ds"
    else:
        ring_buffer_reader = ""
        basf2.B2FATAL(f"Does not know location {location}")

    # Input: a file if one was requested, otherwise the ring buffer
    input_file = args.input_file
    if input_file:
        file_reader = 'SeqRootInput' if input_file.endswith(".sroot") else 'RootInput'
        path.add_module(file_reader, inputFileName=input_file)
    else:
        path.add_module(ring_buffer_reader, RingBufferName=args.input_buffer_name)

    # Histograms: a local file if one was requested, otherwise the DQM histogram server
    histo_file = args.histo_output_file
    if histo_file:
        path.add_module('HistoManager',
                        histoFileName=os.path.basename(histo_file),
                        workDirName=os.path.dirname(histo_file))
    else:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000,
                        workDirName=tempfile.gettempdir() + "/")

    return path
134
135
def start_zmq_path(args, location, event_distribution_mode):
    """
    Create the beginning of a ZMQ based HLT or ExpressReco path.

    ``path`` starts with an ``HLTZMQ2Ds`` module reading from ``args.input`` (with
    ``addExpressRecoObjects=True`` on ExpressReco). The ``HLTDQM2ZMQ`` module sending
    histograms to ``args.dqm`` is placed depending on ``event_distribution_mode``:

    * ``zmq``: only events for which ``HLTZMQ2Ds`` returns 0 run ``reco_path`` (and then continue
      on ``path``); ``HLTDQM2ZMQ`` goes into ``reco_path``. This is how the first event is discarded
      at the ``HLTZMQ2Ds`` level in this mode.
    * ``zmqbasf2``: the first event is passed on; ``HLTDQM2ZMQ`` goes directly into ``path``.

    :param args: parsed arguments from ``setup_basf2_and_db`` in a ZMQ mode
    :param location: ``constants.Location.hlt`` or ``constants.Location.expressreco``
    :param event_distribution_mode: ``constants.EventDistributionModes.zmq`` or ``.zmqbasf2``
    :return: tuple ``(path, reco_path)``
    """
    reco_path = basf2.Path()
    path = basf2.Path()

    if location == constants.Location.hlt:
        input_module = path.add_module("HLTZMQ2Ds", input=args.input)
    elif location == constants.Location.expressreco:
        input_module = path.add_module("HLTZMQ2Ds", input=args.input, addExpressRecoObjects=True)
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    distribution_modes = constants.EventDistributionModes
    if event_distribution_mode == distribution_modes.zmqbasf2:
        # the first event is passed on to the remaining path
        path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
    elif event_distribution_mode == distribution_modes.zmq:
        # the first event is discarded at the HLTZMQ2Ds level
        input_module.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
        reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
    else:
        basf2.B2FATAL(f"Does not know event_distribution_mode {event_distribution_mode}")

    return path, reco_path
158
159
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       switch_off_slow_modules_for_online=True,
                       hlt_prefilter_mode=constants.HLTPrefilterModes.monitor,
                       dqm_run_type=None,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines to ``path``.

    Layout: optional input pruning, geometry, unpackers and HLT prefilter; the reconstruction
    needed for the software trigger decision and the decision itself; a branch into an
    "accepted" path with the remaining reconstruction, ROI finding and DQM; finally the ROI
    payload assembler, DQM for all events and optional output pruning.

    :param path: path to add the modules to
    :param run_type: ``constants.RunTypes``; ``cosmic``/``beam`` also declare the basf2 environment
    :param softwaretrigger_mode: ``filter``: events with decision 0 are aborted with
        ``EventMetaData.c_HLTDiscard`` and only events with decision 1 run the accepted path;
        ``monitor``: every event runs the accepted path and the ROI payload assembler ignores
        the HLT decision; anything else is fatal
    :param prune_input: first remove DataStore content not in ``constants.HLT_INPUT_OBJECTS``
    :param prune_output: at the end remove everything which should not be stored
        (``path_utils.add_store_only_rawdata_path``)
    :param unpacker_components: components to unpack, default ``constants.DEFAULT_HLT_COMPONENTS``
    :param reco_components: components to reconstruct, default ``constants.DEFAULT_HLT_COMPONENTS``
    :param create_hlt_unit_histograms: forwarded to every ``path_utils.add_hlt_dqm`` call
    :param switch_off_slow_modules_for_online: forwarded to the pre- and post-filter reconstruction
    :param hlt_prefilter_mode: mode of the HLT prefilter module
    :param dqm_run_type: run type used for the DQM modules, defaults to ``run_type``
    :param kwargs: forwarded to ``path_utils.add_pre_filter_reconstruction`` only
    """
    # The basf2 environment follows the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()
    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    dqm_run_type = run_type if dqm_run_type is None else dqm_run_type

    # ROOT is imported here on purpose: a top-level 'import ROOT' is always avoided.
    import ROOT  # noqa

    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    unpacker_components = constants.DEFAULT_HLT_COMPONENTS if unpacker_components is None else unpacker_components
    reco_components = constants.DEFAULT_HLT_COMPONENTS if reco_components is None else reco_components
    for components in (unpacker_components, reco_components):
        check_components(components)

    def add_dqm(target_path, dqm_mode):
        """Add the HLT DQM modules for ``dqm_mode`` to ``target_path``."""
        path_utils.add_hlt_dqm(target_path, run_type=dqm_run_type, components=reco_components, dqm_mode=dqm_mode,
                               create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Keep only the DataStore content expected in the HLT configuration: ROI payloads or
    # tracks present in the input file can be a problem and lead to crashes.
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Geometry (only if not already present) and unpacking of the event content
    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    path_utils.add_prefilter_module(path, mode=hlt_prefilter_mode)

    # Path for all events accepted by the software trigger
    accepted_path = basf2.Path()

    # Reconstruction needed for the HLT decision, the decision itself and the DQM run on every event
    path_utils.add_pre_filter_reconstruction(
        path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
        **kwargs
    )
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)
    add_dqm(path, constants.DQMModes.before_filter)

    trigger_modes = constants.SoftwareTriggerModes
    if softwaretrigger_mode == trigger_modes.filter:
        # Branch on the HLT decision:
        # 0 -> the event is dismissed and only the metadata is stored,
        # 1 -> the event is accepted and continues with the accepted path
        decision_module = path_utils.add_filter_module(path)
        path_utils.hlt_event_abort(decision_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        decision_module.if_value("==1", accepted_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == trigger_modes.monitor:
        # No filtering: every event goes through the accepted path
        path.add_path(accepted_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # Accepted events: remaining reconstruction, ROI finding and DQM
    path_utils.add_post_filter_reconstruction(
        accepted_path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online
    )
    add_roiFinder(accepted_path)
    accepted_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')
    add_dqm(accepted_path, constants.DQMModes.filtered)

    # ROI payloads for ONSEN are created for all events. Normally the payload assembler marks the
    # event with the software trigger decision so the hardware drops the data of rejected events;
    # in monitoring mode that decision is ignored.
    add_roi_payload_assembler(path, ignore_hlt_decision=(softwaretrigger_mode == trigger_modes.monitor))
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # DQM which runs on all events, not only on the accepted ones
    add_dqm(path, constants.DQMModes.all_events)

    if prune_output:
        # Finally remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')
289
290
def add_hlt_passthrough(path):
    """
    Add the modules for operating HLT machines in passthrough mode to ``path``.

    Adds the PXD percent-frame modules (``fraction=0.1``, random position) followed by the
    ROI payload assembler, which ignores the HLT decision.

    :param path: path to add the modules to
    """
    percentframe_settings = {"fraction": 0.1, "random_position": True}
    add_pxd_percentframe(path, **percentframe_settings)
    # In passthrough mode the HLT decision is ignored for the ROI payloads
    add_roi_payload_assembler(path, ignore_hlt_decision=True)
297
298
def _set_module_parameters_if_present(path, name, **parameters):
    """Set ``parameters`` on the top-level module called ``name`` in ``path``, if there is one."""
    # basf2.set_module_parameters raises a KeyError when no module with this name is found.
    # Perform the same (non-recursive) lookup first so optional modules are simply skipped.
    if any(module.name() == name for module in path.modules()):
        basf2.set_module_parameters(path, name, **parameters)


def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               switch_off_slow_modules_for_online=True,
                               dqm_run_type=None,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines to ``path``.

    Layout: optional skim on the HLT decision, optional input pruning, geometry, unpackers,
    (optionally) the beam or cosmics reconstruction, the ExpressReco DQM and optional output pruning.

    Parameter tweaks for ``PXDPostErrorChecker`` and ``SVDTimeGrouping`` are only applied when
    the module is present in the path, so component selections without PXD or SVD do not fail.

    :param path: path to add the modules to
    :param run_type: ``constants.RunTypes``; ``cosmic``/``beam`` declare the basf2 environment and
        select the reconstruction (any other value is fatal when ``do_reconstruction`` is set)
    :param select_only_accepted_events: end processing for events whose
        ``software_trigger_cut&all&total_result`` is 0 (or missing)
    :param prune_input: first keep only ``constants.EXPRESSRECO_INPUT_OBJECTS`` in the DataStore
    :param prune_output: at the end keep only ``ALWAYS_SAVE_OBJECTS``, ``RAWDATA_OBJECTS`` and
        ``PROCESSED_OBJECTS``
    :param unpacker_components: components to unpack, default ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param reco_components: components to reconstruct, default ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param do_reconstruction: add the reconstruction (otherwise only unpacking and DQM)
    :param switch_off_slow_modules_for_online: forwarded to the beam reconstruction
    :param dqm_run_type: run type used for the DQM, defaults to ``run_type``
    :param kwargs: forwarded to the beam or cosmics reconstruction
    """

    # Check if the run is cosmic and set the Environment accordingly
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    # Check if the run is beam and set the Environment accordingly
    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if dqm_run_type is None:
        dqm_run_type = run_type

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # If turned on, only events selected by the HLT will go to ExpressReco.
    # This is needed because by default un-selected events are passed to ExpressReco as well,
    # however they are empty.
    if select_only_accepted_events:
        skim_module = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"], resultOnMissing=0)
        skim_module.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Ensure that only DataStore content is present that we expect in
    # the ExpressReco configuration. If tracks are present in the
    # input file, this can be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Don't filter/prune PXD data of partly broken events, as we would lose diagnostics in DQM.
    # The module only exists if PXD is unpacked.
    _set_module_parameters_if_present(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path,
                               components=reco_components,
                               pruneTracks=False,
                               skipGeometryAdding=True,
                               add_trigger_calculation=False,
                               switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
                               **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        # The module only exists if SVD is reconstructed
        _set_module_parameters_if_present(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                          isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, dqm_run_type, components=reco_components)

    if prune_output:
        path.add_module("PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
                        constants.PROCESSED_OBJECTS)
375
376
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the output modules to a ring-buffer based HLT or ExpressReco path.

    The objects streamed between the parallel processes are ``ALWAYS_SAVE_OBJECTS`` and
    ``RAWDATA_OBJECTS``, plus ``PROCESSED_OBJECTS`` on ExpressReco. Unless ``args.no_output``
    is set, events are written:

    * to the output ring buffer (``Ds2Rbuf`` on HLT, ``Ds2Sample`` on ExpressReco), or
    * to ``args.output_file`` when set: ``SeqRootOutput`` with the streamed objects for
      ``.sroot`` files, otherwise ``RootOutput`` with everything.

    :param path: path to add the modules to
    :param args: parsed arguments from ``setup_basf2_and_db`` in ringbuffer mode
    :param location: ``constants.Location.hlt`` or ``constants.Location.expressreco``
    :param show_progress_bar: whether to add a ``Progress`` module
    """
    streamed_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        streamed_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Limit the objects streamed for parallel processing
    basf2.set_streamobjs(streamed_objects)

    if args.no_output:
        return

    if location == constants.Location.hlt:
        ring_buffer_writer = "Ds2Rbuf"
    elif location == constants.Location.expressreco:
        ring_buffer_writer = "Ds2Sample"
    else:
        ring_buffer_writer = ""
        basf2.B2FATAL(f"Does not know location {location}")

    output_file = args.output_file
    if not output_file:
        path.add_module(ring_buffer_writer, RingBufferName=args.output_buffer_name,
                        saveObjs=streamed_objects)
    elif output_file.endswith(".sroot"):
        path.add_module("SeqRootOutput", saveObjs=streamed_objects, outputFileName=output_file)
    else:
        # We are storing everything on purpose!
        path.add_module("RootOutput", outputFileName=output_file)
411
412
def finalize_zmq_path(path, args, location):
    """
    Add the ZMQ output module to an HLT or ExpressReco path.

    The objects streamed between the parallel processes are ``ALWAYS_SAVE_OBJECTS`` and
    ``RAWDATA_OBJECTS``, plus ``PROCESSED_OBJECTS`` on ExpressReco. Events are sent to
    ``args.output`` via ``HLTDs2ZMQ``: on HLT with ``raw=True`` and output confirmation,
    on ExpressReco with ``raw=False`` and without confirmation.

    :param path: path to add the module to
    :param args: parsed arguments from ``setup_basf2_and_db`` in a ZMQ mode
    :param location: ``constants.Location.hlt`` or ``constants.Location.expressreco``
    """
    is_expressreco = location == constants.Location.expressreco

    streamed_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if is_expressreco:
        streamed_objects += constants.PROCESSED_OBJECTS

    # Limit the objects streamed for parallel processing
    basf2.set_streamobjs(streamed_objects)

    if is_expressreco:
        path.add_module("HLTDs2ZMQ", output=args.output, raw=False, outputConfirmation=False)
    elif location == constants.Location.hlt:
        path.add_module("HLTDs2ZMQ", output=args.output, raw=True, outputConfirmation=True)
    else:
        basf2.B2FATAL(f"Does not know location {location}")