Belle II Software development
ZMQHistogramOutput.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/connections/ZMQHistogramOutput.h>
9
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11
12#include <framework/pcore/EvtMessage.h>
13#include <framework/logging/Logger.h>
14
15#include <THashList.h>
16#include <TBufferJSON.h>
17
18#include <boost/format.hpp>
19#include <boost/algorithm/string/replace.hpp>
20
21#include <lz4.h>
22#include <zmq.hpp>
23
24using namespace Belle2;
25
26template<class AConnectionClass>
27void ZMQHistogramOutput<AConnectionClass>::handleEvent(std::unique_ptr<ZMQIdMessage> message)
28{
29 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
30 // This run is over, so merge everything a final time and send it out
31 mergeAndSend(EMessageTypes::c_lastEventMessage);
32 clear();
33 return;
34 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
35 // Everything is over, so send out a terminate message
36 mergeAndSend(EMessageTypes::c_terminateMessage);
37 clear();
38 return;
39 }
40
41 auto identity = message->getIdentity();
42 auto& dataMessage = message->getDataMessage();
43 auto& additionalMessage = message->getAdditionalDataMessage();
44
45 // Check if we get messages with the same event number
46 EventMetaData* eventMetaDataPtr = nullptr;
47 std::string additionalMessageAsString(additionalMessage.data<const char>(), additionalMessage.size());
48 TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
49
50 if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->getExperiment()) {
51 B2ERROR("Having received histograms with different experiment numbers! Not a good sign!");
52 AConnectionClass::increment("different_event_meta_data");
53 clear();
54 return;
55 }
56 if (m_storedRun and * m_storedRun != eventMetaDataPtr->getRun()) {
57 B2ERROR("Having received histograms with different run numbers! Not a good sign!");
58 AConnectionClass::increment("different_event_meta_data");
59 clear();
60 return;
61 }
62
63 m_storedExperiment = eventMetaDataPtr->getExperiment();
64 m_storedRun = eventMetaDataPtr->getRun();
65
66 if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
67 int uncompressedSize = LZ4_decompress_safe(dataMessage.data<char>(), &m_uncompressedBuffer[0],
68 dataMessage.size(), m_maximalUncompressedBufferSize);
69 B2ASSERT("Decompression failed", uncompressedSize > 0);
70
71 std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
72
73 AConnectionClass::average("uncompressed_size", msg->size());
74
75 B2DEBUG(10,
76 "After decompression, the size is " << uncompressedSize << " and the message itself says " << msg->size());
77 HistogramMapping histogram(std::move(msg));
78 if (not histogram.empty()) {
79 m_storedMessages[identity] = std::move(histogram);
80 }
81 } else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
82 std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(dataMessage.data<char>()));
83 HistogramMapping histogram(std::move(msg));
84 if (not histogram.empty()) {
85 m_storedMessages[identity] = std::move(histogram);
86 }
87 } else {
88 B2FATAL("Unknown message type!");
89 }
90
91 AConnectionClass::log("stored_identities", static_cast<long>(m_storedMessages.size()));
92}
93
94template<class AConnectionClass>
96{
97 AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
98}
99
100template<class AConnectionClass>
102{
103 AConnectionClass::increment("histogram_clears");
104 AConnectionClass::logTime("last_clear");
105
106 AConnectionClass::clear();
107
108 m_storedMessages.clear();
109 m_storedExperiment.reset();
110 m_storedRun.reset();
111}
112
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Class to manage streamed object.
Definition: EvtMessage.h:59
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
bool empty() const
Check if there are no stored histograms.
Add the common functionality to the histogram output classes.
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.