Belle II Software  release-05-01-25
ZMQHistogramOutput.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/hbasf2/connections/ZMQHistogramOutput.h>
11 
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
13 
14 #include <framework/pcore/EvtMessage.h>
15 #include <daq/dqm/DqmMemFile.h>
16 #include <framework/logging/Logger.h>
17 
18 #include <THashList.h>
19 #include <TBufferJSON.h>
20 
21 #include <boost/format.hpp>
22 #include <boost/algorithm/string/replace.hpp>
23 
24 #include <lz4.h>
25 #include <zmq.hpp>
26 
27 using namespace Belle2;
28 
29 template<class AConnectionClass>
30 void ZMQHistogramOutput<AConnectionClass>::handleEvent(std::unique_ptr<ZMQIdMessage> message)
31 {
32  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
33  // This run is over, so merge everything a final time and send it out
34  mergeAndSend(EMessageTypes::c_lastEventMessage);
35  clear();
36  return;
37  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
38  // Everything is over, so send out a terminate message
39  mergeAndSend(EMessageTypes::c_terminateMessage);
40  clear();
41  return;
42  }
43 
44  auto identity = message->getIdentity();
45  auto& dataMessage = message->getDataMessage();
46  auto& additionalMessage = message->getAdditionalDataMessage();
47 
48  // Check if we get messages with the same event number
49  EventMetaData* eventMetaDataPtr = nullptr;
50  std::string additionalMessageAsString(additionalMessage.data<const char>(), additionalMessage.size());
51  TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
52 
53  if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->getExperiment()) {
54  B2ERROR("Having received histograms with different experiment numbers! Not a good sign!");
55  AConnectionClass::increment("different_event_meta_data");
56  clear();
57  return;
58  }
59  if (m_storedRun and * m_storedRun != eventMetaDataPtr->getRun()) {
60  B2ERROR("Having received histograms with different run numbers! Not a good sign!");
61  AConnectionClass::increment("different_event_meta_data");
62  clear();
63  return;
64  }
65 
66  m_storedExperiment = eventMetaDataPtr->getExperiment();
67  m_storedRun = eventMetaDataPtr->getRun();
68 
69  if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
70  int uncompressedSize = LZ4_decompress_safe(dataMessage.data<char>(), &m_uncompressedBuffer[0],
71  dataMessage.size(), m_maximalUncompressedBufferSize);
72  B2ASSERT("Decompression failed", uncompressedSize > 0);
73 
74  std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
75 
76  AConnectionClass::average("uncompressed_size", msg->size());
77 
78  B2DEBUG(10,
79  "After decompression, the size is " << uncompressedSize << " and the message itself says " << msg->size());
80  HistogramMapping histogram(std::move(msg));
81  if (not histogram.empty()) {
82  m_storedMessages[identity] = std::move(histogram);
83  }
84  } else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
85  std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(dataMessage.data<char>()));
86  HistogramMapping histogram(std::move(msg));
87  if (not histogram.empty()) {
88  m_storedMessages[identity] = std::move(histogram);
89  }
90  } else {
91  B2FATAL("Unknown message type!");
92  }
93 
94  AConnectionClass::log("stored_identities", static_cast<long>(m_storedMessages.size()));
95 }
96 
97 template<class AConnectionClass>
99 {
100  AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
101 }
102 
103 template<class AConnectionClass>
105 {
106  AConnectionClass::increment("histogram_clears");
107  AConnectionClass::logTime("last_clear");
108 
109  AConnectionClass::clear();
110 
111  m_storedMessages.clear();
112  m_storedExperiment.reset();
113  m_storedRun.reset();
114 }
115 
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::HistogramMapping
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
Definition: HistogramMapping.h:40
Belle2::HistogramMapping::empty
bool empty() const
Check if there are no stored histograms.
Definition: HistogramMapping.cc:78
Belle2::EvtMessage::size
int size() const
Get size of message including headers.
Definition: EvtMessage.cc:95
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::EventMetaData::getExperiment
int getExperiment() const
Experiment Getter.
Definition: EventMetaData.h:174
Belle2::EventMetaData::getRun
int getRun() const
Run Getter.
Definition: EventMetaData.h:161
Belle2::EventMetaData
Store event, run, and experiment numbers.
Definition: EventMetaData.h:43
Belle2::ZMQHistogramOutput::handleEvent
void handleEvent(std::unique_ptr< ZMQIdMessage > message)
Handle a new message to be "sent" (what this means is up to the base class) as described above.
Definition: ZMQHistogramOutput.cc:30
Belle2::ZMQHistogramOutput::mergeAndSend
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.
Definition: ZMQHistogramOutput.cc:98
Belle2::ZMQHistogramOutput::clear
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
Definition: ZMQHistogramOutput.cc:104
Belle2::ZMQHistogramOutput
Add the common functionality to the histogram output classes.
Definition: ZMQHistogramOutput.h:66