Belle II Software  release-08-01-10
HLTDQM2ZMQ.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/modules/HLTDQM2ZMQ.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/RbTuple.h>
11 #include <framework/core/HistoModule.h>
12 
13 using namespace std;
14 using namespace Belle2;
15 
16 REG_MODULE(HLTDQM2ZMQ)
17 
19 {
20  setDescription(
21  "Module to collect DQM histograms (written out by HistoModules) and "
22  "send them every time period to a running ZMQ DQM server "
23  "(either a finalhistoserver or a proxyhistorver). "
24  "The address as well as the send interval are module parameters. "
25  "As the old DQM module, this module works by streaming everything in the current ROOT main "
26  "directory, which is either a TDirectory or a TH1. For the specific implementation on how "
27  "the streaming is done, please see the HLTStreamHelper class. "
28  "The histogram sending is handled via a confirmed connection (output in this case), "
29  "so all the usual conventions for a confirmed connection apply. "
30  "This module does only makes sense to run on the HLT, it is not useful for local "
31  "file writeout."
32  );
33  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
34 
35  addParam("output", m_param_output, "ZMQ address to send the histograms to (the local histo server)");
36  addParam("sendOutInterval", m_param_sendOutInterval, "Time interval in seconds to send out the histograms. "
37  "Please note that the full stack of DQM histo servers"
38  "could delay this, as each of them have a timeout.",
39  m_param_sendOutInterval);
40 }
41 
42 void HLTDQM2ZMQModule::event()
43 {
44  try {
45  if (m_firstEvent) {
46  m_streamHelper.initialize();
47  m_parent.reset(new ZMQParent);
48  m_output.reset(new ZMQConfirmedOutput(m_param_output, m_parent));
49  m_start = std::chrono::system_clock::now();
50 
51  m_firstEvent = false;
52  }
53 
54  auto currentTime = std::chrono::system_clock::now();
55  auto timeDifference = std::chrono::duration_cast<std::chrono::seconds>(currentTime - m_start).count();
56  if (timeDifference > m_param_sendOutInterval) {
57  sendOutHistograms();
58  m_start = std::chrono::system_clock::now();
59  }
60  } catch (zmq::error_t& error) {
61  if (error.num() == EINTR) {
62  // Well, that is probably ok. It will be handled by the framework, just go out here.
63  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
64  return;
65  }
66  // This is an unexpected error: better report it.
67  B2ERROR("ZMQ Error while calling the event: " << error.num());
68  }
69 }
70 
71 void HLTDQM2ZMQModule::beginRun()
72 {
73  if (m_histogramsDefined) {
74  return;
75  }
76  const auto& modules = RbTupleManager::Instance().getHistDefiningModules();
77  for (const auto& module : modules) {
78  B2INFO(module->getName() << " is a histo module");
79  auto* histoModule = dynamic_cast<HistoModule*>(module);
80  B2ASSERT("The added module is not a histogram module!", histoModule);
81  histoModule->defineHisto();
82  }
83 
84  m_histogramsDefined = true;
85 }
86 
87 void HLTDQM2ZMQModule::endRun()
88 {
89  if (m_firstEvent) {
90  return;
91  }
92 
93  try {
94  B2DEBUG(10, "Sending out old run message");
95  sendOutHistograms();
96  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
97  m_output->handleEvent(std::move(message), false, 1000);
98  } catch (zmq::error_t& error) {
99  if (error.num() == EINTR) {
100  // Well, that is probably ok. It will be handled by the framework, just go out here.
101  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
102  return;
103  }
104  // This is an unexpected error: better report it.
105  B2ERROR("ZMQ Error while calling the event: " << error.num());
106  }
107 
108  // TODO: we need to get rid of the histograms, or?
109 }
110 
111 void HLTDQM2ZMQModule::terminate()
112 {
113  if (m_firstEvent) {
114  return;
115  }
116 
117  try {
118  B2DEBUG(10, "Sending out terminate message");
119  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
120  m_output->handleEvent(std::move(message));
121  } catch (zmq::error_t& error) {
122  if (error.num() == EINTR) {
123  // Well, that is probably ok. It will be handled by the framework, just go out here.
124  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
125  return;
126  }
127  // This is an unexpected error: better report it.
128  B2ERROR("ZMQ Error while calling the event: " << error.num());
129  }
130 }
131 
132 void HLTDQM2ZMQModule::sendOutHistograms()
133 {
134  if (m_firstEvent) {
135  return;
136  }
137 
138  auto msg = m_streamHelper.streamHistograms();
139  m_output->handleEvent(std::move(msg), false, 1000);
140 }
Module to collect DQM histograms (written out by HistoModules) and send them every time period to a r...
Definition: HLTDQM2ZMQ.h:41
HistoModule.h is supposed to be used instead of Module.h for the modules with histogram definitions t...
Definition: HistoModule.h:29
Base class for Modules.
Definition: Module.h:72
Output part of a confirmed connection.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.