Belle II Software  release-05-01-25
ZMQHistogramConnection.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
11 
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
13 
14 #include <framework/logging/Logger.h>
15 
16 #include <TH1.h>
17 #include <TBufferJSON.h>
18 
19 #include <lz4.h>
20 
21 #include <boost/range/combine.hpp>
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string/replace.hpp>
24 
25 using namespace Belle2;
26 
27 ZMQHistoServerToFileOutput::ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize,
28  const std::string& dqmFileName,
29  const std::string& rootFileName) :
30  m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
31 {
32 
33  // We do not free this on purpose!
34  if (not m_dqmMemFileName.empty()) {
35  m_sharedMemory = new SharedMem(m_dqmMemFileName.c_str(), maximalUncompressedBufferSize);
36  }
37 
38  log("histogram_merges", 0l);
39  log("last_merged_histograms", 0l);
40  log("average_merged_histograms", 0l);
41  log("last_merge", "");
42  log("last_written_file_name", "");
43  log("memory_file_size", 0l);
44 }
45 
46 void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
47  const std::optional<unsigned int>& experiment,
48  const std::optional<unsigned int>& run, EMessageTypes messageType)
49 {
50  if (storedMessages.empty()) {
51  return;
52  }
53 
54  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
55 
56  increment("histogram_merges");
57 
58  // We do not care if this is the run end, or run start or anything. We just write it out.
59  HistogramMapping mergeHistograms;
60 
61  TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
62  memFile.cd();
63 
64  log("last_merged_histograms", static_cast<long>(storedMessages.size()));
65  average("average_merged_histograms", static_cast<double>(storedMessages.size()));
66 
67  logTime("last_merge");
68  for (const auto& keyValue : storedMessages) {
69  const auto& histogram = keyValue.second;
70  mergeHistograms += histogram;
71  }
72 
73  memFile.Write();
74 
75  average("memory_file_size", memFile.GetSize());
76 
77  if (m_sharedMemory) {
78  m_sharedMemory->lock();
79  B2ASSERT("Writing to shared memory failed!",
80  memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
81  m_sharedMemory->unlock();
82  }
83 
84  // Also write the memory content out to a regular ROOT file
85  auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
86  boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
87  memFile.Cp(outputFileName.c_str(), false);
88 
89  log("last_written_file_name", outputFileName);
90 
91  mergeHistograms.clear();
92 }
93 
95 {
96  B2FATAL("There should be no data coming here!");
97 }
98 
99 std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
100 {
101  return {};
102 }
103 
105 {
106  // Clear the shared memory by writing an empty ROOT file into it
107  if (m_sharedMemory) {
108  TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
109  memFile.Close();
110  m_sharedMemory->lock();
111  B2ASSERT("Writing to shared memory failed!",
112  memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
113  m_sharedMemory->unlock();
114  }
115 }
116 
117 ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
118  m_output(outputAddress, parent)
119 {
120  m_output.log("histogram_merges", 0l);
121  m_output.log("last_merged_histograms", 0l);
122  m_output.log("average_merged_histograms", 0l);
123  m_output.log("last_merge", "");
124  m_output.log("size_before_compression", 0.0);
125  m_output.log("size_after_compression", 0.0);
126 }
127 
128 void ZMQHistoServerToZMQOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
129  const std::optional<unsigned int>& experiment,
130  const std::optional<unsigned int>& run, EMessageTypes messageType)
131 {
132  if (messageType == EMessageTypes::c_lastEventMessage) {
133  // merge one last time
134  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
135  // and then send out a stop signal by ourself
136  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
137  m_output.handleEvent(std::move(message));
138  return;
139  } else if (messageType == EMessageTypes::c_terminateMessage) {
140  // merge one last time
141  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
142  // and send out a terminate message
143  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
144  m_output.handleEvent(std::move(message));
145  return;
146  }
147 
148  B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
149 
150  // Makes no sense to send out an empty event
151  if (storedMessages.empty()) {
152  return;
153  }
154 
155  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
156 
157  m_output.increment("histogram_merges");
158 
159  HistogramMapping mergeHistograms;
160 
161  m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
162  m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
163  m_output.logTime("last_merge");
164 
165  for (const auto& keyValue : storedMessages) {
166  const auto& histogram = keyValue.second;
167  mergeHistograms += histogram;
168  }
169 
170  auto eventMessage = mergeHistograms.toMessage();
171 
172  if (m_outputBuffer.empty()) {
174  }
175 
176  m_output.average("size_before_compression", eventMessage->size());
177  int size = m_maximalCompressedSize;
178  size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
179  B2ASSERT("Compression failed", size > 0);
180  m_output.average("size_after_compression", size);
181 
182  zmq::message_t message(&m_outputBuffer[0], size);
183 
184  EventMetaData eventMetaData(0, *run, *experiment);
185  auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
186  zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
187 
188  auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
189  std::move(additionalEventMessage));
190  m_output.handleEvent(std::move(zmqMessage), true, 20000);
191 }
192 
193 ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
194  m_output(outputAddress, false, parent)
195 {
196  m_output.log("histogram_merges", 0l);
197  m_output.log("last_merged_histograms", 0l);
198  m_output.log("average_merged_histograms", 0l);
199  m_output.log("last_merge", "");
200  m_output.log("size_before_compression", 0.0);
201 }
202 
203 void ZMQHistoServerToRawOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
204  const std::optional<unsigned int>& experiment,
205  const std::optional<unsigned int>& run, EMessageTypes messageType)
206 {
207  if (messageType == EMessageTypes::c_lastEventMessage) {
208  // merge one last time
209  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
210  // but do not send out any message
211  return;
212  } else if (messageType == EMessageTypes::c_terminateMessage) {
213  // merge one last time
214  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
215  return;
216  }
217 
218  B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
219 
220  // Makes no sense to send out an empty event
221  if (storedMessages.empty()) {
222  return;
223  }
224 
225  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
226 
227  m_output.increment("histogram_merges");
228 
229  HistogramMapping mergeHistograms;
230 
231  m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
232  m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
233  m_output.logTime("last_merge");
234 
235  for (const auto& keyValue : storedMessages) {
236  const auto& histogram = keyValue.second;
237  mergeHistograms += histogram;
238  }
239 
240  auto eventMessage = mergeHistograms.toMessage();
241 
242  m_output.average("size_before_compression", eventMessage->size());
243 
244  zmq::message_t message(eventMessage->buffer(), eventMessage->size());
245  m_output.handleEvent(std::move(message));
246 }
Belle2::ZMQHistoServerToFileOutput::clear
void clear()
Clear the shared memory.
Definition: ZMQHistogramConnection.cc:104
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQHistoServerToFileOutput::m_rootFileName
std::string m_rootFileName
Output file name (possibly with placeholders)
Definition: ZMQHistogramConnection.h:87
Belle2::ZMQHistoServerToZMQOutput::m_output
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
Definition: ZMQHistogramConnection.h:121
Belle2::ZMQConfirmedOutput::handleEvent
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
Definition: ZMQConfirmedConnection.cc:230
Belle2::HistogramMapping
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
Definition: HistogramMapping.h:40
Belle2::ZMQLogger::logTime
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:44
Belle2::HistogramMapping::toMessage
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
Definition: HistogramMapping.cc:90
Belle2::ZMQHistoServerToFileOutput::getSockets
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
Definition: ZMQHistogramConnection.cc:99
Belle2::ZMQLogger::log
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:106
Belle2::ZMQLogger::increment
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:34
Belle2::ZMQHistoServerToFileOutput::ZMQHistoServerToFileOutput
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
Definition: ZMQHistogramConnection.cc:27
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
Definition: ZMQHistogramConnection.cc:193
Belle2::ZMQRawOutput::handleEvent
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
Definition: ZMQRawConnection.cc:160
Belle2::ZMQHistoServerToZMQOutput::mergeAndSend
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
Definition: ZMQHistogramConnection.cc:128
Belle2::SharedMem
Definition: SharedMem.h:20
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::ZMQHistoServerToFileOutput::m_dqmMemFileName
std::string m_dqmMemFileName
Name of the shared memory.
Definition: ZMQHistogramConnection.h:85
Belle2::ZMQHistoServerToRawOutput::mergeAndSend
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
Definition: ZMQHistogramConnection.cc:203
Belle2::ZMQLogger::average
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:112
Belle2::ZMQHistoServerToZMQOutput::m_outputBuffer
std::vector< char > m_outputBuffer
Buffer used for compression.
Definition: ZMQHistogramConnection.h:124
Belle2::ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
Definition: ZMQHistogramConnection.cc:117
Belle2::ZMQHistoServerToFileOutput::mergeAndSend
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
Definition: ZMQHistogramConnection.cc:46
Belle2::EventMetaData
Store event, run, and experiment numbers.
Definition: EventMetaData.h:43
Belle2::ZMQHistoServerToZMQOutput::m_maximalCompressedSize
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
Definition: ZMQHistogramConnection.h:126
Belle2::ZMQHistoServerToFileOutput::m_sharedMemory
SharedMem * m_sharedMemory
The SHM file. Please note that we do not call its destructor on purpose.
Definition: ZMQHistogramConnection.h:83
Belle2::HistogramMapping::clear
void clear()
Clear all histograms in the internal map also deleting the pointers.
Definition: HistogramMapping.cc:73
Belle2::ZMQHistoServerToFileOutput::handleIncomingData
void handleIncomingData()
There should never be incoming data, so raise an exception if called anyway.
Definition: ZMQHistogramConnection.cc:94
Belle2::ZMQHistoServerToRawOutput::m_output
ZMQRawOutput m_output
The output connection used for sending the histograms.
Definition: ZMQHistogramConnection.h:159