8#include <daq/hbasf2/connections/ZMQHistogramConnection.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12#include <framework/logging/Logger.h>
15#include <TBufferJSON.h>
20#include <boost/range/combine.hpp>
21#include <boost/format.hpp>
22#include <boost/algorithm/string/replace.hpp>
27 const std::string& dqmFileName,
28 const std::string& rootFileName) :
29 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
37 log(
"histogram_merges", 0l);
38 log(
"last_merged_histograms", 0l);
39 log(
"average_merged_histograms", 0l);
40 log(
"last_merge",
"");
41 log(
"last_written_file_name",
"");
42 log(
"memory_file_size", 0l);
46 const std::optional<unsigned int>& experiment,
49 if (storedMessages.empty()) {
53 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
63 log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
64 average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
67 for (
const auto& keyValue : storedMessages) {
68 const auto& histogram = keyValue.second;
69 mergeHistograms += histogram;
74 average(
"memory_file_size", memFile.GetSize());
78 B2ASSERT(
"Writing to shared memory failed!",
79 memFile.CopyTo(
m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
84 auto outputFileName = boost::replace_all_copy(
m_rootFileName,
"{run_number}", (boost::format(
"%05d") % *run).str());
85 boost::replace_all(outputFileName,
"{experiment_number}", (boost::format(
"%04d") % *experiment).str());
86 memFile.Cp(outputFileName.c_str(),
false);
88 log(
"last_written_file_name", outputFileName);
90 mergeHistograms.
clear();
95 B2FATAL(
"There should be no data coming here!");
110 B2ASSERT(
"Writing to shared memory failed!",
111 memFile.CopyTo(
m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
117 m_output(outputAddress, parent)
128 const std::optional<unsigned int>& experiment,
129 const std::optional<unsigned int>& run,
EMessageTypes messageType)
131 if (messageType == EMessageTypes::c_lastEventMessage) {
133 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
138 }
else if (messageType == EMessageTypes::c_terminateMessage) {
140 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
147 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
150 if (storedMessages.empty()) {
154 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
160 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
161 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
164 for (
const auto& keyValue : storedMessages) {
165 const auto& histogram = keyValue.second;
166 mergeHistograms += histogram;
169 auto eventMessage = mergeHistograms.
toMessage();
177 size = LZ4_compress_default(eventMessage->buffer(), &
m_outputBuffer[0], eventMessage->size(), size);
178 B2ASSERT(
"Compression failed", size > 0);
184 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
185 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
188 std::move(additionalEventMessage));
193 m_output(outputAddress, false, parent)
203 const std::optional<unsigned int>& experiment,
204 const std::optional<unsigned int>& run,
EMessageTypes messageType)
206 if (messageType == EMessageTypes::c_lastEventMessage) {
208 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
211 }
else if (messageType == EMessageTypes::c_terminateMessage) {
213 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
217 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
220 if (storedMessages.empty()) {
224 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
230 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
231 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
234 for (
const auto& keyValue : storedMessages) {
235 const auto& histogram = keyValue.second;
236 mergeHistograms += histogram;
239 auto eventMessage = mergeHistograms.
toMessage();
243 zmq::message_t message(eventMessage->buffer(), eventMessage->size());
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::string m_rootFileName
Output file name (possible with placeholders)
DqmSharedMem * m_sharedMemory
The SHM file. Please note that we do not call its destructor on purpose.
void handleIncomingData()
There should be never incoming data, so raise an exception if called anyways.
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Abstract base class for different kinds of events.