Belle II Software development
HLTDQM2ZMQ.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/modules/HLTDQM2ZMQ.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/RbTuple.h>
11#include <framework/core/HistoModule.h>
12
13using namespace std;
14using namespace Belle2;
15
16REG_MODULE(HLTDQM2ZMQ);
17
19{
21 "Module to collect DQM histograms (written out by HistoModules) and "
22 "send them every time period to a running ZMQ DQM server "
23 "(either a finalhistoserver or a proxyhistorver). "
24 "The address as well as the send interval are module parameters. "
25 "As the old DQM module, this module works by streaming everything in the current ROOT main "
26 "directory, which is either a TDirectory or a TH1. For the specific implementation on how "
27 "the streaming is done, please see the HLTStreamHelper class. "
28 "The histogram sending is handled via a confirmed connection (output in this case), "
29 "so all the usual conventions for a confirmed connection apply. "
30 "This module does only makes sense to run on the HLT, it is not useful for local "
31 "file writeout."
32 );
34
35 addParam("output", m_param_output, "ZMQ address to send the histograms to (the local histo server)");
36 addParam("sendOutInterval", m_param_sendOutInterval, "Time interval in seconds to send out the histograms. "
37 "Please note that the full stack of DQM histo servers"
38 "could delay this, as each of them have a timeout.",
40}
41
43{
44 try {
45 if (m_firstEvent) {
47 m_parent.reset(new ZMQParent);
49 m_start = std::chrono::system_clock::now();
50
51 m_firstEvent = false;
52 }
53
54 auto currentTime = std::chrono::system_clock::now();
55 auto timeDifference = std::chrono::duration_cast<std::chrono::seconds>(currentTime - m_start).count();
56 if (timeDifference > m_param_sendOutInterval) {
58 m_start = std::chrono::system_clock::now();
59 }
60 } catch (zmq::error_t& error) {
61 if (error.num() == EINTR) {
62 // Well, that is probably ok. It will be handled by the framework, just go out here.
63 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
64 return;
65 }
66 // This is an unexpected error: better report it.
67 B2ERROR("ZMQ Error while calling the event: " << error.num());
68 }
69}
70
72{
74 return;
75 }
76 const auto& modules = RbTupleManager::Instance().getHistDefiningModules();
77 for (const auto& module : modules) {
78 B2INFO(module->getName() << " is a histo module");
79 auto* histoModule = dynamic_cast<HistoModule*>(module);
80 B2ASSERT("The added module is not a histogram module!", histoModule);
81 histoModule->defineHisto();
82 }
83
85}
86
88{
89 if (m_firstEvent) {
90 return;
91 }
92
93 try {
94 B2DEBUG(10, "Sending out old run message");
96 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
97 m_output->handleEvent(std::move(message), false, 1000);
98 } catch (zmq::error_t& error) {
99 if (error.num() == EINTR) {
100 // Well, that is probably ok. It will be handled by the framework, just go out here.
101 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
102 return;
103 }
104 // This is an unexpected error: better report it.
105 B2ERROR("ZMQ Error while calling the event: " << error.num());
106 }
107
108 // TODO: we need to get rid of the histograms, or?
109}
110
112{
113 if (m_firstEvent) {
114 return;
115 }
116
117 try {
118 B2DEBUG(10, "Sending out terminate message");
119 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
120 m_output->handleEvent(std::move(message));
121 } catch (zmq::error_t& error) {
122 if (error.num() == EINTR) {
123 // Well, that is probably ok. It will be handled by the framework, just go out here.
124 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
125 return;
126 }
127 // This is an unexpected error: better report it.
128 B2ERROR("ZMQ Error while calling the event: " << error.num());
129 }
130}
131
133{
134 if (m_firstEvent) {
135 return;
136 }
137
138 auto msg = m_streamHelper.streamHistograms();
139 m_output->handleEvent(std::move(msg), false, 1000);
140}
bool m_firstEvent
Are we still in the first event?
Definition: HLTDQM2ZMQ.h:81
std::string m_param_output
Module parameter: histogram server address.
Definition: HLTDQM2ZMQ.h:67
void sendOutHistograms()
Helper function to serialize and send out the histograms.
Definition: HLTDQM2ZMQ.cc:132
void event() override
On the first event, initialize the connection and the streamer.
Definition: HLTDQM2ZMQ.cc:42
void endRun() override
Stream the histograms one last time and send out a run end message. We rely on all histogram modules ...
Definition: HLTDQM2ZMQ.cc:87
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
Definition: HLTDQM2ZMQ.h:72
void terminate() override
Stream the histograms one last time and send out a terminate message. We rely on all histogram module...
Definition: HLTDQM2ZMQ.cc:111
HLTDQM2ZMQModule()
Register the module parameters.
Definition: HLTDQM2ZMQ.cc:18
void beginRun() override
Call the defineHisto function of all histogram modules registered at the RbTupleManager singleton.
Definition: HLTDQM2ZMQ.cc:71
bool m_histogramsDefined
Are the histograms already defined (e.g. defineHisto is called)? TODO: should we reset this after the...
Definition: HLTDQM2ZMQ.h:83
HLTStreamHelper m_streamHelper
Streamer utility. TODO: in principle we do not need this, could be static!
Definition: HLTDQM2ZMQ.h:77
std::unique_ptr< ZMQConfirmedOutput > m_output
Confirmed connection to the histogram server.
Definition: HLTDQM2ZMQ.h:74
unsigned int m_param_sendOutInterval
Module parameter: send out interval in seconds.
Definition: HLTDQM2ZMQ.h:69
std::chrono::system_clock::time_point m_start
Point in time when the current interval counting started.
Definition: HLTDQM2ZMQ.h:85
std::unique_ptr< ZMQNoIdMessage > streamHistograms(bool compressed=true)
Stream all objects derived from TH1 into a message. Only the last subfolder is streamed by prefixing ...
void initialize()
Initialize this class. Call this e.g. in the first event.
HistoModule.h is supposed to be used instead of Module.h for the modules with histogram definitions t...
Definition: HistoModule.h:29
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:40
const std::vector< Module * > & getHistDefiningModules() const
Return the list of modules that have defined histograms.
Definition: RbTuple.h:66
Output part of a confirmed connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.