8 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/logging/Logger.h>
15 #include <TBufferJSON.h>
19 #include <boost/range/combine.hpp>
20 #include <boost/format.hpp>
21 #include <boost/algorithm/string/replace.hpp>
26 const std::string& dqmFileName,
27 const std::string& rootFileName) :
28 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
36 log(
"histogram_merges", 0l);
37 log(
"last_merged_histograms", 0l);
38 log(
"average_merged_histograms", 0l);
39 log(
"last_merge",
"");
40 log(
"last_written_file_name",
"");
41 log(
"memory_file_size", 0l);
45 const std::optional<unsigned int>& experiment,
46 const std::optional<unsigned int>& run,
EMessageTypes messageType)
48 if (storedMessages.empty()) {
52 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
62 log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
63 average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
66 for (
const auto& keyValue : storedMessages) {
67 const auto& histogram = keyValue.second;
68 mergeHistograms += histogram;
73 average(
"memory_file_size", memFile.GetSize());
77 B2ASSERT(
"Writing to shared memory failed!",
78 memFile.CopyTo(
m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
83 auto outputFileName = boost::replace_all_copy(
m_rootFileName,
"{run_number}", (boost::format(
"%05d") % *run).str());
84 boost::replace_all(outputFileName,
"{experiment_number}", (boost::format(
"%04d") % *experiment).str());
85 memFile.Cp(outputFileName.c_str(),
false);
87 log(
"last_written_file_name", outputFileName);
89 mergeHistograms.
clear();
94 B2FATAL(
"There should be no data coming here!");
109 B2ASSERT(
"Writing to shared memory failed!",
110 memFile.CopyTo(
m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
116 m_output(outputAddress, parent)
127 const std::optional<unsigned int>& experiment,
128 const std::optional<unsigned int>& run,
EMessageTypes messageType)
130 if (messageType == EMessageTypes::c_lastEventMessage) {
132 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
137 }
else if (messageType == EMessageTypes::c_terminateMessage) {
139 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
146 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
149 if (storedMessages.empty()) {
153 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
159 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
160 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
163 for (
const auto& keyValue : storedMessages) {
164 const auto& histogram = keyValue.second;
165 mergeHistograms += histogram;
168 auto eventMessage = mergeHistograms.
toMessage();
176 size = LZ4_compress_default(eventMessage->buffer(), &
m_outputBuffer[0], eventMessage->size(), size);
177 B2ASSERT(
"Compression failed", size > 0);
183 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
184 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
187 std::move(additionalEventMessage));
192 m_output(outputAddress, false, parent)
202 const std::optional<unsigned int>& experiment,
203 const std::optional<unsigned int>& run,
EMessageTypes messageType)
205 if (messageType == EMessageTypes::c_lastEventMessage) {
207 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
210 }
else if (messageType == EMessageTypes::c_terminateMessage) {
212 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
216 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
219 if (storedMessages.empty()) {
223 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
229 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
230 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
233 for (
const auto& keyValue : storedMessages) {
234 const auto& histogram = keyValue.second;
235 mergeHistograms += histogram;
238 auto eventMessage = mergeHistograms.
toMessage();
242 zmq::message_t message(eventMessage->buffer(), eventMessage->size());
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map, also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without an ID, as there is only a single output).
std::string m_rootFileName
Output file name (possibly with placeholders)
void handleIncomingData()
There should never be incoming data, so raise an exception if called anyway.
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
SharedMem * m_sharedMemory
The SHM file. Please note that we intentionally do not call its destructor.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_...
Abstract base class for different kinds of events.