Belle II Software  release-08-01-10
ZMQHistogramOutput.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/connections/ZMQHistogramOutput.h>
9 
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 
12 #include <framework/pcore/EvtMessage.h>
13 #include <framework/logging/Logger.h>
14 
15 #include <THashList.h>
16 #include <TBufferJSON.h>
17 
18 #include <boost/format.hpp>
19 #include <boost/algorithm/string/replace.hpp>
20 
21 #include <lz4.h>
22 #include <zmq.hpp>
23 
24 using namespace Belle2;
25 
26 template<class AConnectionClass>
27 void ZMQHistogramOutput<AConnectionClass>::handleEvent(std::unique_ptr<ZMQIdMessage> message)
28 {
29  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
30  // This run is over, so merge everything a final time and send it out
31  mergeAndSend(EMessageTypes::c_lastEventMessage);
32  clear();
33  return;
34  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
35  // Everything is over, so send out a terminate message
36  mergeAndSend(EMessageTypes::c_terminateMessage);
37  clear();
38  return;
39  }
40 
41  auto identity = message->getIdentity();
42  auto& dataMessage = message->getDataMessage();
43  auto& additionalMessage = message->getAdditionalDataMessage();
44 
45  // Check if we get messages with the same event number
46  EventMetaData* eventMetaDataPtr = nullptr;
47  std::string additionalMessageAsString(additionalMessage.data<const char>(), additionalMessage.size());
48  TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
49 
50  if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->getExperiment()) {
51  B2ERROR("Having received histograms with different experiment numbers! Not a good sign!");
52  AConnectionClass::increment("different_event_meta_data");
53  clear();
54  return;
55  }
56  if (m_storedRun and * m_storedRun != eventMetaDataPtr->getRun()) {
57  B2ERROR("Having received histograms with different run numbers! Not a good sign!");
58  AConnectionClass::increment("different_event_meta_data");
59  clear();
60  return;
61  }
62 
63  m_storedExperiment = eventMetaDataPtr->getExperiment();
64  m_storedRun = eventMetaDataPtr->getRun();
65 
66  if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
67  int uncompressedSize = LZ4_decompress_safe(dataMessage.data<char>(), &m_uncompressedBuffer[0],
68  dataMessage.size(), m_maximalUncompressedBufferSize);
69  B2ASSERT("Decompression failed", uncompressedSize > 0);
70 
71  std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
72 
73  AConnectionClass::average("uncompressed_size", msg->size());
74 
75  B2DEBUG(10,
76  "After decompression, the size is " << uncompressedSize << " and the message itself says " << msg->size());
77  HistogramMapping histogram(std::move(msg));
78  if (not histogram.empty()) {
79  m_storedMessages[identity] = std::move(histogram);
80  }
81  } else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
82  std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(dataMessage.data<char>()));
83  HistogramMapping histogram(std::move(msg));
84  if (not histogram.empty()) {
85  m_storedMessages[identity] = std::move(histogram);
86  }
87  } else {
88  B2FATAL("Unknown message type!");
89  }
90 
91  AConnectionClass::log("stored_identities", static_cast<long>(m_storedMessages.size()));
92 }
93 
94 template<class AConnectionClass>
96 {
97  AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
98 }
99 
100 template<class AConnectionClass>
102 {
103  AConnectionClass::increment("histogram_clears");
104  AConnectionClass::logTime("last_clear");
105 
106  AConnectionClass::clear();
107 
108  m_storedMessages.clear();
109  m_storedExperiment.reset();
110  m_storedRun.reset();
111 }
112 
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Class to manage streamed object.
Definition: EvtMessage.h:59
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
bool empty() const
Check if there are no stored histograms.
Add the common functionality to the histogram output classes.
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.