Belle II Software development
ZMQHistogramConnection.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/connections/ZMQHistogramConnection.h>
9
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11
12#include <framework/logging/Logger.h>
13
14#include <TH1.h>
15#include <TBufferJSON.h>
16#include <TFile.h>
17
18#include <lz4.h>
19
20#include <boost/range/combine.hpp>
21#include <boost/format.hpp>
22#include <boost/algorithm/string/replace.hpp>
23#include <filesystem>
24
25using namespace Belle2;
26
28 const std::string& rootFileName) :
29 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
30{
31 log("histogram_merges", 0l);
32 log("last_merged_histograms", 0l);
33 log("average_merged_histograms", 0l);
34 log("last_merge", "");
35 log("last_written_file_name", "");
36 log("memory_file_size", 0l);
37}
38
39void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
40 const std::optional<unsigned int>& experiment,
41 const std::optional<unsigned int>& run, EMessageTypes /*messageType*/)
42{
43 if (storedMessages.empty()) {
44 return;
45 }
46
47 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
48
49 increment("histogram_merges");
50
51 TFile memFile(("/dev/shm/tmp_" + m_dqmMemFileName).c_str(), "RECREATE");
52 if (!memFile.IsOpen()) {
53 B2ASSERT("Writing to shared memory failed! ", ("/dev/shm/tmp_" + m_dqmMemFileName).c_str());
54 return;
55 }
56 memFile.cd();
57
58 // We do not care if this is the run end, or run start or anything. We just write it out.
59 HistogramMapping mergeHistograms;
60
61 log("last_merged_histograms", static_cast<long>(storedMessages.size()));
62 average("average_merged_histograms", static_cast<double>(storedMessages.size()));
63
64 logTime("last_merge");
65 for (const auto& keyValue : storedMessages) {
66 const auto& histogram = keyValue.second;
67 mergeHistograms += histogram;
68 }
69
70 memFile.Write();
71
72 mergeHistograms.clear();
73
74 memFile.Close();
75
76 average("memory_file_size", memFile.GetSize());
77
78 // Also write the memory content out to a regular ROOT file
79 auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
80 boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
81 if (!std::filesystem::copy_file("/dev/shm/tmp_" + m_dqmMemFileName, outputFileName,
82 std::filesystem::copy_options::overwrite_existing)) {
83 perror("Copy from shm file failed ");
84 }
85
86 log("last_written_file_name", outputFileName);
87
88 std::filesystem::rename("/dev/shm/tmp_" + m_dqmMemFileName, "/dev/shm/" + m_dqmMemFileName);
89}
90
92{
93 B2FATAL("There should be no data coming here!");
94}
95
96std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
97{
98 return {};
99}
100
102{
103 // Make sure that the ROOT file is empty, and then closed.
104 TFile memFile(("/dev/shm/tmp_" + m_dqmMemFileName).c_str(), "RECREATE");
105 if (!memFile.IsOpen()) {
106 B2ASSERT("Writing to shared memory failed! ", ("/dev/shm/tmp_" + m_dqmMemFileName).c_str());
107 }
108 memFile.Close();
109 std::filesystem::rename("/dev/shm/tmp_" + m_dqmMemFileName, "/dev/shm/" + m_dqmMemFileName);
110}
111
112ZMQHistoServerToZMQOutput::ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
113 m_output(outputAddress, parent)
114{
115 m_output.log("histogram_merges", 0l);
116 m_output.log("last_merged_histograms", 0l);
117 m_output.log("average_merged_histograms", 0l);
118 m_output.log("last_merge", "");
119 m_output.log("size_before_compression", 0.0);
120 m_output.log("size_after_compression", 0.0);
121}
122
123void ZMQHistoServerToZMQOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
124 const std::optional<unsigned int>& experiment,
125 const std::optional<unsigned int>& run, EMessageTypes messageType)
126{
127 if (messageType == EMessageTypes::c_lastEventMessage) {
128 // merge one last time
129 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
130 // and then send out a stop signal by ourself
131 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
132 m_output.handleEvent(std::move(message));
133 return;
134 } else if (messageType == EMessageTypes::c_terminateMessage) {
135 // merge one last time
136 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
137 // and send out a terminate message
138 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
139 m_output.handleEvent(std::move(message));
140 return;
141 }
142
143 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
144
145 // Makes no sense to send out an empty event
146 if (storedMessages.empty()) {
147 return;
148 }
149
150 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
151
152 m_output.increment("histogram_merges");
153
154 HistogramMapping mergeHistograms;
155
156 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
157 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
158 m_output.logTime("last_merge");
159
160 for (const auto& keyValue : storedMessages) {
161 const auto& histogram = keyValue.second;
162 mergeHistograms += histogram;
163 }
164
165 auto eventMessage = mergeHistograms.toMessage();
166
167 if (m_outputBuffer.empty()) {
169 }
170
171 m_output.average("size_before_compression", eventMessage->size());
172 int size = m_maximalCompressedSize;
173 size = LZ4_compress_default(eventMessage->buffer(), &m_outputBuffer[0], eventMessage->size(), size);
174 B2ASSERT("Compression failed", size > 0);
175 m_output.average("size_after_compression", size);
176
177 zmq::message_t message(&m_outputBuffer[0], size);
178
179 EventMetaData eventMetaData(0, *run, *experiment);
180 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
181 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
182
183 auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_compressedDataMessage, std::move(message),
184 std::move(additionalEventMessage));
185 m_output.handleEvent(std::move(zmqMessage), true, 20000);
186}
187
188ZMQHistoServerToRawOutput::ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent) :
189 m_output(outputAddress, false, parent)
190{
191 m_output.log("histogram_merges", 0l);
192 m_output.log("last_merged_histograms", 0l);
193 m_output.log("average_merged_histograms", 0l);
194 m_output.log("last_merge", "");
195 m_output.log("size_before_compression", 0.0);
196}
197
198void ZMQHistoServerToRawOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
199 const std::optional<unsigned int>& experiment,
200 const std::optional<unsigned int>& run, EMessageTypes messageType)
201{
202 if (messageType == EMessageTypes::c_lastEventMessage) {
203 // merge one last time
204 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
205 // but do not send out any message
206 return;
207 } else if (messageType == EMessageTypes::c_terminateMessage) {
208 // merge one last time
209 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
210 return;
211 }
212
213 B2ASSERT("This should be an event message!", messageType == EMessageTypes::c_eventMessage);
214
215 // Makes no sense to send out an empty event
216 if (storedMessages.empty()) {
217 return;
218 }
219
220 B2ASSERT("Experiment and run must be set at this stage", experiment and run);
221
222 m_output.increment("histogram_merges");
223
224 HistogramMapping mergeHistograms;
225
226 m_output.log("last_merged_histograms", static_cast<long>(storedMessages.size()));
227 m_output.average("average_merged_histograms", static_cast<double>(storedMessages.size()));
228 m_output.logTime("last_merge");
229
230 for (const auto& keyValue : storedMessages) {
231 const auto& histogram = keyValue.second;
232 mergeHistograms += histogram;
233 }
234
235 auto eventMessage = mergeHistograms.toMessage();
236
237 m_output.average("size_before_compression", eventMessage->size());
238
239 zmq::message_t message(eventMessage->buffer(), eventMessage->size());
240 m_output.handleEvent(std::move(message));
241}
Store event, run, and experiment numbers.
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
std::string m_rootFileName
Output file name (possible with placeholders)
void handleIncomingData()
There should be never incoming data, so raise an exception if called anyways.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToFileOutput(const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
Definition ZMQLogger.cc:42
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
Definition ZMQLogger.cc:32
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
Definition ZMQLogger.h:96
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_S...
Definition ZMQLogger.h:102
Abstract base class for different kinds of events.