8#include <daq/hbasf2/connections/ZMQHistogramOutput.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12#include <framework/pcore/EvtMessage.h>
13#include <framework/logging/Logger.h>
16#include <TBufferJSON.h>
23template<
class AConnectionClass>
26 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
31 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
38 auto identity = message->getIdentity();
39 auto& dataMessage = message->getDataMessage();
40 auto& additionalMessage = message->getAdditionalDataMessage();
44 std::string additionalMessageAsString(additionalMessage.data<
const char>(), additionalMessage.size());
45 TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
48 B2ERROR(
"Having received histograms with different experiment numbers! Not a good sign!");
49 AConnectionClass::increment(
"different_event_meta_data");
54 B2ERROR(
"Having received histograms with different run numbers! Not a good sign!");
55 AConnectionClass::increment(
"different_event_meta_data");
63 if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
66 B2ASSERT(
"Decompression failed", uncompressedSize > 0);
70 AConnectionClass::average(
"uncompressed_size", msg->size());
73 "After decompression, the size is " << uncompressedSize <<
" and the message itself says " << msg->size());
75 if (not histogram.empty()) {
78 }
else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
79 std::unique_ptr<Belle2::EvtMessage> msg(
new Belle2::EvtMessage(dataMessage.data<
char>()));
81 if (not histogram.empty()) {
82 m_storedMessages[identity] = std::move(histogram);
85 B2FATAL(
"Unknown message type!");
88 AConnectionClass::log(
"stored_identities",
static_cast<long>(m_storedMessages.size()));
91template<
class AConnectionClass>
97template<
class AConnectionClass>
100 AConnectionClass::increment(
"histogram_clears");
101 AConnectionClass::logTime(
"last_clear");
103 AConnectionClass::clear();
Class to manage a streamed object.
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
Add the common functionality to the histogram output classes.
std::optional< int > m_storedRun
If already received: the run number of the data (on mismatch, everything is cleared)
std::vector< char > m_uncompressedBuffer
unsigned int m_maximalUncompressedBufferSize
std::optional< int > m_storedExperiment
If already received: the experiment number of the data (on mismatch, everything is cleared)
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
std::map< std::string, HistogramMapping > m_storedMessages
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.