8 from softwaretrigger
import constants
9 from pxd
import add_roi_payload_assembler, add_roi_finder
11 from reconstruction
import add_reconstruction, add_cosmics_reconstruction
12 from softwaretrigger
import path_utils
14 from geometry
import check_components
15 from rawdata
import add_unpackers
18 def setup_basf2_and_db(zmq=False):
20 Setup local database usage for HLT
22 parser = argparse.ArgumentParser(description=
'basf2 for online')
25 parser.add_argument(
"--input", required=
True, type=str, help=
"ZMQ Address of the distributor process")
26 parser.add_argument(
"--output", required=
True, type=str, help=
"ZMQ Address of the collector process")
27 parser.add_argument(
"--dqm", required=
True, type=str, help=
"ZMQ Address of the histoserver process")
29 parser.add_argument(
'input_buffer_name', type=str,
30 help=
'Input Ring Buffer names')
31 parser.add_argument(
'output_buffer_name', type=str,
32 help=
'Output Ring Buffer name')
33 parser.add_argument(
'histo_port', type=int,
34 help=
'Port of the HistoManager to connect to')
35 parser.add_argument(
'--input-file', type=str,
36 help=
"Input sroot file, if set no RingBuffer input will be used",
38 parser.add_argument(
'--output-file', type=str,
39 help=
"Filename for SeqRoot output, if set no RingBuffer output will be used",
41 parser.add_argument(
'--histo-output-file', type=str,
42 help=
"Filename for histogram output",
44 parser.add_argument(
'--no-output',
45 help=
"Don't write any output files",
46 action=
"store_true", default=
False)
48 parser.add_argument(
'--number-processes', type=int, default=multiprocessing.cpu_count() - 5,
49 help=
'Number of parallel processes to use')
50 parser.add_argument(
'--local-db-path', type=str,
51 help=
"set path to the local payload locations to use for the ConditionDB",
52 default=constants.DEFAULT_DB_FILE_LOCATION)
53 parser.add_argument(
'--central-db-tag', type=str, nargs=
"*",
54 help=
"Use the central db with a specific tag (can be applied multiple times, order is relevant)")
56 args = parser.parse_args()
59 basf2.reset_database()
60 basf2.conditions.override_globaltags()
61 if args.central_db_tag:
62 for central_tag
in args.central_db_tag:
63 basf2.conditions.prepend_globaltag(central_tag)
65 basf2.conditions.globaltags = [
"online"]
66 basf2.conditions.metadata_providers = [
"file://" + basf2.find_file(args.local_db_path +
"/metadata.sqlite")]
67 basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]
70 basf2.set_nprocesses(args.number_processes)
73 basf2.set_log_level(basf2.LogLevel.ERROR)
76 basf2.logging.enable_escape_newlines =
True
81 def start_path(args, location):
83 Create and return a path used for HLT and ExpressReco running
85 path = basf2.create_path()
87 input_buffer_module_name =
""
88 if location == constants.Location.expressreco:
89 input_buffer_module_name =
"Rbuf2Ds"
90 elif location == constants.Location.hlt:
91 input_buffer_module_name =
"Raw2Ds"
93 basf2.B2FATAL(f
"Does not know location {location}")
96 if not args.input_file:
97 path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)
99 if args.input_file.endswith(
".sroot"):
100 path.add_module(
'SeqRootInput', inputFileName=args.input_file)
102 path.add_module(
'RootInput', inputFileName=args.input_file)
105 if not args.histo_output_file:
106 path.add_module(
'DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName=
"/tmp/")
108 path.add_module(
'HistoManager', histoFileName=args.histo_output_file)
113 def start_zmq_path(args, location):
115 reco_path = basf2.Path()
117 if location == constants.Location.expressreco:
118 input_module = path.add_module(
"HLTZMQ2Ds", input=args.input, addExpressRecoObjects=
True)
119 elif location == constants.Location.hlt:
120 input_module = path.add_module(
"HLTZMQ2Ds", input=args.input)
122 basf2.B2FATAL(f
"Does not know location {location}")
124 input_module.if_value(
"==0", reco_path, basf2.AfterConditionPath.CONTINUE)
125 reco_path.add_module(
"HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
127 return path, reco_path
130 def add_hlt_processing(path,
131 run_type=constants.RunTypes.beam,
132 softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
135 unpacker_components=None,
136 reco_components=None,
137 create_hlt_unit_histograms=True,
140 Add all modules for processing on HLT filter machines
142 path.add_module(
'StatisticsSummary').set_name(
'Sum_Wait')
144 if unpacker_components
is None:
145 unpacker_components = constants.DEFAULT_HLT_COMPONENTS
146 if reco_components
is None:
147 reco_components = constants.DEFAULT_HLT_COMPONENTS
149 check_components(unpacker_components)
150 check_components(reco_components)
156 path.add_module(
"PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)
159 path_utils.add_geometry_if_not_present(path)
160 path.add_module(
'StatisticsSummary').set_name(
'Sum_Initialization')
163 add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=
True)
164 path.add_module(
'StatisticsSummary').set_name(
'Sum_Unpackers')
167 accept_path = basf2.Path()
170 path_utils.add_filter_reconstruction(path, run_type=run_type, components=reco_components, **kwargs)
173 path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.before_filter,
174 create_hlt_unit_histograms=create_hlt_unit_histograms)
177 if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
179 hlt_filter_module = path_utils.add_filter_module(path)
183 path_utils.hlt_event_abort(hlt_filter_module,
"==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
185 hlt_filter_module.if_value(
"==1", accept_path, basf2.AfterConditionPath.CONTINUE)
186 elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
188 path.add_path(accept_path)
190 basf2.B2FATAL(f
"The software trigger mode {softwaretrigger_mode} is not supported.")
193 path_utils.add_post_filter_reconstruction(accept_path, run_type=run_type, components=reco_components)
196 add_roi_finder(accept_path)
197 accept_path.add_module(
'StatisticsSummary').set_name(
'Sum_ROI_Finder')
200 path_utils.add_hlt_dqm(
203 components=reco_components,
204 dqm_mode=constants.DQMModes.filtered,
205 create_hlt_unit_histograms=create_hlt_unit_histograms)
212 pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
213 add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
214 path.add_module(
'StatisticsSummary').set_name(
'Sum_ROI_Payload_Assembler')
217 path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.all_events,
218 create_hlt_unit_histograms=create_hlt_unit_histograms)
222 path_utils.add_store_only_rawdata_path(path)
223 path.add_module(
'StatisticsSummary').set_name(
'Sum_Close_Event')
226 def add_expressreco_processing(path,
227 run_type=constants.RunTypes.beam,
228 select_only_accepted_events=False,
231 unpacker_components=None,
232 reco_components=None,
233 do_reconstruction=True,
236 Add all modules for processing on the ExpressReco machines
238 if unpacker_components
is None:
239 unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
240 if reco_components
is None:
241 reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
243 check_components(unpacker_components)
244 check_components(reco_components)
249 if select_only_accepted_events:
250 skim_module = path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&all&total_result"], resultOnMissing=0)
251 skim_module.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
257 path.add_module(
"PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)
259 path_utils.add_geometry_if_not_present(path)
260 add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=
True)
263 basf2.set_module_parameters(path,
"PXDPostErrorChecker", CriticalErrorMask=0)
265 if do_reconstruction:
266 if run_type == constants.RunTypes.beam:
267 add_reconstruction(path, components=reco_components, pruneTracks=
False,
268 skipGeometryAdding=
True, add_trigger_calculation=
False, **kwargs)
269 elif run_type == constants.RunTypes.cosmic:
270 add_cosmics_reconstruction(path, components=reco_components, pruneTracks=
False,
271 skipGeometryAdding=
True, **kwargs)
273 basf2.B2FATAL(
"Run Type {} not supported.".format(run_type))
275 path_utils.add_expressreco_dqm(path, run_type, components=reco_components)
295 path.add_module(
"PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
296 constants.PROCESSED_OBJECTS)
299 def finalize_path(path, args, location, show_progress_bar=True):
301 Add the required output modules for expressreco/HLT
303 save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
304 if location == constants.Location.expressreco:
305 save_objects += constants.PROCESSED_OBJECTS
307 if show_progress_bar:
308 path.add_module(
"Progress")
311 basf2.set_streamobjs(save_objects)
316 output_buffer_module_name =
""
317 if location == constants.Location.expressreco:
318 output_buffer_module_name =
"Ds2Sample"
319 elif location == constants.Location.hlt:
320 output_buffer_module_name =
"Ds2Rbuf"
322 basf2.B2FATAL(f
"Does not know location {location}")
324 if not args.output_file:
325 path.add_module(output_buffer_module_name, RingBufferName=args.output_buffer_name,
326 saveObjs=save_objects)
328 if args.output_file.endswith(
".sroot"):
329 path.add_module(
"SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
332 path.add_module(
"RootOutput", outputFileName=args.output_file)
335 def finalize_zmq_path(path, args, location):
337 Add the required output modules for expressreco/HLT
339 save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
340 if location == constants.Location.expressreco:
341 save_objects += constants.PROCESSED_OBJECTS
344 basf2.set_streamobjs(save_objects)
346 if location == constants.Location.expressreco:
347 path.add_module(
"HLTDs2ZMQ", output=args.output, raw=
False)
348 elif location == constants.Location.hlt:
349 path.add_module(
"HLTDs2ZMQ", output=args.output, raw=
True)
351 basf2.B2FATAL(f
"Does not know location {location}")