1 #include <daq/hbasf2/modules/HLTDQM2ZMQ.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 #include <framework/pcore/RbTuple.h>
4 #include <framework/core/HistoModule.h>
// Module description text; the adjacent string literals below are joined by
// the compiler into one string.
// NOTE(review): the statement these literals belong to opens and closes on
// lines that are not part of this excerpt.
14 "Module to collect DQM histograms (written out by HistoModules) and "
15 "send them every time period to a running ZMQ DQM server "
16 "(either a finalhistoserver or a proxyhistoserver). "
17 "The address as well as the send interval are module parameters. "
18 "As the old DQM module, this module works by streaming everything in the current ROOT main "
19 "directory, which is either a TDirectory or a TH1. For the specific implementation on how "
20 "the streaming is done, please see the HLTStreamHelper class. "
21 "The histogram sending is handled via a confirmed connection (output in this case), "
22 "so all the usual conventions for a confirmed connection apply. "
23 "This module does only makes sense to run on the HLT, it is not useful for local "
// Mark the module with the c_ParallelProcessingCertified property flag.
26 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
// Parameter "output": bound to m_param_output, the ZMQ address the histograms
// are sent to.
28 addParam(
"output", m_param_output,
"ZMQ address to send the histograms to (the local histo server)");
// Parameter "sendOutInterval": bound to m_param_sendOutInterval (seconds). The
// member is passed again as the last argument, presumably as the default
// value - TODO confirm against the addParam signature.
29 addParam(
"sendOutInterval", m_param_sendOutInterval,
"Time interval in seconds to send out the histograms. "
// The trailing space keeps "servers" and "could" apart once the literals are
// concatenated.
30 "Please note that the full stack of DQM histo servers "
31 "could delay this, as each of them have a timeout.",
32 m_param_sendOutInterval);
35 void HLTDQM2ZMQModule::event()
39 m_streamHelper.initialize();
42 m_start = std::chrono::system_clock::now();
47 auto currentTime = std::chrono::system_clock::now();
48 auto timeDifference = std::chrono::duration_cast<std::chrono::seconds>(currentTime - m_start).count();
49 if (timeDifference > m_param_sendOutInterval) {
51 m_start = std::chrono::system_clock::now();
53 }
catch (zmq::error_t& error) {
54 if (error.num() == EINTR) {
56 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
60 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
// Begin-of-run hook: asks every histogram-defining module registered with the
// RbTupleManager to define its histograms and records that this was done.
// NOTE(review): this excerpt omits several original lines (see the gaps in the
// embedded numbering), so braces and the body of the first check are not
// visible here.
64 void HLTDQM2ZMQModule::beginRun()
// m_histogramsDefined is set to true at the end of this function; what happens
// when it is already set is on an omitted line (presumably an early return -
// TODO confirm).
66 if (m_histogramsDefined) {
69 const auto& modules = RbTupleManager::Instance().getHistDefiningModules();
70 for (
const auto& module : modules) {
71 B2INFO(module->getName() <<
" is a histo module");
// Checked downcast: the result is null if the module is not a HistoModule,
// which the assertion below rejects.
72 auto* histoModule =
dynamic_cast<HistoModule*
>(module);
73 B2ASSERT(
"The added module is not a histogram module!", histoModule);
74 histoModule->defineHisto();
// Remember that the histograms have been defined.
77 m_histogramsDefined =
true;
80 void HLTDQM2ZMQModule::endRun()
87 B2DEBUG(10,
"Sending out old run message");
89 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
90 m_output->handleEvent(std::move(message),
false, 1000);
91 }
catch (zmq::error_t& error) {
92 if (error.num() == EINTR) {
94 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
98 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
104 void HLTDQM2ZMQModule::terminate()
111 B2DEBUG(10,
"Sending out terminate message");
112 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
113 m_output->handleEvent(std::move(message));
114 }
catch (zmq::error_t& error) {
115 if (error.num() == EINTR) {
117 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
121 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
// Streams the histograms with m_streamHelper and hands the resulting message
// to m_output.
// NOTE(review): this excerpt omits several original lines (see the gaps in the
// embedded numbering), so the braces and anything before the streaming call
// are not visible here.
125 void HLTDQM2ZMQModule::sendOutHistograms()
131 auto msg = m_streamHelper.streamHistograms();
// The message is moved into handleEvent; the meaning of the trailing
// arguments (false, 1000) is defined by handleEvent, outside this excerpt.
132 m_output->handleEvent(std::move(msg),
false, 1000);