Belle II Software development
ZMQHistogramOutput.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/connections/ZMQHistogramOutput.h>
9
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11
12#include <framework/pcore/EvtMessage.h>
13#include <framework/logging/Logger.h>
14
15#include <THashList.h>
16#include <TBufferJSON.h>
17
18#include <lz4.h>
19#include <zmq.hpp>
20
21using namespace Belle2;
22
23template<class AConnectionClass>
24void ZMQHistogramOutput<AConnectionClass>::handleEvent(std::unique_ptr<ZMQIdMessage> message)
25{
26 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
27 // This run is over, so merge everything a final time and send it out
28 mergeAndSend(EMessageTypes::c_lastEventMessage);
29 clear();
30 return;
31 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
32 // Everything is over, so send out a terminate message
33 mergeAndSend(EMessageTypes::c_terminateMessage);
34 clear();
35 return;
36 }
37
38 auto identity = message->getIdentity();
39 auto& dataMessage = message->getDataMessage();
40 auto& additionalMessage = message->getAdditionalDataMessage();
41
42 // Check if we get messages with the same event number
43 EventMetaData* eventMetaDataPtr = nullptr;
44 std::string additionalMessageAsString(additionalMessage.data<const char>(), additionalMessage.size());
45 TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
46
47 if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->getExperiment()) {
48 B2ERROR("Having received histograms with different experiment numbers! Not a good sign!");
49 AConnectionClass::increment("different_event_meta_data");
50 clear();
51 return;
52 }
53 if (m_storedRun and * m_storedRun != eventMetaDataPtr->getRun()) {
54 B2ERROR("Having received histograms with different run numbers! Not a good sign!");
55 AConnectionClass::increment("different_event_meta_data");
56 clear();
57 return;
58 }
59
60 m_storedExperiment = eventMetaDataPtr->getExperiment();
61 m_storedRun = eventMetaDataPtr->getRun();
63 if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
64 int uncompressedSize = LZ4_decompress_safe(dataMessage.data<char>(), &m_uncompressedBuffer[0],
65 dataMessage.size(), m_maximalUncompressedBufferSize);
66 B2ASSERT("Decompression failed", uncompressedSize > 0);
67
68 std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
69
70 AConnectionClass::average("uncompressed_size", msg->size());
71
72 B2DEBUG(10,
73 "After decompression, the size is " << uncompressedSize << " and the message itself says " << msg->size());
74 HistogramMapping histogram(std::move(msg));
75 if (not histogram.empty()) {
76 m_storedMessages[identity] = std::move(histogram);
77 }
78 } else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
79 std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(dataMessage.data<char>()));
80 HistogramMapping histogram(std::move(msg));
81 if (not histogram.empty()) {
82 m_storedMessages[identity] = std::move(histogram);
83 }
84 } else {
85 B2FATAL("Unknown message type!");
86 }
87
88 AConnectionClass::log("stored_identities", static_cast<long>(m_storedMessages.size()));
89}
90
91template<class AConnectionClass>
93{
94 AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
95}
96
97template<class AConnectionClass>
99{
100 AConnectionClass::increment("histogram_clears");
101 AConnectionClass::logTime("last_clear");
102
103 AConnectionClass::clear();
104
105 m_storedMessages.clear();
106 m_storedExperiment.reset();
107 m_storedRun.reset();
108}
109
Store event, run, and experiment numbers.
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Class to manage streamed object.
Definition EvtMessage.h:59
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
Add the common functionality to the histogram output classes.
std::optional< int > m_storedRun
If already received: the run number of the data (on mismatch, everything is cleared)
std::optional< int > m_storedExperiment
If already received: the experiment number of the data (on mismatch, everything is cleared)
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
std::map< std::string, HistogramMapping > m_storedMessages
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.