Belle II Software development
ZMQHistogramConnection.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/connections/ZMQHistogramConnection.h>
9
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11
12#include <framework/logging/Logger.h>
13
14#include <TH1.h>
15#include <TBufferJSON.h>
16#include <TMemFile.h>
17
18#include <lz4.h>
19
20#include <boost/range/combine.hpp>
21#include <boost/format.hpp>
22#include <boost/algorithm/string/replace.hpp>
23
24using namespace Belle2;
25
26ZMQHistoServerToFileOutput::ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize,
27 const std::string& dqmFileName,
28 const std::string& rootFileName) :
29 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
30{
31
32 // We do not free this on purpose!
33 if (not m_dqmMemFileName.empty()) {
34 m_sharedMemory = new DqmSharedMem(m_dqmMemFileName.c_str(), maximalUncompressedBufferSize);
35 }
36
37 log("histogram_merges", 0l);
38 log("last_merged_histograms", 0l);
39 log("average_merged_histograms", 0l);
40 log("last_merge", "");
41 log("last_written_file_name", "");
42 log("memory_file_size", 0l);
43}
44
45void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
46 const std::optional<unsigned int>& experiment,
47 const std::optional<unsigned int>& run, EMessageTypes /*messageType*/)
48{
49 if (storedMessages.empty()) {
50 return;
51 }
52
53 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
54
55 increment("histogram_merges");
56
57 // We do not care if this is the run end, or run start or anything. We just write it out.
58 HistogramMapping mergeHistograms;
59
60 TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
61 memFile.cd();
62
63 log("last_merged_histograms", static_cast<long>(storedMessages.size()));
64 average("average_merged_histograms", static_cast<double>(storedMessages.size()));
65
66 logTime("last_merge");
67 for (const auto& keyValue : storedMessages) {
68 const auto& histogram = keyValue.second;
69 mergeHistograms += histogram;
70 }
71
72 memFile.Write();
73
74 average("memory_file_size", memFile.GetSize());
75
76 if (m_sharedMemory) {
77 m_sharedMemory->lock();
78 B2ASSERT("Writing to shared memory failed!",
79 memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
80 m_sharedMemory->unlock();
81 }
82
83 // Also write the memory content out to a regular ROOT file
84 auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
85 boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
86 memFile.Cp(outputFileName.c_str(), false);
87
88 log("last_written_file_name", outputFileName);
89
90 mergeHistograms.clear();
91}
92
94{
95 B2FATAL("There should be no data coming here!");
96}
97
98std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
99{
100 return {};
101}
102
104{
105 // Clear the shared memory by writing an empty ROOT file into it
106 if (m_sharedMemory) {
107 TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
108 memFile.Close();
109 m_sharedMemory->lock();
110 B2ASSERT("Writing to shared memory failed!",
111 memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
112 m_sharedMemory->unlock();
113 }
114}
115
116ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
117 m_output(outputAddress, parent)
118{
119 m_output.log("histogram_merges", 0l);
120 m_output.log("last_merged_histograms", 0l);
121 m_output.log("average_merged_histograms", 0l);
122 m_output.log("last_merge", "");
123 m_output.log("size_before_compression", 0.0);
124 m_output.log("size_after_compression", 0.0);
125}
126
127void ZMQHistoServerToZMQOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
128 const std::optional<unsigned int>& experiment,
129 const std::optional<unsigned int>& run, EMessageTypes messageType)
130{
131 if (messageType == EMessageTypes::c_lastEventMessage) {
132 // merge one last time
133 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
134 // and then send out a stop signal by ourself
135 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
136 m_output.handleEvent(std::move(message));
137 return;
138 } else if (messageType == EMessageTypes::c_terminateMessage) {
139 // merge one last time
140 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
141 // and send out a terminate message
142 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
143 m_output.handleEvent(std::move(message));
144 return;
145 }
146
147 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
148
149 // Makes no sense to send out an empty event
150 if (storedMessages.empty()) {
151 return;
152 }
153
154 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
155
156 m_output.increment("histogram_merges");
157
158 HistogramMapping mergeHistograms;
159
160 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
161 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
162 m_output.logTime("last_merge");
163
164 for (const auto& keyValue : storedMessages) {
165 const auto& histogram = keyValue.second;
166 mergeHistograms += histogram;
167 }
168
169 auto eventMessage = mergeHistograms.toMessage();
170
171 if (m_outputBuffer.empty()) {
173 }
174
175 m_output.average("size_before_compression", eventMessage->size());
176 int size = m_maximalCompressedSize;
177 size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
178 B2ASSERT("Compression failed", size > 0);
179 m_output.average("size_after_compression", size);
180
181 zmq::message_t message(&m_outputBuffer[0], size);
182
183 EventMetaData eventMetaData(0, *run, *experiment);
184 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
185 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
186
187 auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
188 std::move(additionalEventMessage));
189 m_output.handleEvent(std::move(zmqMessage), true, 20000);
190}
191
192ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
193 m_output(outputAddress, false, parent)
194{
195 m_output.log("histogram_merges", 0l);
196 m_output.log("last_merged_histograms", 0l);
197 m_output.log("average_merged_histograms", 0l);
198 m_output.log("last_merge", "");
199 m_output.log("size_before_compression", 0.0);
200}
201
202void ZMQHistoServerToRawOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
203 const std::optional<unsigned int>& experiment,
204 const std::optional<unsigned int>& run, EMessageTypes messageType)
205{
206 if (messageType == EMessageTypes::c_lastEventMessage) {
207 // merge one last time
208 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
209 // but do not send out any message
210 return;
211 } else if (messageType == EMessageTypes::c_terminateMessage) {
212 // merge one last time
213 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
214 return;
215 }
216
217 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
218
219 // Makes no sense to send out an empty event
220 if (storedMessages.empty()) {
221 return;
222 }
223
224 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
225
226 m_output.increment("histogram_merges");
227
228 HistogramMapping mergeHistograms;
229
230 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
231 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
232 m_output.logTime("last_merge");
233
234 for (const auto& keyValue : storedMessages) {
235 const auto& histogram = keyValue.second;
236 mergeHistograms += histogram;
237 }
238
239 auto eventMessage = mergeHistograms.toMessage();
240
241 m_output.average("size_before_compression", eventMessage->size());
242
243 zmq::message_t message(eventMessage->buffer(), eventMessage->size());
244 m_output.handleEvent(std::move(message));
245}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::string m_rootFileName
Output file name (possibly with placeholders)
DqmSharedMem * m_sharedMemory
The SHM file. Please note that we do not call its destructor on purpose.
void handleIncomingData()
There should never be incoming data, so raise an exception if called anyway.
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.