# NOTE(review): argparse, os, tempfile and basf2 are used by the functions
# below but their import lines are not visible in this listing; they are
# (re)declared here so the module is importable on its own.
import argparse
import multiprocessing
import os
import tempfile

import basf2

from softwaretrigger import constants
from pxd import add_roi_payload_assembler, add_roi_finder
from reconstruction import add_reconstruction, add_cosmics_reconstruction
from softwaretrigger import path_utils
from geometry import check_components
from rawdata import add_unpackers
 
   24 def setup_basf2_and_db(zmq=False):
 
   26     Setup local database usage for HLT 
   28     parser = argparse.ArgumentParser(description=
'basf2 for online')
 
   31         parser.add_argument(
"--input", required=
True, type=str, help=
"ZMQ Address of the distributor process")
 
   32         parser.add_argument(
"--output", required=
True, type=str, help=
"ZMQ Address of the collector process")
 
   33         parser.add_argument(
"--dqm", required=
True, type=str, help=
"ZMQ Address of the histoserver process")
 
   35         parser.add_argument(
'input_buffer_name', type=str,
 
   36                             help=
'Input Ring Buffer names')
 
   37         parser.add_argument(
'output_buffer_name', type=str,
 
   38                             help=
'Output Ring Buffer name')
 
   39         parser.add_argument(
'histo_port', type=int,
 
   40                             help=
'Port of the HistoManager to connect to')
 
   41         parser.add_argument(
'--input-file', type=str,
 
   42                             help=
"Input sroot file, if set no RingBuffer input will be used",
 
   44         parser.add_argument(
'--output-file', type=str,
 
   45                             help=
"Filename for SeqRoot output, if set no RingBuffer output will be used",
 
   47         parser.add_argument(
'--histo-output-file', type=str,
 
   48                             help=
"Filename for histogram output",
 
   50         parser.add_argument(
'--no-output',
 
   51                             help=
"Don't write any output files",
 
   52                             action=
"store_true", default=
False)
 
   54     parser.add_argument(
'--number-processes', type=int, default=multiprocessing.cpu_count() - 5,
 
   55                         help=
'Number of parallel processes to use')
 
   56     parser.add_argument(
'--local-db-path', type=str,
 
   57                         help=
"set path to the local payload locations to use for the ConditionDB",
 
   58                         default=constants.DEFAULT_DB_FILE_LOCATION)
 
   59     parser.add_argument(
'--central-db-tag', type=str, nargs=
"*",
 
   60                         help=
"Use the central db with a specific tag (can be applied multiple times, order is relevant)")
 
   61     parser.add_argument(
'--udp-hostname', type=str,
 
   62                         help=
"set hostname for UDP logging connection", default=
None)
 
   63     parser.add_argument(
'--udp-port', type=int,
 
   64                         help=
"set port number for UDP logging connection", default=
None)
 
   66     args = parser.parse_args()
 
   69     basf2.conditions.override_globaltags()
 
   70     if args.central_db_tag:
 
   71         for central_tag 
in args.central_db_tag:
 
   72             basf2.conditions.prepend_globaltag(central_tag)
 
   74         basf2.conditions.globaltags = [
"online"]
 
   75         basf2.conditions.metadata_providers = [
"file://" + basf2.find_file(args.local_db_path + 
"/metadata.sqlite")]
 
   76         basf2.conditions.payload_locations = [basf2.find_file(args.local_db_path)]
 
   79     basf2.set_nprocesses(args.number_processes)
 
   82     basf2.set_log_level(basf2.LogLevel.ERROR)
 
   85     basf2.logging.enable_escape_newlines = 
True 
   88     if (args.udp_hostname 
is not None) 
and (args.udp_port 
is not None):
 
   89         basf2.logging.add_udp(args.udp_hostname, args.udp_port)
 
   92     basf2.set_realm(
"online")
 
   97 def start_path(args, location):
 
   99     Create and return a path used for HLT and ExpressReco running 
  101     path = basf2.create_path()
 
  103     input_buffer_module_name = 
"" 
  104     if location == constants.Location.expressreco:
 
  105         input_buffer_module_name = 
"Rbuf2Ds" 
  106     elif location == constants.Location.hlt:
 
  107         input_buffer_module_name = 
"Raw2Ds" 
  109         basf2.B2FATAL(f
"Does not know location {location}")
 
  112     if not args.input_file:
 
  113         path.add_module(input_buffer_module_name, RingBufferName=args.input_buffer_name)
 
  115         if args.input_file.endswith(
".sroot"):
 
  116             path.add_module(
'SeqRootInput', inputFileName=args.input_file)
 
  118             path.add_module(
'RootInput', inputFileName=args.input_file)
 
  121     if not args.histo_output_file:
 
  122         path.add_module(
'DqmHistoManager', Port=args.histo_port, DumpInterval=1000, workDirName=tempfile.gettempdir()+
"/")
 
  124         workdir = os.path.dirname(args.histo_output_file)
 
  125         filename = os.path.basename(args.histo_output_file)
 
  126         path.add_module(
'HistoManager', histoFileName=filename, workDirName=workdir)
 
  131 def start_zmq_path(args, location):
 
  133     reco_path = basf2.Path()
 
  135     if location == constants.Location.expressreco:
 
  136         input_module = path.add_module(
"HLTZMQ2Ds", input=args.input, addExpressRecoObjects=
True)
 
  137     elif location == constants.Location.hlt:
 
  138         input_module = path.add_module(
"HLTZMQ2Ds", input=args.input)
 
  140         basf2.B2FATAL(f
"Does not know location {location}")
 
  142     input_module.if_value(
"==0", reco_path, basf2.AfterConditionPath.CONTINUE)
 
  143     reco_path.add_module(
"HLTDQM2ZMQ", output=args.dqm, sendOutInterval=30)
 
  145     return path, reco_path
 
  148 def add_hlt_processing(path,
 
  149                        run_type=constants.RunTypes.beam,
 
  150                        softwaretrigger_mode=constants.SoftwareTriggerModes.filter,
 
  153                        unpacker_components=None,
 
  154                        reco_components=None,
 
  155                        create_hlt_unit_histograms=True,
 
  158     Add all modules for processing on HLT filter machines 
  164     path.add_module(
'StatisticsSummary').set_name(
'Sum_Wait')
 
  166     if unpacker_components 
is None:
 
  167         unpacker_components = constants.DEFAULT_HLT_COMPONENTS
 
  168     if reco_components 
is None:
 
  169         reco_components = constants.DEFAULT_HLT_COMPONENTS
 
  171     check_components(unpacker_components)
 
  172     check_components(reco_components)
 
  178         path.add_module(
"PruneDataStore", matchEntries=constants.HLT_INPUT_OBJECTS)
 
  181     path_utils.add_geometry_if_not_present(path)
 
  182     path.add_module(
'StatisticsSummary').set_name(
'Sum_Initialization')
 
  185     add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=
True)
 
  186     path.add_module(
'StatisticsSummary').set_name(
'Sum_Unpackers')
 
  189     accept_path = basf2.Path()
 
  192     path_utils.add_pre_filter_reconstruction(path, run_type=run_type, components=reco_components, **kwargs)
 
  195     path_utils.add_filter_software_trigger(path, store_array_debug_prescale=1)
 
  198     path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.before_filter,
 
  199                            create_hlt_unit_histograms=create_hlt_unit_histograms)
 
  202     if softwaretrigger_mode == constants.SoftwareTriggerModes.filter:
 
  204         hlt_filter_module = path_utils.add_filter_module(path)
 
  208         path_utils.hlt_event_abort(hlt_filter_module, 
"==0", ROOT.Belle2.EventMetaData.c_HLTDiscard)
 
  210         hlt_filter_module.if_value(
"==1", accept_path, basf2.AfterConditionPath.CONTINUE)
 
  211     elif softwaretrigger_mode == constants.SoftwareTriggerModes.monitor:
 
  213         path.add_path(accept_path)
 
  215         basf2.B2FATAL(f
"The software trigger mode {softwaretrigger_mode} is not supported.")
 
  218     path_utils.add_post_filter_reconstruction(accept_path, run_type=run_type, components=reco_components)
 
  221     add_roi_finder(accept_path)
 
  222     accept_path.add_module(
'StatisticsSummary').set_name(
'Sum_ROI_Finder')
 
  225     path_utils.add_hlt_dqm(
 
  228         components=reco_components,
 
  229         dqm_mode=constants.DQMModes.filtered,
 
  230         create_hlt_unit_histograms=create_hlt_unit_histograms)
 
  237     pxd_ignores_hlt_decision = (softwaretrigger_mode == constants.SoftwareTriggerModes.monitor)
 
  238     add_roi_payload_assembler(path, ignore_hlt_decision=pxd_ignores_hlt_decision)
 
  239     path.add_module(
'StatisticsSummary').set_name(
'Sum_ROI_Payload_Assembler')
 
  242     path_utils.add_hlt_dqm(path, run_type=run_type, components=reco_components, dqm_mode=constants.DQMModes.all_events,
 
  243                            create_hlt_unit_histograms=create_hlt_unit_histograms)
 
  247         path_utils.add_store_only_rawdata_path(path)
 
  248     path.add_module(
'StatisticsSummary').set_name(
'Sum_Close_Event')
 
  251 def add_expressreco_processing(path,
 
  252                                run_type=constants.RunTypes.beam,
 
  253                                select_only_accepted_events=False,
 
  256                                unpacker_components=None,
 
  257                                reco_components=None,
 
  258                                do_reconstruction=True,
 
  261     Add all modules for processing on the ExpressReco machines 
  263     if unpacker_components 
is None:
 
  264         unpacker_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
 
  265     if reco_components 
is None:
 
  266         reco_components = constants.DEFAULT_EXPRESSRECO_COMPONENTS
 
  268     check_components(unpacker_components)
 
  269     check_components(reco_components)
 
  274     if select_only_accepted_events:
 
  275         skim_module = path.add_module(
"TriggerSkim", triggerLines=[
"software_trigger_cut&all&total_result"], resultOnMissing=0)
 
  276         skim_module.if_value(
"==0", basf2.Path(), basf2.AfterConditionPath.END)
 
  282         path.add_module(
"PruneDataStore", matchEntries=constants.EXPRESSRECO_INPUT_OBJECTS)
 
  284     path_utils.add_geometry_if_not_present(path)
 
  285     add_unpackers(path, components=unpacker_components, writeKLMDigitRaws=
True)
 
  288     basf2.set_module_parameters(path, 
"PXDPostErrorChecker", CriticalErrorMask=0)
 
  290     if do_reconstruction:
 
  291         if run_type == constants.RunTypes.beam:
 
  292             add_reconstruction(path, components=reco_components, pruneTracks=
False,
 
  293                                skipGeometryAdding=
True, add_trigger_calculation=
False, **kwargs)
 
  294         elif run_type == constants.RunTypes.cosmic:
 
  295             add_cosmics_reconstruction(path, components=reco_components, pruneTracks=
False,
 
  296                                        skipGeometryAdding=
True, **kwargs)
 
  298             basf2.B2FATAL(f
"Run Type {run_type} not supported.")
 
  300         basf2.set_module_parameters(path, 
"SVDTimeGrouping", forceGroupingFromDB=
False,
 
  301                                     isEnabledIn6Samples=
True, isEnabledIn3Samples=
True)
 
  303     path_utils.add_expressreco_dqm(path, run_type, components=reco_components)
 
  306         path.add_module(
"PruneDataStore", matchEntries=constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS +
 
  307                         constants.PROCESSED_OBJECTS)
 
  310 def finalize_path(path, args, location, show_progress_bar=True):
 
  312     Add the required output modules for expressreco/HLT 
  314     save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
 
  315     if location == constants.Location.expressreco:
 
  316         save_objects += constants.PROCESSED_OBJECTS
 
  318     if show_progress_bar:
 
  319         path.add_module(
"Progress")
 
  322     basf2.set_streamobjs(save_objects)
 
  327     output_buffer_module_name = 
"" 
  328     if location == constants.Location.expressreco:
 
  329         output_buffer_module_name = 
"Ds2Sample" 
  330     elif location == constants.Location.hlt:
 
  331         output_buffer_module_name = 
"Ds2Rbuf" 
  333         basf2.B2FATAL(f
"Does not know location {location}")
 
  335     if not args.output_file:
 
  336         path.add_module(output_buffer_module_name, RingBufferName=args.output_buffer_name,
 
  337                         saveObjs=save_objects)
 
  339         if args.output_file.endswith(
".sroot"):
 
  340             path.add_module(
"SeqRootOutput", saveObjs=save_objects, outputFileName=args.output_file)
 
  343             path.add_module(
"RootOutput", outputFileName=args.output_file)
 
  346 def finalize_zmq_path(path, args, location):
 
  348     Add the required output modules for expressreco/HLT 
  350     save_objects = constants.ALWAYS_SAVE_OBJECTS + constants.RAWDATA_OBJECTS
 
  351     if location == constants.Location.expressreco:
 
  352         save_objects += constants.PROCESSED_OBJECTS
 
  355     basf2.set_streamobjs(save_objects)
 
  357     if location == constants.Location.expressreco:
 
  358         path.add_module(
"HLTDs2ZMQ", output=args.output, raw=
False, outputConfirmation=
False)
 
  359     elif location == constants.Location.hlt:
 
  360         path.add_module(
"HLTDs2ZMQ", output=args.output, raw=
True, outputConfirmation=
True)
 
  362         basf2.B2FATAL(f
"Does not know location {location}")