8#include <daq/hbasf2/connections/ZMQHistogramConnection.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12#include <framework/logging/Logger.h>
15#include <TBufferJSON.h>
20#include <boost/range/combine.hpp>
21#include <boost/format.hpp>
22#include <boost/algorithm/string/replace.hpp>
28 const std::string& rootFileName) :
29 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
31 log(
"histogram_merges", 0l);
32 log(
"last_merged_histograms", 0l);
33 log(
"average_merged_histograms", 0l);
34 log(
"last_merge",
"");
35 log(
"last_written_file_name",
"");
36 log(
"memory_file_size", 0l);
40 const std::optional<unsigned int>& experiment,
43 if (storedMessages.empty()) {
47 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
52 if (!memFile.IsOpen()) {
53 B2ASSERT(
"Writing to shared memory failed! ", (
"/dev/shm/tmp_" +
m_dqmMemFileName).c_str());
61 log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
62 average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
65 for (
const auto& keyValue : storedMessages) {
66 const auto& histogram = keyValue.second;
67 mergeHistograms += histogram;
72 mergeHistograms.
clear();
76 average(
"memory_file_size", memFile.GetSize());
79 auto outputFileName = boost::replace_all_copy(
m_rootFileName,
"{run_number}", (boost::format(
"%05d") % *run).str());
80 boost::replace_all(outputFileName,
"{experiment_number}", (boost::format(
"%04d") % *experiment).str());
81 if (!std::filesystem::copy_file(
"/dev/shm/tmp_" +
m_dqmMemFileName, outputFileName,
82 std::filesystem::copy_options::overwrite_existing)) {
83 perror(
"Copy from shm file failed ");
86 log(
"last_written_file_name", outputFileName);
93 B2FATAL(
"There should be no data coming here!");
105 if (!memFile.IsOpen()) {
106 B2ASSERT(
"Writing to shared memory failed! ", (
"/dev/shm/tmp_" +
m_dqmMemFileName).c_str());
113 m_output(outputAddress, parent)
124 const std::optional<unsigned int>& experiment,
125 const std::optional<unsigned int>& run,
EMessageTypes messageType)
127 if (messageType == EMessageTypes::c_lastEventMessage) {
129 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
134 }
else if (messageType == EMessageTypes::c_terminateMessage) {
136 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
143 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
146 if (storedMessages.empty()) {
150 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
156 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
157 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
160 for (
const auto& keyValue : storedMessages) {
161 const auto& histogram = keyValue.second;
162 mergeHistograms += histogram;
165 auto eventMessage = mergeHistograms.
toMessage();
173 size = LZ4_compress_default(eventMessage->buffer(), &
m_outputBuffer[0], eventMessage->size(), size);
174 B2ASSERT(
"Compression failed", size > 0);
180 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
181 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
184 std::move(additionalEventMessage));
189 m_output(outputAddress, false, parent)
199 const std::optional<unsigned int>& experiment,
200 const std::optional<unsigned int>& run,
EMessageTypes messageType)
202 if (messageType == EMessageTypes::c_lastEventMessage) {
204 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
207 }
else if (messageType == EMessageTypes::c_terminateMessage) {
209 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
213 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
216 if (storedMessages.empty()) {
220 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
226 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
227 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
230 for (
const auto& keyValue : storedMessages) {
231 const auto& histogram = keyValue.second;
232 mergeHistograms += histogram;
235 auto eventMessage = mergeHistograms.
toMessage();
239 zmq::message_t message(eventMessage->buffer(), eventMessage->size());
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
void clear()
Clear all histograms in the internal map also deleting the pointers.
std::unique_ptr< Belle2::EvtMessage > toMessage() const
Construct an EvtMessage by serializing the content of the internal histogram storage....
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
std::string m_rootFileName
Output file name (possible with placeholders)
void handleIncomingData()
There should be never incoming data, so raise an exception if called anyways.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToFileOutput(const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::vector< char > m_outputBuffer
Buffer used for compression.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
void logTime(const std::string &key)
Store the current time as a string under the given key.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
virtual void handleEvent(zmq::message_t message)
Pass on the message - maybe by prefixing it with a htonl-converted data size in bytes.
EMessageTypes
Type the messages can have.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storing the double value directly under the given key, store the average of the last MAX_S...
Abstract base class for different kinds of events.