from softwaretrigger import constants
from pxd import add_roi_payload_assembler, add_roi_finder

from reconstruction import add_reconstruction, add_cosmics_reconstruction
from softwaretrigger import path_utils
from geometry import check_components
from rawdata import add_unpackers
def setup_basf2_and_db(zmq=False):
    """
    Setup local database usage for HLT

    Parses the command line of an online steering file and applies the
    conditions-database, multiprocessing and logging settings to basf2.

    :param zmq: if True, register the ZMQ address options (``--input``,
        ``--output``, ``--dqm``); otherwise register the ring-buffer and
        file based options.
    :return: the parsed command line arguments (``argparse.Namespace``)
    """
    parser = argparse.ArgumentParser(description='basf2 for online')

    # NOTE(review): the ``if zmq:`` / ``else:`` split is reconstructed; ``zmq``
    # is the only parameter and the two argument groups are consumed by the
    # ZMQ and ring-buffer path builders respectively -- confirm the grouping.
    if zmq:
        parser.add_argument("--input", required=True, type=str,
                            help="ZMQ Address of the distributor process")
        parser.add_argument("--output", required=True, type=str,
                            help="ZMQ Address of the collector process")
        parser.add_argument("--dqm", required=True, type=str,
                            help="ZMQ Address of the histoserver process")
    else:
        parser.add_argument('input_buffer_name', type=str,
                            help='Input Ring Buffer names')
        parser.add_argument('output_buffer_name', type=str,
                            help='Output Ring Buffer name')
        parser.add_argument('histo_port', type=int,
                            help='Port of the HistoManager to connect to')
        parser.add_argument('--input-file', type=str,
                            help="Input sroot file, if set no RingBuffer input will be used",
                            default=None)
        parser.add_argument('--output-file', type=str,
                            help="Filename for SeqRoot output, if set no RingBuffer output will be used",
                            default=None)
        parser.add_argument('--histo-output-file', type=str,
                            help="Filename for histogram output",
                            default=None)
        parser.add_argument('--no-output',
                            help="Don't write any output files",
                            action="store_true", default=False)

    # Options common to both transport modes
    parser.add_argument('--number-processes', type=int, default=multiprocessing.cpu_count() - 5,
                        help='Number of parallel processes to use')
    parser.add_argument('--local-db-path', type=str,
                        help="set path to the local payload locations to use for the ConditionDB",
                        default=constants.DEFAULT_DB_FILE_LOCATION)
    parser.add_argument('--local-db-tag', type=str, nargs="*",
                        help="Use the local db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--central-db-tag', type=str, nargs="*",
                        help="Use the central db with a specific tag (can be applied multiple times, order is relevant)")
    parser.add_argument('--udp-hostname', type=str,
                        help="set hostname for UDP logging connection", default=None)
    parser.add_argument('--udp-port', type=int,
                        help="set port number for UDP logging connection", default=None)

    args = parser.parse_args()

    # Conditions database: central tags take precedence over local ones.
    basf2.conditions.override_globaltags()
    if args.central_db_tag:
        for central_tag in args.central_db_tag:
            basf2.conditions.prepend_globaltag(central_tag)
    else:
        if args.local_db_tag:
            for local_tag in args.local_db_tag:
                basf2.conditions.prepend_globaltag(local_tag)
        else:
            basf2.conditions.globaltags = ["online"]
        # NOTE(review): nesting of the two assignments below under the
        # "no central tag" branch is reconstructed -- confirm.
        basf2.conditions.metadata_providers = ["file://" + basf2.find_file(args.local_db_path + "/metadata.sqlite")]
        basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]

    # Number of processes
    basf2.set_nprocesses(args.number_processes)

    # Logging: errors only, newlines escaped in every message
    basf2.set_log_level(basf2.LogLevel.ERROR)
    basf2.logging.enable_escape_newlines = True

    # UDP logging is only enabled when both host and port are given
    if (args.udp_hostname is not None) and (args.udp_port is not None):
        basf2.logging.add_udp(args.udp_hostname, args.udp_port)

    # Online realm
    basf2.set_realm("online")

    return args
def start_path(args, location):
    """
    Create and return a path used for HLT and ExpressReco running

    :param args: parsed command line arguments; reads ``input_file``,
        ``input_buffer_name``, ``histo_output_file`` and ``histo_port``
    :param location: a ``constants.Location`` value (``hlt`` or ``expressreco``)
    :return: the new path with the input and histogram-manager modules added
    """
    path = basf2.create_path()

    # The ring-buffer input module depends on where we are running
    input_buffer_module_name = ""
    if location == constants.Location.expressreco:
        input_buffer_module_name = "Rbuf2Ds"
    elif location == constants.Location.hlt:
        input_buffer_module_name = "Raw2Ds"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Input: ring buffer by default, otherwise read from the given file
    if not args.input_file:
        path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)
    else:
        if args.input_file.endswith(".sroot"):
            path.add_module('SeqRootInput', inputFileName=args.input_file)
        else:
            path.add_module('RootInput', inputFileName=args.input_file)

    # Histogram handling: DqmHistoManager on a port by default, otherwise write to file
    if not args.histo_output_file:
        path.add_module('DqmHistoManager', Port=args.histo_port, DumpInterval=1000,
                        workDirName=tempfile.gettempdir() + "/")
    else:
        workdir = os.path.dirname(args.histo_output_file)
        filename = os.path.basename(args.histo_output_file)
        path.add_module('HistoManager', histoFileName=filename, workDirName=workdir)

    return path
def start_zmq_path(args, location):
    """
    Create the main path and the reconstruction path for ZMQ based running.

    :param args: parsed command line arguments; reads ``input`` and ``dqm``
    :param location: a ``constants.Location`` value (``hlt`` or ``expressreco``)
    :return: tuple ``(path, reco_path)``; ``reco_path`` is attached to the
        input module as a conditional path taken when its return value is 0
    """
    # NOTE(review): creation of ``path`` is reconstructed; it is used and
    # returned below but its assignment was missing from the block.
    path = basf2.Path()
    reco_path = basf2.Path()

    if location == constants.Location.expressreco:
        input_module = path.add_module("HLTZMQ2Ds", input=args.input, addExpressRecoObjects=True)
    elif location == constants.Location.hlt:
        input_module = path.add_module("HLTZMQ2Ds", input=args.input)
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Branch into the reconstruction path, then continue with the main path
    input_module.if_value("==0", reco_path, basf2.AfterConditionPath.CONTINUE)
    reco_path.add_module("HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)

    return path, reco_path
def add_hlt_processing(path,
                       run_type=constants.RunTypes.beam,
                       softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
                       prune_input=True,
                       prune_output=True,
                       unpacker_components=None,
                       reco_components=None,
                       create_hlt_unit_histograms=True,
                       **kwargs):
    """
    Add all modules for processing on HLT filter machines

    :param path: the basf2 path to add the modules to
    :param run_type: a ``constants.RunTypes`` value (``beam`` or ``cosmic``)
    :param softwaretrigger_mode: a ``constants.SoftwareTriggerModes`` value;
        ``filter`` branches on the trigger decision, ``monitor`` always runs
        the accept path
    :param prune_input: prune the DataStore to ``constants.HLT_INPUT_OBJECTS``
        before processing (NOTE(review): parameter reconstructed -- confirm)
    :param prune_output: keep only raw data at the end of the path
        (NOTE(review): parameter reconstructed -- confirm)
    :param unpacker_components: components to unpack, defaults to
        ``constants.DEFAULT_HLT_COMPONENTS``
    :param reco_components: components to reconstruct, defaults to
        ``constants.DEFAULT_HLT_COMPONENTS``
    :param create_hlt_unit_histograms: forwarded to ``path_utils.add_hlt_dqm``
    :param kwargs: forwarded to ``path_utils.add_pre_filter_reconstruction``
    """
    # Set the environment according to the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    if run_type == constants.RunTypes.beam:
        # NOTE(review): body of this branch is reconstructed -- confirm.
        basf2.declare_beam()

    # Imported locally so this function does not rely on a module-level ROOT import
    import ROOT  # noqa

    path.add_module('StatisticsSummary').set_name('Sum_Wait')

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_HLT_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_HLT_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # Restrict the DataStore content to the expected HLT input objects
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)

    # Add the geometry (if not already present)
    path_utils.add_geometry_if_not_present(path)
    path.add_module('StatisticsSummary').set_name('Sum_Initialization')

    # Unpack the event content
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)
    path.add_module('StatisticsSummary').set_name('Sum_Unpackers')

    # Path for all accepted events; filled below and attached depending on the mode
    accept_path = basf2.Path()

    # Reconstruction needed for the HLT decision
    path_utils.add_pre_filter_reconstruction(path, run_type=run_type, components=reco_components, **kwargs)

    # HLT filter calculation
    path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)

    # DQM modules which run before the filter decision is applied
    path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components,
                           dqm_mode=constants.DQMModes.before_filter,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
        # Split the path according to the HLT decision
        hlt_filter_module = path_utils.add_filter_module(path)

        # Result 0: abort the event and flag it as discarded by the HLT
        path_utils.hlt_event_abort(hlt_filter_module, "==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
        # Result 1: continue with the accept path
        hlt_filter_module.if_value("==1", accept_path, basf2.AfterConditionPath.CONTINUE)
    elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
        # No filtering: every event goes through the accept path
        path.add_path(accept_path)
    else:
        basf2.B2FATAL(f"The software trigger mode {softwaretrigger_mode} is not supported.")

    # Accepted events continue with the remaining reconstruction
    path_utils.add_post_filter_reconstruction(accept_path, run_type=run_type, components=reco_components)

    # ROIs are only created on the accept path
    add_roi_finder(accept_path)
    accept_path.add_module('StatisticsSummary').set_name('Sum_ROI_Finder')

    # DQM modules for the filtered events
    # NOTE(review): the first two arguments of this call are reconstructed -- confirm.
    path_utils.add_hlt_dqm(
        accept_path,
        run_type=run_type,
        components=reco_components,
        dqm_mode=constants.DQMModes.filtered,
        create_hlt_unit_histograms=create_hlt_unit_histograms)

    # ROI payloads are assembled on the main path; in monitor mode the
    # assembler is told to ignore the HLT decision
    pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
    add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
    path.add_module('StatisticsSummary').set_name('Sum_ROI_Payload_Assembler')

    # DQM modules which run on all events
    path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components,
                           dqm_mode=constants.DQMModes.all_events,
                           create_hlt_unit_histograms=create_hlt_unit_histograms)

    if prune_output:
        # Remove everything which should not be stored
        path_utils.add_store_only_rawdata_path(path)
    path.add_module('StatisticsSummary').set_name('Sum_Close_Event')
def add_expressreco_processing(path,
                               run_type=constants.RunTypes.beam,
                               select_only_accepted_events=False,
                               prune_input=True,
                               prune_output=True,
                               unpacker_components=None,
                               reco_components=None,
                               do_reconstruction=True,
                               **kwargs):
    """
    Add all modules for processing on the ExpressReco machines

    :param path: the basf2 path to add the modules to
    :param run_type: a ``constants.RunTypes`` value (``beam`` or ``cosmic``)
    :param select_only_accepted_events: if True, end processing for events
        whose ``software_trigger_cut&all&total_result`` skim result is 0
    :param prune_input: prune the DataStore to
        ``constants.EXPRESSRECO_INPUT_OBJECTS`` before processing
        (NOTE(review): parameter reconstructed -- confirm)
    :param prune_output: prune the DataStore to the always-saved, raw-data and
        processed objects at the end (NOTE(review): parameter reconstructed -- confirm)
    :param unpacker_components: components to unpack, defaults to
        ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param reco_components: components to reconstruct, defaults to
        ``constants.DEFAULT_EXPRESSRECO_COMPONENTS``
    :param do_reconstruction: add the (beam or cosmics) reconstruction
    :param kwargs: forwarded to the reconstruction function
    """
    # Set the environment according to the run type
    if run_type == constants.RunTypes.cosmic:
        basf2.declare_cosmics()

    if run_type == constants.RunTypes.beam:
        # NOTE(review): body of this branch is reconstructed -- confirm.
        basf2.declare_beam()

    if unpacker_components is None:
        unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
    if reco_components is None:
        reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS

    check_components(unpacker_components)
    check_components(reco_components)

    # Optionally stop processing of events which were not selected by the trigger
    if select_only_accepted_events:
        skim_module = path.add_module("TriggerSkim", triggerLines=["software_trigger_cut&all&total_result"],
                                      resultOnMissing=0)
        skim_module.if_value("==0", basf2.Path(), basf2.AfterConditionPath.END)

    # Restrict the DataStore content to the expected ExpressReco input objects
    if prune_input:
        path.add_module("PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)

    path_utils.add_geometry_if_not_present(path)
    add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=True)

    # Clear the critical error mask of the PXD post error checker
    basf2.set_module_parameters(path, "PXDPostErrorChecker", CriticalErrorMask=0)

    if do_reconstruction:
        if run_type == constants.RunTypes.beam:
            add_reconstruction(path, components=reco_components, pruneTracks=False,
                               skipGeometryAdding=True, add_trigger_calculation=False, **kwargs)
        elif run_type == constants.RunTypes.cosmic:
            add_cosmics_reconstruction(path, components=reco_components, pruneTracks=False,
                                       skipGeometryAdding=True, **kwargs)
        else:
            basf2.B2FATAL(f"Run Type {run_type} not supported.")

        # NOTE(review): nesting of this call under ``do_reconstruction`` is
        # reconstructed (the module is presumably only present after
        # reconstruction was added) -- confirm.
        basf2.set_module_parameters(path, "SVDTimeGrouping", forceGroupingFromDB=False,
                                    isEnabledIn6Samples=True, isEnabledIn3Samples=True)

    path_utils.add_expressreco_dqm(path, run_type, components=reco_components)

    if prune_output:
        path.add_module("PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
                        constants.PROCESSED_OBJECTS)
def finalize_path(path, args, location, show_progress_bar=True):
    """
    Add the required output modules for expressreco/HLT

    :param path: the basf2 path to add the modules to
    :param args: parsed command line arguments; reads ``no_output``,
        ``output_file`` and ``output_buffer_name``
    :param location: a ``constants.Location`` value (``hlt`` or ``expressreco``)
    :param show_progress_bar: if True, add the ``Progress`` module
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    if show_progress_bar:
        path.add_module("Progress")

    # Limit the objects streamed between processes
    basf2.set_streamobjs(save_objects)

    # NOTE(review): this early return is reconstructed; ``--no-output`` is
    # parsed by the argument parser but was not referenced in the block.
    if args.no_output:
        return

    # The ring-buffer output module depends on where we are running
    output_buffer_module_name = ""
    if location == constants.Location.expressreco:
        output_buffer_module_name = "Ds2Sample"
    elif location == constants.Location.hlt:
        output_buffer_module_name = "Ds2Rbuf"
    else:
        basf2.B2FATAL(f"Does not know location {location}")

    # Output: ring buffer by default, otherwise write to the given file
    if not args.output_file:
        path.add_module(output_buffer_module_name, RingBufferName=args.output_buffer_name,
                        saveObjs=save_objects)
    else:
        if args.output_file.endswith(".sroot"):
            path.add_module("SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
        else:
            # No object selection is passed here, unlike the other outputs
            path.add_module("RootOutput", outputFileName=args.output_file)
def finalize_zmq_path(path, args, location):
    """
    Add the required output modules for expressreco/HLT

    :param path: the basf2 path to add the output module to
    :param args: parsed command line arguments; reads ``output``
    :param location: a ``constants.Location`` value (``hlt`` or ``expressreco``)
    """
    save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
    if location == constants.Location.expressreco:
        save_objects += constants.PROCESSED_OBJECTS

    # Limit the objects streamed between processes
    basf2.set_streamobjs(save_objects)

    # The two locations differ in the ``raw`` and ``outputConfirmation`` settings
    if location == constants.Location.expressreco:
        path.add_module("HLTDs2ZMQ", output=args.output, raw=False, outputConfirmation=False)
    elif location == constants.Location.hlt:
        path.add_module("HLTDs2ZMQ", output=args.output, raw=True, outputConfirmation=True)
    else:
        basf2.B2FATAL(f"Does not know location {location}")