10 #include <daq/hbasf2/connections/ZMQHistogramOutput.h>
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/EvtMessage.h>
15 #include <daq/dqm/DqmMemFile.h>
16 #include <framework/logging/Logger.h>
18 #include <THashList.h>
19 #include <TBufferJSON.h>
21 #include <boost/format.hpp>
22 #include <boost/algorithm/string/replace.hpp>
29 template<
class AConnectionClass>
32 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
34 mergeAndSend(EMessageTypes::c_lastEventMessage);
37 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
39 mergeAndSend(EMessageTypes::c_terminateMessage);
44 auto identity = message->getIdentity();
45 auto& dataMessage = message->getDataMessage();
46 auto& additionalMessage = message->getAdditionalDataMessage();
50 std::string additionalMessageAsString(additionalMessage.data<
const char>(), additionalMessage.size());
51 TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
53 if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->
getExperiment()) {
54 B2ERROR(
"Having received histograms with different experiment numbers! Not a good sign!");
55 AConnectionClass::increment(
"different_event_meta_data");
59 if (m_storedRun and * m_storedRun != eventMetaDataPtr->
getRun()) {
60 B2ERROR(
"Having received histograms with different run numbers! Not a good sign!");
61 AConnectionClass::increment(
"different_event_meta_data");
67 m_storedRun = eventMetaDataPtr->
getRun();
69 if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
70 int uncompressedSize = LZ4_decompress_safe(dataMessage.data<
char>(), &m_uncompressedBuffer[0],
71 dataMessage.size(), m_maximalUncompressedBufferSize);
72 B2ASSERT(
"Decompression failed", uncompressedSize > 0);
74 std::unique_ptr<Belle2::EvtMessage> msg(
new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
76 AConnectionClass::average(
"uncompressed_size", msg->
size());
79 "After decompression, the size is " << uncompressedSize <<
" and the message itself says " << msg->
size());
81 if (not histogram.
empty()) {
82 m_storedMessages[identity] = std::move(histogram);
84 }
else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
85 std::unique_ptr<Belle2::EvtMessage> msg(
new Belle2::EvtMessage(dataMessage.data<
char>()));
87 if (not histogram.
empty()) {
88 m_storedMessages[identity] = std::move(histogram);
91 B2FATAL(
"Unknown message type!");
94 AConnectionClass::log(
"stored_identities",
static_cast<long>(m_storedMessages.size()));
97 template<
class AConnectionClass>
100 AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
103 template<
class AConnectionClass>
106 AConnectionClass::increment(
"histogram_clears");
107 AConnectionClass::logTime(
"last_clear");
109 AConnectionClass::clear();
111 m_storedMessages.clear();
112 m_storedExperiment.reset();