Belle II Software development
processing.py
import os
import argparse
import multiprocessing
import tempfile

import basf2

from softwaretrigger import constants
from tracking import add_roiFinder, add_roi_payload_assembler

from reconstruction import add_reconstruction, add_cosmics_reconstruction
from softwaretrigger import path_utils
from geometry import check_components
from rawdata import add_unpackers


def setup_basf2_and_db(zmq=False):
    """
    Parse the command line arguments and set up basf2 and the conditions database
    for online (HLT/ExpressReco) running
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if zmq:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")
    else:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer name')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file; if set, no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output; if set, no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)

    parser.add_argument('--number-processes', type=int, default=multiprocessing.cpu_count() - 5,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="Path to the local payload location to use for the ConditionsDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    parser.add_argument('--local-db-tag', type=str, nargs="*",
                        help="Use the local db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="Set the hostname for the UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="Set the port number for the UDP logging connection", default=None)

    args = parser.parse_args()

    # Local DB specification
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        if args.local_db_tag:
            for local_tag in args.local_db_tag:
                basf2.conditions.prepend_globaltag(local_tag)
        else:
            basf2.conditions.globaltags = ["online"]
            basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
            basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # basf2 logging setup
    basf2.set_log_level(basf2.LogLevel.ERROR)
    # Escape newlines so that every log message stays on a single line; otherwise the
    # LogFilter in daq_slc throws away everything after the first line
    basf2.logging.enable_escape_newlines = True

    # UDP logging
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    return args


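# In short, the conditions setup in setup_basf2_and_db() behaves as follows:
#   * with --central-db-tag T1 T2 ..., the tags are prepended one by one, so the last
#     tag given ends up first in basf2.conditions.globaltags and is queried first;
#   * --local-db-tag behaves the same way, but is only considered if no central tags
#     are given;
#   * with neither option, the "online" globaltag is used together with the local
#     metadata.sqlite and payload files found under --local-db-path.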
def start_path(args, location):
    """
    Create and return a path used for HLT and ExpressReco running
    """
    path = basf2.create_path()

    input_buffer_module_name = ""
    if location == constants.Location.expressreco:
        input_buffer_module_name = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        input_buffer_module_name = "Raw2Ds"
    else:
        basf2.B2FATAL(f"Unknown location {location}")

    # Input
    if not args.input_file:
        path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)
    else:
        if args.input_file.endswith(".sroot"):
            path.add_module('SeqRootInput', inputFileName=args.input_file)
        else:
            path.add_module('RootInput', inputFileName=args.input_file)

    # Histogram handling
    if not args.histo_output_file:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName=tempfile.gettempdir() + "/")
    else:
        workdir = os.path.dirname(args.histo_output_file)
        filename = os.path.basename(args.histo_output_file)
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir)

    return path


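# Illustrative sketch (hypothetical steering, assuming this module is importable as
# softwaretrigger.processing): a minimal ring-buffer-based HLT chain built from the
# helpers in this file.
#
#   args = setup_basf2_and_db()
#   path = start_path(args, location=constants.Location.hlt)
#   add_hlt_processing(path, run_type=constants.RunTypes.beam)
#   finalize_path(path, args, location=constants.Location.hlt)
#   basf2.process(path)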
def start_zmq_path(args, location):
    """
    Create and return the input path and the reconstruction path for ZMQ-based
    HLT and ExpressReco running
    """
    path = basf2.Path()
    reco_path = basf2.Path()

    if location == constants.Location.expressreco:
        input_module = path.add_module("HLTZMQ2Ds", input=args.input, addExpressRecoObjects=True)
    elif location == constants.Location.hlt:
        input_module = path.add_module("HLTZMQ2Ds", input=args.input)
    else:
        basf2.B2FATAL(f"Unknown location {location}")

    input_module.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
    reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)

    return path, reco_path


def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       switch_off_slow_modules_for_online=True,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines
    """

    # Check if the run is cosmic and set the Environment accordingly
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    # Check if the run is beam and set the Environment accordingly
    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    # Always avoid the top-level 'import ROOT'.
    import ROOT  # noqa

    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_HLT_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_HLT_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # Ensure that only the DataStore content we expect in the HLT configuration is
    # present. If ROI payloads or tracks are present in the input file, this can be
    # a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Add the geometry (if not already present)
    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')

    # Unpack the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    # Build one path for all accepted events...
    accept_path = basf2.Path()

    # Do the reconstruction needed for the HLT decision
    path_utils.add_pre_filter_reconstruction(
        path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
        **kwargs
    )

    # Perform the HLT filter calculation
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)

    # Add the part of the DQM modules that should run on every event,
    # after the pre-filter reconstruction and before the filter decision is applied
    path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.before_filter,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Only branch the path (and thereby actually filter events) if filtering is enabled
    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        # Now split up the path according to the HLT decision
        hlt_filter_module = path_utils.add_filter_module(path)

        # There are two possibilities for the output of this module:
        # (1) the event is dismissed -> only store the metadata
        path_utils.hlt_event_abort(hlt_filter_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # (2) the event is accepted -> go on with the HLT reconstruction
        hlt_filter_module.if_value("==1", accept_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # Otherwise just always go with the accept path
        path.add_path(accept_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # For accepted events we continue the reconstruction
    path_utils.add_post_filter_reconstruction(
        accept_path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online
    )

    # Only create the ROIs for accepted events
    add_roiFinder(accept_path)
    accept_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')

    # Add the HLT DQM modules only in case the event is accepted
    path_utils.add_hlt_dqm(
        accept_path,
        run_type=run_type,
        components=reco_components,
        dqm_mode=constants.DQMModes.filtered,
        create_hlt_unit_histograms=create_hlt_unit_histograms)

    # Create the ROI payloads that are sent to ONSEN. This is done for all events.
    # Normally, the payload assembler marks each event with the software trigger
    # decision, so that the hardware can drop the data for the event if the decision
    # is "no". In monitoring mode, however, the decision is ignored.
    pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
    add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # Add the part of the DQM modules that should run on all events, not only on the accepted ones
    path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.all_events,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if prune_output:
        # In the end, remove everything that should not be stored
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')


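# Illustrative example (using only names defined in this module): run the HLT
# reconstruction in monitoring mode, i.e. reconstruct and flag every event without
# discarding any.
#
#   add_hlt_processing(path,
#                      run_type=constants.RunTypes.beam,
#                      softwaretrigger_mode=constants.SoftwareTriggerModes.monitor)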
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               switch_off_slow_modules_for_online=True,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines
    """

    # Check if the run is cosmic and set the Environment accordingly
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    # Check if the run is beam and set the Environment accordingly
    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # If enabled, only events selected by the HLT are processed on ExpressReco.
    # This is needed because, by default, unselected events are also passed to
    # ExpressReco, but they are empty.
    if select_only_accepted_events:
        skim_module = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"], resultOnMissing=0)
        skim_module.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Ensure that only the DataStore content we expect in the ExpressReco
    # configuration is present. If tracks are present in the input file, this can
    # be a problem and lead to crashes
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Don't filter/prune PXD for partly broken events, as we would lose diagnostics in DQM
    basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path,
                               components=reco_components,
                               pruneTracks=False,
                               skipGeometryAdding=True,
                               add_trigger_calculation=False,
                               switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
                               **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        basf2.set_module_parameters(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                    isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, run_type, components=reco_components)

    if prune_output:
        path.add_module("PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
                        constants.PROCESSED_OBJECTS)


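# Illustrative example (using only names defined in this module): typical ExpressReco
# processing that only reconstructs the events accepted by the HLT.
#
#   add_expressreco_processing(path,
#                              run_type=constants.RunTypes.beam,
#                              select_only_accepted_events=True)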
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for ExpressReco/HLT
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Limit streaming objects for parallel processing
    basf2.set_streamobjs(save_objects)

    if args.no_output:
        return

    output_buffer_module_name = ""
    if location == constants.Location.expressreco:
        output_buffer_module_name = "Ds2Sample"
    elif location == constants.Location.hlt:
        output_buffer_module_name = "Ds2Rbuf"
    else:
        basf2.B2FATAL(f"Unknown location {location}")

    if not args.output_file:
        path.add_module(output_buffer_module_name, RingBufferName=args.output_buffer_name,
                        saveObjs=save_objects)
    else:
        if args.output_file.endswith(".sroot"):
            path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
        else:
            # We are storing everything on purpose!
            path.add_module("RootOutput", outputFileName=args.output_file)


387def finalize_zmq_path(path, args, location):
388 """
389 Add the required output modules for expressreco/HLT
390 """
391 save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
392 if location == constants.Location.expressreco:
393 save_objects += constants.PROCESSED_OBJECTS
394
395 # Limit streaming objects for parallel processing
396 basf2.set_streamobjs(save_objects)
397
398 if location == constants.Location.expressreco:
399 path.add_module("HLTDs2ZMQ", output=args.output, raw=False, outputConfirmation=False)
400 elif location == constants.Location.hlt:
401 path.add_module("HLTDs2ZMQ", output=args.output, raw=True, outputConfirmation=True)
402 else:
403 basf2.B2FATAL(f"Does not know location {location}")