from softwaretrigger import constants, path_utils
from geometry import check_components
from pxd import add_pxd_percentframe
from rawdata import add_unpackers
from reconstruction import add_reconstruction, add_cosmics_reconstruction
from tracking import add_roiFinder, add_roi_payload_assembler
def setup_basf2_and_db(event_distribution_mode=constants.EventDistributionModes.ringbuffer):
    """
    Setup local database usage for HLT

    Parses the command line of an online steering script and applies the
    resulting settings to basf2: conditions database, number of processes,
    logging and realm.

    The set of accepted command line arguments depends on
    ``event_distribution_mode``:

    * ``ringbuffer``: positional ``input_buffer_name``, ``output_buffer_name``
      and ``histo_port`` plus the optional file overrides and ``--no-output``.
    * any other mode: the required ZMQ addresses ``--input``, ``--output`` and
      ``--dqm``; the optional file overrides are only added for ``zmqbasf2``.

    Returns the parsed ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    if event_distribution_mode == constants.EventDistributionModes.ringbuffer:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)
    else:
        parser.add_argument("--input", required=True, type=str, help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str, help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str, help="ZMQ Address of the histoserver process")
        if event_distribution_mode == constants.EventDistributionModes.zmqbasf2:
            parser.add_argument('--input-file', type=str,
                                help="Input (s)root file, ZMQ input address is ignored if this is set",
                                default=None)
            # The help text used to say "ZMQ input address" here, which is the
            # wrong endpoint for an output file override.
            parser.add_argument('--output-file', type=str,
                                help="Filename for (s)root output, ZMQ output address is ignored if this is set",
                                default=None)
            parser.add_argument('--histo-output-file', type=str,
                                help="Filename for histogram output, ZMQ histoserver address is ignored if this is set",
                                default=None)

    parser.add_argument('--number-processes', type=int, default=multiprocessing.cpu_count() - 5,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    parser.add_argument('--local-db-tag', type=str, nargs="*",
                        help="Use the local db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="set hostname for UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="set port number for UDP logging connection", default=None)

    args = parser.parse_args()

    # Conditions database: central tags take precedence over the local setup.
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        # NOTE(review): the nesting of this branch was reconstructed from the
        # damaged listing; confirm that the metadata provider and payload
        # location overrides are meant for the local-database case only.
        if args.local_db_tag:
            for local_tag in args.local_db_tag:
                basf2.conditions.prepend_globaltag(local_tag)
        else:
            basf2.conditions.globaltags = ["online"]
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # Only report errors
    basf2.set_log_level(basf2.LogLevel.ERROR)

    # Escape newlines in log messages
    basf2.logging.enable_escape_newlines = True

    # UDP logging is only enabled when both host and port were given
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    # The start_*/finalize_* helpers in this file take the parsed arguments.
    return args
def start_path(args, location):
    """
    Create and return a path used for HLT and ExpressReco running

    The path starts with an input module and a histogram manager:

    * input: the ring buffer ``args.input_buffer_name`` (read with ``Rbuf2Ds``
      on ExpressReco, ``Raw2Ds`` on HLT) unless ``args.input_file`` is set, in
      which case ``SeqRootInput`` (``.sroot`` files) or ``RootInput`` is used.
    * histograms: ``DqmHistoManager`` connected to ``args.histo_port`` unless
      ``args.histo_output_file`` is set, in which case ``HistoManager`` writes
      to that file.

    ``basf2.B2FATAL`` is called for an unknown ``location``.
    """
    path = basf2.create_path()

    input_buffer_module_name = ""
    if location == constants.Location.expressreco:
        input_buffer_module_name = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        input_buffer_module_name = "Raw2Ds"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Input
    if not args.input_file:
        path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)
    elif args.input_file.endswith(".sroot"):
        path.add_module('SeqRootInput', inputFileName=args.input_file)
    else:
        path.add_module('RootInput', inputFileName=args.input_file)

    # Histogram handling
    if not args.histo_output_file:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName=tempfile.gettempdir()+"/")
    else:
        workdir = os.path.dirname(args.histo_output_file)
        filename = os.path.basename(args.histo_output_file)
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir)

    return path
def start_zmq_path(args, location, event_distribution_mode):
    """
    Create the paths used for ZMQ based HLT and ExpressReco running.

    Returns the tuple ``(path, reco_path)``. ``path`` holds the input module
    (``HLTZMQ2Ds`` reading from ``args.input``, or a file input module if an
    input file was given). ``reco_path`` is a separate path which, in ``zmq``
    mode, is attached to the input module for return value 0 and receives the
    ``HLTDQM2ZMQ`` module.

    ``basf2.B2FATAL`` is called for an unknown ``location`` or
    ``event_distribution_mode``.
    """
    path = basf2.Path()
    reco_path = basf2.Path()

    # The file overrides are only defined by the argument parser in zmqbasf2
    # mode, so they may be absent from the namespace altogether.
    input_file = getattr(args, "input_file", None)
    histo_output_file = getattr(args, "histo_output_file", None)

    input_module = None
    if not input_file:
        if location == constants.Location.expressreco:
            input_module = path.add_module("HLTZMQ2Ds", input=args.input, addExpressRecoObjects=True)
        elif location == constants.Location.hlt:
            input_module = path.add_module("HLTZMQ2Ds", input=args.input)
        else:
            basf2.B2FATAL(f"Does not know location {location}")
    elif input_file.endswith(".sroot"):
        path.add_module('SeqRootInput', inputFileName=input_file)
    else:
        path.add_module('RootInput', inputFileName=input_file)

    if not histo_output_file:
        if event_distribution_mode == constants.EventDistributionModes.zmq:
            if input_module is None:
                # The condition below needs the ZMQ input module; fail with a
                # clear message instead of an undefined-name error.
                basf2.B2FATAL("File input is not supported for event_distribution_mode zmq")
            input_module.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
            reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
        elif event_distribution_mode == constants.EventDistributionModes.zmqbasf2:
            path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
        else:
            basf2.B2FATAL(f"Does not know event_distribution_mode {event_distribution_mode}")
    else:
        workdir = os.path.dirname(histo_output_file)
        filename = os.path.basename(histo_output_file)
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir)

    return path, reco_path
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       switch_off_slow_modules_for_online=True,
                       dqm_run_type=None,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines

    NOTE(review): this block was rebuilt from a listing that lost several
    short lines. ``dqm_run_type`` is required by the body; ``prune_input``,
    ``prune_output`` and ``**kwargs`` (together with the lines using them) were
    recovered from gaps in the listing and must be confirmed against the
    repository version.

    :param path: the path the modules are added to.
    :param run_type: ``constants.RunTypes.beam`` or ``constants.RunTypes.cosmic``.
    :param softwaretrigger_mode: ``filter`` branches on the filter module
        result, ``monitor`` always runs the accept path; anything else is fatal.
    :param prune_input: add a ``PruneDataStore`` module keeping only
        ``constants.HLT_INPUT_OBJECTS`` before anything else is done.
    :param prune_output: add the store-only-rawdata path at the end.
    :param unpacker_components: components passed to the unpackers, defaults to
        ``constants.DEFAULT_HLT_COMPONENTS``.
    :param reco_components: components passed to reconstruction and DQM,
        defaults to ``constants.DEFAULT_HLT_COMPONENTS``.
    :param create_hlt_unit_histograms: forwarded to the HLT DQM setup.
    :param switch_off_slow_modules_for_online: forwarded to the reconstruction.
    :param dqm_run_type: run type used for the DQM modules, defaults to
        ``run_type``.
    :param kwargs: forwarded to the pre- and post-filter reconstruction.
    """
    # Set the environment according to the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if dqm_run_type is None:
        dqm_run_type = run_type

    # ROOT is needed below for the HLT discard flag.
    # NOTE(review): no top-level ROOT import is visible in this listing, so it
    # is imported locally; drop this if the file imports ROOT at module level.
    import ROOT  # noqa

    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_HLT_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_HLT_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # Keep only the expected input objects in the DataStore
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Add the geometry (if not already present)
    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')

    # Unpack the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    path_utils.add_prefilter_module(path)

    # Separate path for the modules which only run on accepted events
    accept_path = basf2.Path()

    # Reconstruction needed for the HLT decision
    path_utils.add_pre_filter_reconstruction(
        path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
        **kwargs
    )

    # HLT filter calculation
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)

    # DQM modules running before the filter decision is applied
    path_utils.add_hlt_dqm(path, run_type=dqm_run_type, components=reco_components, dqm_mode=constants.DQMModes.before_filter,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        # Branch on the result of the filter module
        hlt_filter_module = path_utils.add_filter_module(path)

        # Result 0: abort the event and flag it as discarded by the HLT
        path_utils.hlt_event_abort(hlt_filter_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # Result 1: run the accept path, then continue on the main path
        hlt_filter_module.if_value("==1", accept_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # No filtering: every event runs through the accept path
        path.add_path(accept_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # Remaining reconstruction, on the accept path only
    path_utils.add_post_filter_reconstruction(
        accept_path,
        run_type=run_type,
        components=reco_components,
        switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
        **kwargs
    )

    # ROI finding, on the accept path only
    add_roiFinder(accept_path)
    accept_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')

    # DQM modules for the filtered events
    path_utils.add_hlt_dqm(
        accept_path,
        run_type=dqm_run_type,
        components=reco_components,
        dqm_mode=constants.DQMModes.filtered,
        create_hlt_unit_histograms=create_hlt_unit_histograms)

    # The ROI payload assembler runs on the main path; in monitor mode it is
    # told to ignore the HLT decision.
    pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
    add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # DQM modules running on all events
    path_utils.add_hlt_dqm(path, run_type=dqm_run_type, components=reco_components, dqm_mode=constants.DQMModes.all_events,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if prune_output:
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')
def add_hlt_passthrough(path):
    """
    Add all modules for operating HLT machines in passthrough mode.

    Adds the PXD percent-frame setup (``fraction=0.1``, random position) and
    the ROI payload assembler configured to ignore the HLT decision.
    """
    add_pxd_percentframe(path, fraction=0.1, random_position=True)
    add_roi_payload_assembler(path, ignore_hlt_decision=True)
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               switch_off_slow_modules_for_online=True,
                               dqm_run_type=None,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines

    NOTE(review): this block was rebuilt from a listing that lost several
    short lines. ``dqm_run_type`` and ``**kwargs`` are required by the body;
    ``prune_input`` and ``prune_output`` (together with the lines using them)
    were recovered from gaps in the listing and must be confirmed against the
    repository version.

    :param path: the path the modules are added to.
    :param run_type: ``constants.RunTypes.beam`` or ``constants.RunTypes.cosmic``.
    :param select_only_accepted_events: end the processing of events for which
        the ``TriggerSkim`` module on ``software_trigger_cut&all&total_result``
        returns 0.
    :param prune_input: add a ``PruneDataStore`` module keeping only
        ``constants.EXPRESSRECO_INPUT_OBJECTS`` before unpacking.
    :param prune_output: add a final ``PruneDataStore`` module keeping the
        always-saved, raw data and processed objects.
    :param unpacker_components: components passed to the unpackers, defaults to
        ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``.
    :param reco_components: components passed to reconstruction and DQM,
        defaults to ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``.
    :param do_reconstruction: add the reconstruction for the given run type.
    :param switch_off_slow_modules_for_online: forwarded to the beam
        reconstruction.
    :param dqm_run_type: run type used for the DQM modules, defaults to
        ``run_type``.
    :param kwargs: forwarded to the reconstruction.
    """
    # Set the environment according to the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    if run_type == constants.RunTypes.beam:
        basf2.declare_beam()

    if dqm_run_type is None:
        dqm_run_type = run_type

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # Optionally stop processing events which fail the total trigger result;
    # events without the result are treated like failing ones.
    if select_only_accepted_events:
        skim_module = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"], resultOnMissing=0)
        skim_module.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Keep only the expected input objects in the DataStore
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Clear the critical error mask of the PXD post error checker
    basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            # NOTE(review): pruneTracks=False and **kwargs sit on lines the
            # listing dropped; they mirror the cosmics call below.
            add_reconstruction(path,
                               components=reco_components,
                               pruneTracks=False,
                               skipGeometryAdding=True,
                               add_trigger_calculation=False,
                               switch_off_slow_modules_for_online=switch_off_slow_modules_for_online,
                               **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        # NOTE(review): assumed to belong to the reconstruction branch, as it
        # configures a reconstruction module; confirm the nesting.
        basf2.set_module_parameters(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                    isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, dqm_run_type, components=reco_components)

    if prune_output:
        path.add_module("PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
                        constants.PROCESSED_OBJECTS)
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for expressreco/HLT

    The objects to save are ``constants.ALWAYS_SAVE_OBJECTS`` and
    ``constants.RAWDATA_OBJECTS``, plus ``constants.PROCESSED_OBJECTS`` on
    ExpressReco. They are also registered with ``basf2.set_streamobjs``.

    Output goes to the ring buffer ``args.output_buffer_name`` (``Ds2Sample``
    on ExpressReco, ``Ds2Rbuf`` on HLT) unless ``args.output_file`` is set, in
    which case ``SeqRootOutput`` (``.sroot`` files) or ``RootOutput`` is used.
    No output module is added when ``args.no_output`` is set.

    ``basf2.B2FATAL`` is called for an unknown ``location``.
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Restrict the objects streamed between processes
    basf2.set_streamobjs(save_objects)

    # Honour --no-output, which the argument parser defines for ring buffer
    # running.
    # NOTE(review): this early return sits on lines the listing dropped;
    # confirm against the repository version.
    if getattr(args, "no_output", False):
        return

    output_buffer_module_name = ""
    if location == constants.Location.expressreco:
        output_buffer_module_name = "Ds2Sample"
    elif location == constants.Location.hlt:
        output_buffer_module_name = "Ds2Rbuf"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    if not args.output_file:
        path.add_module(output_buffer_module_name, RingBufferName=args.output_buffer_name,
                        saveObjs=save_objects)
    elif args.output_file.endswith(".sroot"):
        path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
    else:
        # No saveObjs restriction is passed for plain ROOT output
        path.add_module("RootOutput", outputFileName=args.output_file)
def finalize_zmq_path(path, args, location):
    """
    Add the required output modules for expressreco/HLT

    The objects to save are ``constants.ALWAYS_SAVE_OBJECTS`` and
    ``constants.RAWDATA_OBJECTS``, plus ``constants.PROCESSED_OBJECTS`` on
    ExpressReco. They are also registered with ``basf2.set_streamobjs``.

    Output goes to ``args.output`` through ``HLTDs2ZMQ`` (``raw`` and
    ``outputConfirmation`` are both off on ExpressReco and both on on HLT)
    unless an output file was given, in which case ``SeqRootOutput``
    (``.sroot`` files) or ``RootOutput`` is used.

    ``basf2.B2FATAL`` is called for an unknown ``location``.
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    # Restrict the objects streamed between processes
    basf2.set_streamobjs(save_objects)

    # The output file override is only defined by the argument parser in
    # zmqbasf2 mode, so it may be absent from the namespace altogether.
    output_file = getattr(args, "output_file", None)

    if not output_file:
        if location == constants.Location.expressreco:
            path.add_module("HLTDs2ZMQ", output=args.output, raw=False, outputConfirmation=False)
        elif location == constants.Location.hlt:
            path.add_module("HLTDs2ZMQ", output=args.output, raw=True, outputConfirmation=True)
        else:
            basf2.B2FATAL(f"Does not know location {location}")
    elif output_file.endswith(".sroot"):
        path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=output_file)
    else:
        # No saveObjs restriction is passed for plain ROOT output
        path.add_module("RootOutput", outputFileName=output_file)