Belle II Software  release-05-01-25
HLTDQM2ZMQ.cc
1 #include <daq/hbasf2/modules/HLTDQM2ZMQ.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 #include <framework/pcore/RbTuple.h>
4 #include <framework/core/HistoModule.h>
5 
6 using namespace std;
7 using namespace Belle2;
8 
9 REG_MODULE(HLTDQM2ZMQ)
10 
12 {
13  setDescription(
14  "Module to collect DQM histograms (written out by HistoModules) and "
15  "send them every time period to a running ZMQ DQM server "
16  "(either a finalhistoserver or a proxyhistorver). "
17  "The address as well as the send interval are module parameters. "
18  "As the old DQM module, this module works by streaming everything in the current ROOT main "
19  "directory, which is either a TDirectory or a TH1. For the specific implementation on how "
20  "the streaming is done, please see the HLTStreamHelper class. "
21  "The histogram sending is handled via a confirmed connection (output in this case), "
22  "so all the usual conventions for a confirmed connection apply. "
23  "This module does only makes sense to run on the HLT, it is not useful for local "
24  "file writeout."
25  );
26  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
27 
28  addParam("output", m_param_output, "ZMQ address to send the histograms to (the local histo server)");
29  addParam("sendOutInterval", m_param_sendOutInterval, "Time interval in seconds to send out the histograms. "
30  "Please note that the full stack of DQM histo servers"
31  "could delay this, as each of them have a timeout.",
32  m_param_sendOutInterval);
33 }
34 
35 void HLTDQM2ZMQModule::event()
36 {
37  try {
38  if (m_firstEvent) {
39  m_streamHelper.initialize();
40  m_parent.reset(new ZMQParent);
41  m_output.reset(new ZMQConfirmedOutput(m_param_output, m_parent));
42  m_start = std::chrono::system_clock::now();
43 
44  m_firstEvent = false;
45  }
46 
47  auto currentTime = std::chrono::system_clock::now();
48  auto timeDifference = std::chrono::duration_cast<std::chrono::seconds>(currentTime - m_start).count();
49  if (timeDifference > m_param_sendOutInterval) {
50  sendOutHistograms();
51  m_start = std::chrono::system_clock::now();
52  }
53  } catch (zmq::error_t& error) {
54  if (error.num() == EINTR) {
55  // Well, that is probably ok. It will be handled by the framework, just go out here.
56  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
57  return;
58  }
59  // This is an unexpected error: better report it.
60  B2ERROR("ZMQ Error while calling the event: " << error.num());
61  }
62 }
63 
64 void HLTDQM2ZMQModule::beginRun()
65 {
66  if (m_histogramsDefined) {
67  return;
68  }
69  const auto& modules = RbTupleManager::Instance().getHistDefiningModules();
70  for (const auto& module : modules) {
71  B2INFO(module->getName() << " is a histo module");
72  auto* histoModule = dynamic_cast<HistoModule*>(module);
73  B2ASSERT("The added module is not a histogram module!", histoModule);
74  histoModule->defineHisto();
75  }
76 
77  m_histogramsDefined = true;
78 }
79 
80 void HLTDQM2ZMQModule::endRun()
81 {
82  if (m_firstEvent) {
83  return;
84  }
85 
86  try {
87  B2DEBUG(10, "Sending out old run message");
88  sendOutHistograms();
89  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
90  m_output->handleEvent(std::move(message), false, 1000);
91  } catch (zmq::error_t& error) {
92  if (error.num() == EINTR) {
93  // Well, that is probably ok. It will be handled by the framework, just go out here.
94  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
95  return;
96  }
97  // This is an unexpected error: better report it.
98  B2ERROR("ZMQ Error while calling the event: " << error.num());
99  }
100 
101  // TODO: we need to get rid of the histograms, or?
102 }
103 
104 void HLTDQM2ZMQModule::terminate()
105 {
106  if (m_firstEvent) {
107  return;
108  }
109 
110  try {
111  B2DEBUG(10, "Sending out terminate message");
112  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
113  m_output->handleEvent(std::move(message));
114  } catch (zmq::error_t& error) {
115  if (error.num() == EINTR) {
116  // Well, that is probably ok. It will be handled by the framework, just go out here.
117  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
118  return;
119  }
120  // This is an unexpected error: better report it.
121  B2ERROR("ZMQ Error while calling the event: " << error.num());
122  }
123 }
124 
125 void HLTDQM2ZMQModule::sendOutHistograms()
126 {
127  if (m_firstEvent) {
128  return;
129  }
130 
131  auto msg = m_streamHelper.streamHistograms();
132  m_output->handleEvent(std::move(msg), false, 1000);
133 }
Belle2::ZMQConfirmedOutput
Output part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:116
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQParent
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:49
Belle2::HLTDQM2ZMQModule
Module to collect DQM histograms (written out by HistoModules) and send them every time period to a r...
Definition: HLTDQM2ZMQ.h:51
Belle2::HistoModule
HistoModule.h is supposed to be used instead of Module.h for the modules with histogram definitions t...
Definition: HistoModule.h:29