# Belle II Software release-09-00-07
# processing.py
9import os
10import argparse
11import multiprocessing
12import tempfile
13
14import basf2
15from softwaretrigger import constants, path_utils
16from geometry import check_components
17from pxd import add_roi_finder, add_roi_payload_assembler, add_pxd_percentframe
18from rawdata import add_unpackers
19from reconstruction import add_reconstruction, add_cosmics_reconstruction
20
21
def setup_basf2_and_db(event_distribution_mode=constants.EventDistributionModes.ringbuffer):
    """
    Parse the online command line arguments and configure basf2 accordingly:
    conditions database access, number of worker processes, logging and the
    online realm.

    :param event_distribution_mode: how events are transported between
        processes (ring buffers or ZMQ sockets); determines which
        command line arguments are accepted.
    :return: the parsed command line arguments (argparse.Namespace)
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if event_distribution_mode == constants.EventDistributionModes.ringbuffer:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)
    else:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")

    # Keep a few cores free for the event distribution processes, but never
    # fall below a single worker process on small machines (cpu_count() - 5
    # would be zero or negative there).
    parser.add_argument('--number-processes', type=int, default=max(1, multiprocessing.cpu_count() - 5),
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    parser.add_argument('--local-db-tag', type=str, nargs="*",
                        help="Use the local db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="set hostname for UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="set port number for UDP logging connection", default=None)

    args = parser.parse_args()

    # Local DB specification: an explicit central tag wins over a local tag,
    # otherwise fall back to the "online" globaltag served from local files
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        if args.local_db_tag:
            for local_tag in args.local_db_tag:
                basf2.conditions.prepend_globaltag(local_tag)
        else:
            basf2.conditions.globaltags = ["online"]
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # basf2 logging setup
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # And because reasons we want every log message to be only one line,
    # otherwise the LogFilter in daq_slc throws away the other lines
    basf2.logging.enable_escape_newlines = True

    # UDP logging (only when both hostname and port are given)
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    return args
99
100
def start_path(args, location):
    """
    Create and return a path used for HLT and ExpressReco running
    """
    path = basf2.create_path()

    # The ring-buffer input module depends on where we are running
    buffer_module_by_location = {
        constants.Location.expressreco: "Rbuf2Ds",
        constants.Location.hlt: "Raw2Ds",
    }
    input_buffer_module_name = buffer_module_by_location.get(location, "")
    if not input_buffer_module_name:
        basf2.B2FATAL(f"Does not know location {location}")

    # Input: read from a file when one is given, otherwise from the ring buffer
    if args.input_file:
        file_input_module = 'SeqRootInput' if args.input_file.endswith(".sroot") else 'RootInput'
        path.add_module(file_input_module, inputFileName=args.input_file)
    else:
        path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)

    # Histogram handling: write to a file when requested, otherwise ship
    # the histograms to the DQM server
    if args.histo_output_file:
        workdir = os.path.dirname(args.histo_output_file)
        filename = os.path.basename(args.histo_output_file)
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir)
    else:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName=tempfile.gettempdir()+"/")

    return path
133
134
def start_zmq_path(args, location, event_distribution_mode):
    """
    Create the ZMQ based input path (and the extra path carrying the DQM
    sender module) for HLT and ExpressReco running.
    """
    path = basf2.Path()
    reco_path = basf2.Path()

    # On ExpressReco the input module additionally adds the express reco objects
    extra_input_kwargs = {}
    if location == constants.Location.expressreco:
        extra_input_kwargs["addExpressRecoObjects"] = True
    elif location != constants.Location.hlt:
        basf2.B2FATAL(f"Does not know location {location}")
    input_module = path.add_module("HLTZMQ2Ds", input=args.input, **extra_input_kwargs)

    if event_distribution_mode == constants.EventDistributionModes.zmq:
        # zmq needs to discard the first event from HLTZMQ2Ds level
        input_module.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
        reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
    elif event_distribution_mode == constants.EventDistributionModes.zmqbasf2:
        # zmqbasf2 needs to pass the first event to the remaining path
        path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
    else:
        basf2.B2FATAL(f"Does not know event_distribution_mode {event_distribution_mode}")

    return path, reco_path
157
158
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       switch_off_slow_modules_for_online=True,
                       hlt_prefilter_mode=constants.HLTPrefilterModes.monitor,
                       dqm_run_type=None,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines

    :param path: path the modules are appended to
    :param run_type: member of constants.RunTypes (beam or cosmic), steers the
        environment declaration and the reconstruction setup
    :param softwaretrigger_mode: member of constants.SoftwareTriggerModes; in
        filter mode events are branched by the HLT decision, in monitor mode
        every event continues through the accept path
    :param prune_input: remove everything but the expected HLT input objects
        from the DataStore before processing
    :param prune_output: keep only the raw data output objects at the end
    :param unpacker_components: detector components to unpack, defaults to
        constants.DEFAULT_HLT_COMPONENTS
    :param reco_components: detector components to reconstruct, defaults to
        constants.DEFAULT_HLT_COMPONENTS
    :param create_hlt_unit_histograms: forwarded to the HLT DQM modules
    :param switch_off_slow_modules_for_online: forwarded to the pre- and
        post-filter reconstruction
    :param hlt_prefilter_mode: member of constants.HLTPrefilterModes,
        forwarded to the prefilter module
    :param dqm_run_type: run type used by the DQM modules, defaults to run_type
    :param kwargs: forwarded to the pre-filter reconstruction
    """

    # Check if the run is cosmic and set the Environment accordingly
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    # Check if the run is beam and set the Environment accordingly
    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if dqm_run_type is None:
        dqm_run_type = run_type

    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_HLT_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_HLT_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # ensure that only DataStore content is present that we expect in
    # in the HLT configuration. If ROIpayloads or tracks are present in the
    # input file, this can be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Add the geometry (if not already present)
    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')

    # Unpack the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    # HLT prefilter
    path_utils.add_prefilter_module(path, mode=hlt_prefilter_mode)

    # Build one path for all accepted events...
    accept_path = basf2.Path()

    # Do the reconstruction needed for the HLT decision
    path_utils.add_pre_filter_reconstruction(
        path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
        **kwargs
    )

    # Perform HLT filter calculation
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)

    # Add the part of the dqm modules, which should run after every reconstruction
    path_utils.add_hlt_dqm(path, run_type=dqm_run_type, components=reco_components, dqm_mode=constants.DQMModes.before_filter,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Only turn on the filtering (by branching the path) if filtering is turned on
    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        # Now split up the path according to the HLT decision
        hlt_filter_module = path_utils.add_filter_module(path)

        # There are two possibilities for the output of this module
        # (1) the event is dismissed -> only store the metadata
        path_utils.hlt_event_abort(hlt_filter_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # (2) the event is accepted -> go on with the hlt reconstruction
        hlt_filter_module.if_value("==1", accept_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # Otherwise just always go with the accept path
        path.add_path(accept_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # For accepted events we continue the reconstruction
    path_utils.add_post_filter_reconstruction(
        accept_path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online
    )

    # Only create the ROIs for accepted events
    add_roi_finder(accept_path)
    accept_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')

    # Add the HLT DQM modules only in case the event is accepted
    path_utils.add_hlt_dqm(
        accept_path,
        run_type=dqm_run_type,
        components=reco_components,
        dqm_mode=constants.DQMModes.filtered,
        create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Make sure to create ROI payloads for sending them to ONSEN
    # Do this for all events
    # Normally, the payload assembler marks the event with the software trigger decision to inform the hardware to
    # drop the data for the event in case the decision is "no"
    # However, if we are running in monitoring mode, we ignore the decision
    pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
    add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # Add the part of the dqm modules, which should run on all events, not only on the accepted ones
    path_utils.add_hlt_dqm(path, run_type=dqm_run_type, components=reco_components, dqm_mode=constants.DQMModes.all_events,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if prune_output:
        # And in the end remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')
288
289
def add_hlt_passthrough(path):
    """
    Add all modules for operating HLT machines in passthrough mode.

    No reconstruction or filtering is set up here: a PXD percent frame is
    sampled at a random position and ROI payloads are assembled for every
    event, ignoring any HLT decision.
    """
    # Fraction of the PXD frame to sample in passthrough running
    sampled_fraction = 0.1
    add_pxd_percentframe(path, fraction=sampled_fraction, random_position=True)
    add_roi_payload_assembler(path, ignore_hlt_decision=True)
296
297
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               switch_off_slow_modules_for_online=True,
                               dqm_run_type=None,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines

    :param path: path the modules are appended to
    :param run_type: member of constants.RunTypes (beam or cosmic), steers the
        environment declaration and the reconstruction setup
    :param select_only_accepted_events: if True, only events accepted by the
        HLT software trigger are processed (via the TriggerSkim module)
    :param prune_input: remove everything but the expected ExpressReco input
        objects from the DataStore before processing
    :param prune_output: keep only the always-save, raw data and processed
        objects at the end of the path
    :param unpacker_components: detector components to unpack, defaults to
        constants.DEFAULT_EXPRESSRECO_COMPONENTS
    :param reco_components: detector components to reconstruct, defaults to
        constants.DEFAULT_EXPRESSRECO_COMPONENTS
    :param do_reconstruction: whether to run the reconstruction at all
    :param switch_off_slow_modules_for_online: forwarded to the beam
        reconstruction
    :param dqm_run_type: run type used by the DQM modules, defaults to run_type
    :param kwargs: forwarded to the reconstruction functions
    """

    # Check if the run is cosmic and set the Environment accordingly
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    # Check if the run is beam and set the Environment accordingly
    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if dqm_run_type is None:
        dqm_run_type = run_type

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # If turned on, only events selected by the HLT will go to ereco.
    # this is needed as by default also un-selected events will get passed to ereco,
    # however they are empty.
    if select_only_accepted_events:
        skim_module = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"], resultOnMissing=0)
        skim_module.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # ensure that only DataStore content is present that we expect in
    # in the ExpressReco configuration. If tracks are present in the
    # input file, this can be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # dont filter/prune pxd for partly broken events, as we loose diagnostics in DQM
    basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path,
                               components=reco_components,
                               pruneTracks=False,
                               skipGeometryAdding=True,
                               add_trigger_calculation=False,
                               switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
                               **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        basf2.set_module_parameters(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                    isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, dqm_run_type, components=reco_components)

    if prune_output:
        path.add_module("PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
                        constants.PROCESSED_OBJECTS)
374
375
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for expressreco/HLT
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Limit streaming objects for parallel processing
    basf2.set_streamobjs(save_objects)

    if args.no_output:
        return

    # The ring-buffer output module depends on where we are running
    output_module_by_location = {
        constants.Location.expressreco: "Ds2Sample",
        constants.Location.hlt: "Ds2Rbuf",
    }
    if location not in output_module_by_location:
        basf2.B2FATAL(f"Does not know location {location}")
    output_buffer_module_name = output_module_by_location[location]

    # Output: write to a file when one is given, otherwise to the ring buffer
    if args.output_file:
        if args.output_file.endswith(".sroot"):
            path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
        else:
            # We are storing everything on purpose!
            path.add_module("RootOutput", outputFileName=args.output_file)
    else:
        path.add_module(output_buffer_module_name, RingBufferName=args.output_buffer_name,
                        saveObjs=save_objects)
410
411
def finalize_zmq_path(path, args, location):
    """
    Add the required output modules for expressreco/HLT
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    # Limit streaming objects for parallel processing
    basf2.set_streamobjs(save_objects)

    # HLT sends raw data and confirms the output, ExpressReco does neither
    sender_settings = {
        constants.Location.expressreco: {"raw": False, "outputConfirmation": False},
        constants.Location.hlt: {"raw": True, "outputConfirmation": True},
    }
    if location not in sender_settings:
        basf2.B2FATAL(f"Does not know location {location}")
    path.add_module("HLTDs2ZMQ", output=args.output, **sender_settings[location])