8#include <daq/hbasf2/connections/ZMQHistogramOutput.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12#include <framework/pcore/EvtMessage.h>
13#include <framework/logging/Logger.h>
16#include <TBufferJSON.h>
18#include <boost/format.hpp>
19#include <boost/algorithm/string/replace.hpp>
26template<
class AConnectionClass>
29 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
31 mergeAndSend(EMessageTypes::c_lastEventMessage);
34 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
36 mergeAndSend(EMessageTypes::c_terminateMessage);
41 auto identity = message->getIdentity();
42 auto& dataMessage = message->getDataMessage();
43 auto& additionalMessage = message->getAdditionalDataMessage();
47 std::string additionalMessageAsString(additionalMessage.data<
const char>(), additionalMessage.size());
48 TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
50 if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->
getExperiment()) {
51 B2ERROR(
"Having received histograms with different experiment numbers! Not a good sign!");
52 AConnectionClass::increment(
"different_event_meta_data");
56 if (m_storedRun and * m_storedRun != eventMetaDataPtr->
getRun()) {
57 B2ERROR(
"Having received histograms with different run numbers! Not a good sign!");
58 AConnectionClass::increment(
"different_event_meta_data");
64 m_storedRun = eventMetaDataPtr->
getRun();
66 if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
67 int uncompressedSize = LZ4_decompress_safe(dataMessage.data<
char>(), &m_uncompressedBuffer[0],
68 dataMessage.size(), m_maximalUncompressedBufferSize);
69 B2ASSERT(
"Decompression failed", uncompressedSize > 0);
71 std::unique_ptr<Belle2::EvtMessage> msg(
new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
73 AConnectionClass::average(
"uncompressed_size", msg->size());
76 "After decompression, the size is " << uncompressedSize <<
" and the message itself says " << msg->size());
78 if (not histogram.
empty()) {
79 m_storedMessages[identity] = std::move(histogram);
81 }
else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
82 std::unique_ptr<Belle2::EvtMessage> msg(
new Belle2::EvtMessage(dataMessage.data<
char>()));
84 if (not histogram.
empty()) {
85 m_storedMessages[identity] = std::move(histogram);
88 B2FATAL(
"Unknown message type!");
91 AConnectionClass::log(
"stored_identities",
static_cast<long>(m_storedMessages.size()));
94template<
class AConnectionClass>
97 AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
100template<
class AConnectionClass>
103 AConnectionClass::increment(
"histogram_clears");
104 AConnectionClass::logTime(
"last_clear");
106 AConnectionClass::clear();
108 m_storedMessages.clear();
109 m_storedExperiment.reset();
Class to manage streamed objects.
Utility to store received histograms (hierarchical tree structures) from clients (as an event message).
bool empty() const
Check if there are no stored histograms.
Add the common functionality to the histogram output classes.
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.