Belle II Software  release-06-01-15
ZMQHistogramOutput.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/connections/ZMQHistogramOutput.h>
9 
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 
12 #include <framework/pcore/EvtMessage.h>
13 #include <daq/dqm/DqmMemFile.h>
14 #include <framework/logging/Logger.h>
15 
16 #include <THashList.h>
17 #include <TBufferJSON.h>
18 
19 #include <boost/format.hpp>
20 #include <boost/algorithm/string/replace.hpp>
21 
22 #include <lz4.h>
23 #include <zmq.hpp>
24 
25 using namespace Belle2;
26 
27 template<class AConnectionClass>
28 void ZMQHistogramOutput<AConnectionClass>::handleEvent(std::unique_ptr<ZMQIdMessage> message)
29 {
30  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
31  // This run is over, so merge everything a final time and send it out
32  mergeAndSend(EMessageTypes::c_lastEventMessage);
33  clear();
34  return;
35  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
36  // Everything is over, so send out a terminate message
37  mergeAndSend(EMessageTypes::c_terminateMessage);
38  clear();
39  return;
40  }
41 
42  auto identity = message->getIdentity();
43  auto& dataMessage = message->getDataMessage();
44  auto& additionalMessage = message->getAdditionalDataMessage();
45 
46  // Check if we get messages with the same event number
47  EventMetaData* eventMetaDataPtr = nullptr;
48  std::string additionalMessageAsString(additionalMessage.data<const char>(), additionalMessage.size());
49  TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
50 
51  if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->getExperiment()) {
52  B2ERROR("Having received histograms with different experiment numbers! Not a good sign!");
53  AConnectionClass::increment("different_event_meta_data");
54  clear();
55  return;
56  }
57  if (m_storedRun and * m_storedRun != eventMetaDataPtr->getRun()) {
58  B2ERROR("Having received histograms with different run numbers! Not a good sign!");
59  AConnectionClass::increment("different_event_meta_data");
60  clear();
61  return;
62  }
63 
64  m_storedExperiment = eventMetaDataPtr->getExperiment();
65  m_storedRun = eventMetaDataPtr->getRun();
66 
67  if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
68  int uncompressedSize = LZ4_decompress_safe(dataMessage.data<char>(), &m_uncompressedBuffer[0],
69  dataMessage.size(), m_maximalUncompressedBufferSize);
70  B2ASSERT("Decompression failed", uncompressedSize > 0);
71 
72  std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
73 
74  AConnectionClass::average("uncompressed_size", msg->size());
75 
76  B2DEBUG(10,
77  "After decompression, the size is " << uncompressedSize << " and the message itself says " << msg->size());
78  HistogramMapping histogram(std::move(msg));
79  if (not histogram.empty()) {
80  m_storedMessages[identity] = std::move(histogram);
81  }
82  } else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
83  std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(dataMessage.data<char>()));
84  HistogramMapping histogram(std::move(msg));
85  if (not histogram.empty()) {
86  m_storedMessages[identity] = std::move(histogram);
87  }
88  } else {
89  B2FATAL("Unknown message type!");
90  }
91 
92  AConnectionClass::log("stored_identities", static_cast<long>(m_storedMessages.size()));
93 }
94 
95 template<class AConnectionClass>
97 {
98  AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
99 }
100 
101 template<class AConnectionClass>
103 {
104  AConnectionClass::increment("histogram_clears");
105  AConnectionClass::logTime("last_clear");
106 
107  AConnectionClass::clear();
108 
109  m_storedMessages.clear();
110  m_storedExperiment.reset();
111  m_storedRun.reset();
112 }
113 
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Class to manage streamed object.
Definition: EvtMessage.h:59
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
bool empty() const
Check if there are no stored histograms.
Add the common functionality to the histogram output classes.
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.