Belle II Software development
ZMQHistogramConnection.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/connections/ZMQHistogramConnection.h>
9
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11
12#include <framework/logging/Logger.h>
13
14#include <TH1.h>
15#include <TBufferJSON.h>
16#include <TFile.h>
17
18#include <lz4.h>
19
20#include <boost/range/combine.hpp>
21#include <boost/format.hpp>
22#include <boost/algorithm/string/replace.hpp>
23#include <filesystem>
24
25using namespace Belle2;
26
28 const std::string& rootFileName) :
29 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
30{
31 log("histogram_merges", 0l);
32 log("last_merged_histograms", 0l);
33 log("average_merged_histograms", 0l);
34 log("last_merge", "");
35 log("last_written_file_name", "");
36 log("memory_file_size", 0l);
37}
38
39void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
40 const std::optional<unsigned int>& experiment,
41 const std::optional<unsigned int>& run, EMessageTypes /*messageType*/)
42{
43 if (storedMessages.empty()) {
44 return;
45 }
46
47 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
48
49 increment("histogram_merges");
50
51 TFile memFile(("/dev/shm/tmp_" + m_dqmMemFileName).c_str(), "RECREATE");
52 if (!memFile.IsOpen()) {
53 B2ASSERT("Writing to shared memory failed! ", ("/dev/shm/tmp_" + m_dqmMemFileName).c_str());
54 return;
55 }
56 memFile.cd();
57
58 // We do not care if this is the run end, or run start or anything. We just write it out.
59 HistogramMapping mergeHistograms;
60
61 log("last_merged_histograms", static_cast<long>(storedMessages.size()));
62 average("average_merged_histograms", static_cast<double>(storedMessages.size()));
63
64 logTime("last_merge");
65 for (const auto& keyValue : storedMessages) {
66 const auto& histogram = keyValue.second;
67 mergeHistograms += histogram;
68 }
69
70 memFile.Write();
71
72 mergeHistograms.clear();
73
74 memFile.Close();
75
76 average("memory_file_size", memFile.GetSize());
77
78 // Also write the memory content out to a regular ROOT file
79 auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
80 boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
81 if (!std::filesystem::copy_file("/dev/shm/tmp_" + m_dqmMemFileName, outputFileName,
82 std::filesystem::copy_options::overwrite_existing)) {
83 perror("Copy from shm file failed ");
84 }
85
86 log("last_written_file_name", outputFileName);
87
88 std::filesystem::rename("/dev/shm/tmp_" + m_dqmMemFileName, "/dev/shm/" + m_dqmMemFileName);
89}
90
92{
93 B2FATAL("There should be no data coming here!");
94}
95
96std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
97{
98 return {};
99}
100
102{
103 // Make sure that the ROOT file is empty, and then closed.
104 TFile memFile(("/dev/shm/tmp_" + m_dqmMemFileName).c_str(), "RECREATE");
105 if (!memFile.IsOpen()) {
106 B2ASSERT("Writing to shared memory failed! ", ("/dev/shm/tmp_" + m_dqmMemFileName).c_str());
107 }
108 memFile.Close();
109 std::filesystem::rename("/dev/shm/tmp_" + m_dqmMemFileName, "/dev/shm/" + m_dqmMemFileName);
110}
111
112ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
113 m_output(outputAddress, parent)
114{
115 m_output.log("histogram_merges", 0l);
116 m_output.log("last_merged_histograms", 0l);
117 m_output.log("average_merged_histograms", 0l);
118 m_output.log("last_merge", "");
119 m_output.log("size_before_compression", 0.0);
120 m_output.log("size_after_compression", 0.0);
121}
122
123void ZMQHistoServerToZMQOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
124 const std::optional<unsigned int>& experiment,
125 const std::optional<unsigned int>& run, EMessageTypes messageType)
126{
127 if (messageType == EMessageTypes::c_lastEventMessage) {
128 // merge one last time
129 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
130 // and then send out a stop signal by ourself
131 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
132 m_output.handleEvent(std::move(message));
133 return;
134 } else if (messageType == EMessageTypes::c_terminateMessage) {
135 // merge one last time
136 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
137 // and send out a terminate message
138 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
139 m_output.handleEvent(std::move(message));
140 return;
141 }
142
143 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
144
145 // Makes no sense to send out an empty event
146 if (storedMessages.empty()) {
147 return;
148 }
149
150 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
151
152 m_output.increment("histogram_merges");
153
154 HistogramMapping mergeHistograms;
155
156 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
157 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
158 m_output.logTime("last_merge");
159
160 for (const auto& keyValue : storedMessages) {
161 const auto& histogram = keyValue.second;
162 mergeHistograms += histogram;
163 }
164
165 auto eventMessage = mergeHistograms.toMessage();
166
167 if (m_outputBuffer.empty()) {
169 }
170
171 m_output.average("size_before_compression", eventMessage->size());
172 int size = m_maximalCompressedSize;
173 size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
174 B2ASSERT("Compression failed", size > 0);
175 m_output.average("size_after_compression", size);
176
177 zmq::message_t message(&m_outputBuffer[0], size);
178
179 EventMetaData eventMetaData(0, *run, *experiment);
180 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
181 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
182
183 auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
184 std::move(additionalEventMessage));
185 m_output.handleEvent(std::move(zmqMessage), true, 20000);
186}
187
188ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
189 m_output(outputAddress, false, parent)
190{
191 m_output.log("histogram_merges", 0l);
192 m_output.log("last_merged_histograms", 0l);
193 m_output.log("average_merged_histograms", 0l);
194 m_output.log("last_merge", "");
195 m_output.log("size_before_compression", 0.0);
196}
197
198void ZMQHistoServerToRawOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
199 const std::optional<unsigned int>& experiment,
200 const std::optional<unsigned int>& run, EMessageTypes messageType)
201{
202 if (messageType == EMessageTypes::c_lastEventMessage) {
203 // merge one last time
204 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
205 // but do not send out any message
206 return;
207 } else if (messageType == EMessageTypes::c_terminateMessage) {
208 // merge one last time
209 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
210 return;
211 }
212
213 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
214
215 // Makes no sense to send out an empty event
216 if (storedMessages.empty()) {
217 return;
218 }
219
220 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
221
222 m_output.increment("histogram_merges");
223
224 HistogramMapping mergeHistograms;
225
226 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
227 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
228 m_output.logTime("last_merge");
229
230 for (const auto& keyValue : storedMessages) {
231 const auto& histogram = keyValue.second;
232 mergeHistograms += histogram;
233 }
234
235 auto eventMessage = mergeHistograms.toMessage();
236
237 m_output.average("size_before_compression", eventMessage->size());
238
239 zmq::message_t message(eventMessage->buffer(), eventMessage->size());
240 m_output.handleEvent(std::move(message));
241}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::string m_rootFileName
Output file name (possibly with placeholders)
void handleIncomingData()
There should never be incoming data, so raise an exception if called anyway.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToFileOutput(const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition: ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition: ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition: ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_S...
Definition: ZMQLogger.h:102
Abstract base class for different kinds of events.