Belle II Software  release-08-01-10
ZMQHistogramConnection.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
9 
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 
12 #include <framework/logging/Logger.h>
13 
14 #include <TH1.h>
15 #include <TBufferJSON.h>
16 #include <TMemFile.h>
17 
18 #include <lz4.h>
19 
20 #include <boost/range/combine.hpp>
21 #include <boost/format.hpp>
22 #include <boost/algorithm/string/replace.hpp>
23 
24 using namespace Belle2;
25 
26 ZMQHistoServerToFileOutput::ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize,
27  const std::string& dqmFileName,
28  const std::string& rootFileName) :
29  m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
30 {
31 
32  // We do not free this on purpose!
33  if (not m_dqmMemFileName.empty()) {
34  m_sharedMemory = new DqmSharedMem(m_dqmMemFileName.c_str(), maximalUncompressedBufferSize);
35  }
36 
37  log("histogram_merges", 0l);
38  log("last_merged_histograms", 0l);
39  log("average_merged_histograms", 0l);
40  log("last_merge", "");
41  log("last_written_file_name", "");
42  log("memory_file_size", 0l);
43 }
44 
45 void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
46  const std::optional<unsigned int>& experiment,
47  const std::optional<unsigned int>& run, EMessageTypes /*messageType*/)
48 {
49  if (storedMessages.empty()) {
50  return;
51  }
52 
53  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
54 
55  increment("histogram_merges");
56 
57  // We do not care if this is the run end, or run start or anything. We just write it out.
58  HistogramMapping mergeHistograms;
59 
60  TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
61  memFile.cd();
62 
63  log("last_merged_histograms", static_cast<long>(storedMessages.size()));
64  average("average_merged_histograms", static_cast<double>(storedMessages.size()));
65 
66  logTime("last_merge");
67  for (const auto& keyValue : storedMessages) {
68  const auto& histogram = keyValue.second;
69  mergeHistograms += histogram;
70  }
71 
72  memFile.Write();
73 
74  average("memory_file_size", memFile.GetSize());
75 
76  if (m_sharedMemory) {
77  m_sharedMemory->lock();
78  B2ASSERT("Writing to shared memory failed!",
79  memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
80  m_sharedMemory->unlock();
81  }
82 
83  // Also write the memory content out to a regular ROOT file
84  auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
85  boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
86  memFile.Cp(outputFileName.c_str(), false);
87 
88  log("last_written_file_name", outputFileName);
89 
90  mergeHistograms.clear();
91 }
92 
94 {
95  B2FATAL("There should be no data coming here!");
96 }
97 
98 std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
99 {
100  return {};
101 }
102 
104 {
105  // Clear the shared memory by writing an empty ROOT file into it
106  if (m_sharedMemory) {
107  TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
108  memFile.Close();
109  m_sharedMemory->lock();
110  B2ASSERT("Writing to shared memory failed!",
111  memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
112  m_sharedMemory->unlock();
113  }
114 }
115 
116 ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
117  m_output(outputAddress, parent)
118 {
119  m_output.log("histogram_merges", 0l);
120  m_output.log("last_merged_histograms", 0l);
121  m_output.log("average_merged_histograms", 0l);
122  m_output.log("last_merge", "");
123  m_output.log("size_before_compression", 0.0);
124  m_output.log("size_after_compression", 0.0);
125 }
126 
127 void ZMQHistoServerToZMQOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
128  const std::optional<unsigned int>& experiment,
129  const std::optional<unsigned int>& run, EMessageTypes messageType)
130 {
131  if (messageType == EMessageTypes::c_lastEventMessage) {
132  // merge one last time
133  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
134  // and then send out a stop signal by ourself
135  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
136  m_output.handleEvent(std::move(message));
137  return;
138  } else if (messageType == EMessageTypes::c_terminateMessage) {
139  // merge one last time
140  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
141  // and send out a terminate message
142  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
143  m_output.handleEvent(std::move(message));
144  return;
145  }
146 
147  B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
148 
149  // Makes no sense to send out an empty event
150  if (storedMessages.empty()) {
151  return;
152  }
153 
154  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
155 
156  m_output.increment("histogram_merges");
157 
158  HistogramMapping mergeHistograms;
159 
160  m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
161  m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
162  m_output.logTime("last_merge");
163 
164  for (const auto& keyValue : storedMessages) {
165  const auto& histogram = keyValue.second;
166  mergeHistograms += histogram;
167  }
168 
169  auto eventMessage = mergeHistograms.toMessage();
170 
171  if (m_outputBuffer.empty()) {
173  }
174 
175  m_output.average("size_before_compression", eventMessage->size());
176  int size = m_maximalCompressedSize;
177  size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
178  B2ASSERT("Compression failed", size > 0);
179  m_output.average("size_after_compression", size);
180 
181  zmq::message_t message(&m_outputBuffer[0], size);
182 
183  EventMetaData eventMetaData(0, *run, *experiment);
184  auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
185  zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
186 
187  auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
188  std::move(additionalEventMessage));
189  m_output.handleEvent(std::move(zmqMessage), true, 20000);
190 }
191 
192 ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
193  m_output(outputAddress, false, parent)
194 {
195  m_output.log("histogram_merges", 0l);
196  m_output.log("last_merged_histograms", 0l);
197  m_output.log("average_merged_histograms", 0l);
198  m_output.log("last_merge", "");
199  m_output.log("size_before_compression", 0.0);
200 }
201 
202 void ZMQHistoServerToRawOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
203  const std::optional<unsigned int>& experiment,
204  const std::optional<unsigned int>& run, EMessageTypes messageType)
205 {
206  if (messageType == EMessageTypes::c_lastEventMessage) {
207  // merge one last time
208  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
209  // but do not send out any message
210  return;
211  } else if (messageType == EMessageTypes::c_terminateMessage) {
212  // merge one last time
213  mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
214  return;
215  }
216 
217  B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
218 
219  // Makes no sense to send out an empty event
220  if (storedMessages.empty()) {
221  return;
222  }
223 
224  B2ASSERT("Experiment and run must be set at this stage", experiment and run);
225 
226  m_output.increment("histogram_merges");
227 
228  HistogramMapping mergeHistograms;
229 
230  m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
231  m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
232  m_output.logTime("last_merge");
233 
234  for (const auto& keyValue : storedMessages) {
235  const auto& histogram = keyValue.second;
236  mergeHistograms += histogram;
237  }
238 
239  auto eventMessage = mergeHistograms.toMessage();
240 
241  m_output.average("size_before_compression", eventMessage->size());
242 
243  zmq::message_t message(eventMessage->buffer(), eventMessage->size());
244  m_output.handleEvent(std::move(message));
245 }
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::string m_rootFileName
Output file name (possible with placeholders)
DqmSharedMem * m_sharedMemory
The SHM file. Please note that we do not call its destructor on purpose.
void handleIncomingData()
There should be never incoming data, so raise an exception if called anyways.
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.