Belle II Software  release-06-00-14
ZMQHistogramConnection.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
9 
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 
12 #include <framework/logging/Logger.h>
13 
14 #include <TH1.h>
15 #include <TBufferJSON.h>
16 
17 #include <lz4.h>
18 
19 #include <boost/range/combine.hpp>
20 #include <boost/format.hpp>
21 #include <boost/algorithm/string/replace.hpp>
22 
23 using namespace Belle2;
24 
25 ZMQHistoServerToFileOutput::ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize,
26  const std::string& dqmFileName,
27  const std::string& rootFileName) :
28  m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
29 {
30 
31  // We do not free this on purpose!
32  if (not m_dqmMemFileName.empty()) {
33  m_sharedMemory = new SharedMem(m_dqmMemFileName.c_str(), maximalUncompressedBufferSize);
34  }
35 
36  log("histogram_merges", 0l);
37  log("last_merged_histograms", 0l);
38  log("average_merged_histograms", 0l);
39  log("last_merge", "");
40  log("last_written_file_name", "");
41  log("memory_file_size", 0l);
42 }
43 
44 void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
45  const std::optional<unsigned int>& experiment,
46  const std::optional<unsigned int>& run, EMessageTypes messageType)
47 {
48  if (storedMessages.empty()) {
49  return;
50  }
51 
52  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
53 
54  increment("histogram_merges");
55 
56  // We do not care if this is the run end, or run start or anything. We just write it out.
57  HistogramMapping mergeHistograms;
58 
59  TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
60  memFile.cd();
61 
62  log("last_merged_histograms", static_cast<long>(storedMessages.size()));
63  average("average_merged_histograms", static_cast<double>(storedMessages.size()));
64 
65  logTime("last_merge");
66  for (const auto& keyValue : storedMessages) {
67  const auto& histogram = keyValue.second;
68  mergeHistograms += histogram;
69  }
70 
71  memFile.Write();
72 
73  average("memory_file_size", memFile.GetSize());
74 
75  if (m_sharedMemory) {
76  m_sharedMemory->lock();
77  B2ASSERT("Writing to shared memory failed!",
78  memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
79  m_sharedMemory->unlock();
80  }
81 
82  // Also write the memory content out to a regular ROOT file
83  auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
84  boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
85  memFile.Cp(outputFileName.c_str(), false);
86 
87  log("last_written_file_name", outputFileName);
88 
89  mergeHistograms.clear();
90 }
91 
93 {
94  B2FATAL("There should be no data coming here!");
95 }
96 
97 std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
98 {
99  return {};
100 }
101 
103 {
104  // Clear the shared memory by writing an empty ROOT file into it
105  if (m_sharedMemory) {
106  TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
107  memFile.Close();
108  m_sharedMemory->lock();
109  B2ASSERT("Writing to shared memory failed!",
110  memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
111  m_sharedMemory->unlock();
112  }
113 }
114 
115 ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
116  m_output(outputAddress, parent)
117 {
118  m_output.log("histogram_merges", 0l);
119  m_output.log("last_merged_histograms", 0l);
120  m_output.log("average_merged_histograms", 0l);
121  m_output.log("last_merge", "");
122  m_output.log("size_before_compression", 0.0);
123  m_output.log("size_after_compression", 0.0);
124 }
125 
126 void ZMQHistoServerToZMQOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
127  const std::optional<unsigned int>& experiment,
128  const std::optional<unsigned int>& run, EMessageTypes messageType)
129 {
130  if (messageType == EMessageTypes::c_lastEventMessage) {
131  // merge one last time
132  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
133  // and then send out a stop signal by ourself
134  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
135  m_output.handleEvent(std::move(message));
136  return;
137  } else if (messageType == EMessageTypes::c_terminateMessage) {
138  // merge one last time
139  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
140  // and send out a terminate message
141  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
142  m_output.handleEvent(std::move(message));
143  return;
144  }
145 
146  B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
147 
148  // Makes no sense to send out an empty event
149  if (storedMessages.empty()) {
150  return;
151  }
152 
153  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
154 
155  m_output.increment("histogram_merges");
156 
157  HistogramMapping mergeHistograms;
158 
159  m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
160  m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
161  m_output.logTime("last_merge");
162 
163  for (const auto& keyValue : storedMessages) {
164  const auto& histogram = keyValue.second;
165  mergeHistograms += histogram;
166  }
167 
168  auto eventMessage = mergeHistograms.toMessage();
169 
170  if (m_outputBuffer.empty()) {
172  }
173 
174  m_output.average("size_before_compression", eventMessage->size());
175  int size = m_maximalCompressedSize;
176  size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
177  B2ASSERT("Compression failed", size > 0);
178  m_output.average("size_after_compression", size);
179 
180  zmq::message_t message(&m_outputBuffer[0], size);
181 
182  EventMetaData eventMetaData(0, *run, *experiment);
183  auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
184  zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
185 
186  auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
187  std::move(additionalEventMessage));
188  m_output.handleEvent(std::move(zmqMessage), true, 20000);
189 }
190 
191 ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
192  m_output(outputAddress, false, parent)
193 {
194  m_output.log("histogram_merges", 0l);
195  m_output.log("last_merged_histograms", 0l);
196  m_output.log("average_merged_histograms", 0l);
197  m_output.log("last_merge", "");
198  m_output.log("size_before_compression", 0.0);
199 }
200 
201 void ZMQHistoServerToRawOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
202  const std::optional<unsigned int>& experiment,
203  const std::optional<unsigned int>& run, EMessageTypes messageType)
204 {
205  if (messageType == EMessageTypes::c_lastEventMessage) {
206  // merge one last time
207  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
208  // but do not send out any message
209  return;
210  } else if (messageType == EMessageTypes::c_terminateMessage) {
211  // merge one last time
212  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
213  return;
214  }
215 
216  B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
217 
218  // Makes no sense to send out an empty event
219  if (storedMessages.empty()) {
220  return;
221  }
222 
223  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
224 
225  m_output.increment("histogram_merges");
226 
227  HistogramMapping mergeHistograms;
228 
229  m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
230  m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
231  m_output.logTime("last_merge");
232 
233  for (const auto& keyValue : storedMessages) {
234  const auto& histogram = keyValue.second;
235  mergeHistograms += histogram;
236  }
237 
238  auto eventMessage = mergeHistograms.toMessage();
239 
240  m_output.average("size_before_compression", eventMessage->size());
241 
242  zmq::message_t message(eventMessage->buffer(), eventMessage->size());
243  m_output.handleEvent(std::move(message));
244 }
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::string m_rootFileName
Output file name (possible with placeholders)
void handleIncomingData()
There should be never incoming data, so raise an exception if called anyways.
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
SharedMem * m_sharedMemory
The SHM file. Please note that we do not call its destructor on purpose.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.