10 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
12 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/logging/Logger.h>
17 #include <TBufferJSON.h>
21 #include <boost/range/combine.hpp>
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string/replace.hpp>
// NOTE(review): fragment of a constructor — the class name and any earlier
// parameters are outside the visible lines (presumably the file-writing
// histogram output; confirm against the header).
// The two visible parameters are stored unchanged in m_dqmMemFileName and
// m_rootFileName.
28 const std::string& dqmFileName,
29 const std::string& rootFileName) :
30 m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
// Seed every logged key with a neutral start value (0 for the numeric keys,
// an empty string for the textual ones). Presumably this makes the keys exist
// before the first merge happens — confirm against log().
38 log(
"histogram_merges", 0l);
39 log(
"last_merged_histograms", 0l);
40 log(
"average_merged_histograms", 0l);
41 log(
"last_merge",
"");
42 log(
"last_written_file_name",
"");
43 log(
"memory_file_size", 0l);
// NOTE(review): fragment — the function name and its leading parameter(s) are
// outside the visible lines (presumably mergeAndSend of the file output).
// storedMessages, memFile, mergeHistograms and m_sharedMemory are used below
// but declared elsewhere.
47 const std::optional<unsigned int>& experiment,
48 const std::optional<unsigned int>& run,
EMessageTypes messageType)
// Empty-input guard; the body of this branch is not visible here.
50 if (storedMessages.empty()) {
// Both optionals are dereferenced further down (*run, *experiment), so they
// have to hold a value from this point on.
54 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
// Report the number of stored messages taking part in this merge, once via
// log() and once via average().
64 log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
65 average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
// Add the mapped value of every stored entry to mergeHistograms via operator+=.
68 for (
const auto& keyValue : storedMessages) {
69 const auto& histogram = keyValue.second;
70 mergeHistograms += histogram;
// Feed the current size of memFile into average().
75 average(
"memory_file_size", memFile.GetSize());
// Copy memFile into the shared-memory region and require that CopyTo reports
// exactly memFile.GetSize().
// NOTE(review): the copy itself lives inside the assertion condition — confirm
// that B2ASSERT always evaluates its condition in every build type.
79 B2ASSERT(
"Writing to shared memory failed!",
80 memFile.CopyTo(
m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
// Derive the output file name from m_rootFileName by replacing the
// placeholders: "{run_number}" becomes the run zero-padded to 5 digits,
// "{experiment_number}" the experiment zero-padded to 4 digits.
85 auto outputFileName = boost::replace_all_copy(
m_rootFileName,
"{run_number}", (boost::format(
"%05d") % *run).str());
86 boost::replace_all(outputFileName,
"{experiment_number}", (boost::format(
"%04d") % *experiment).str());
// Copy memFile to the derived path. The second argument is presumably ROOT's
// progress-bar flag (switched off) — confirm against the Cp() signature.
87 memFile.Cp(outputFileName.c_str(),
false);
// Publish the name of the file that was just written.
89 log(
"last_written_file_name", outputFileName);
// Reset mergeHistograms once the file has been written.
91 mergeHistograms.
clear();
// NOTE(review): fragment — the enclosing function header is not visible.
// Reaching this statement is reported as a fatal error via B2FATAL.
96 B2FATAL(
"There should be no data coming here!");
// NOTE(review): fragment — the enclosing function header and the construction
// of memFile are not visible.
// Copies memFile into the shared-memory region and asserts that CopyTo reports
// exactly memFile.GetSize(). The copy is performed inside the assertion
// condition — confirm that B2ASSERT always evaluates it.
111 B2ASSERT(
"Writing to shared memory failed!",
112 memFile.CopyTo(
m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
// NOTE(review): fragment of a constructor initializer list (class name not
// visible): m_output is constructed from outputAddress and parent.
118 m_output(outputAddress, parent)
// NOTE(review): fragment — the function name and its leading parameter(s) are
// outside the visible lines (presumably mergeAndSend, given the calls below).
// storedMessages, mergeHistograms, size, m_outputBuffer and eventMetaData are
// used below but declared elsewhere.
129 const std::optional<unsigned int>& experiment,
130 const std::optional<unsigned int>& run,
EMessageTypes messageType)
// Last-event message: run mergeAndSend with c_eventMessage as the message
// type. The remainder of this branch is not visible.
132 if (messageType == EMessageTypes::c_lastEventMessage) {
134 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
139 }
// Terminate message: same call with c_eventMessage; the rest of the branch is
// not visible.
else if (messageType == EMessageTypes::c_terminateMessage) {
141 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
// The code from here on only expects event messages.
148 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
// Empty-input guard; the body of this branch is not visible here.
151 if (storedMessages.empty()) {
// Experiment and run must both hold a value at this point.
155 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
// Report the number of stored messages taking part in this merge through
// m_output, once via log() and once via average().
161 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
162 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
// Add the mapped value of every stored entry to mergeHistograms via operator+=.
165 for (
const auto& keyValue : storedMessages) {
166 const auto& histogram = keyValue.second;
167 mergeHistograms += histogram;
// Turn the merged histograms into a message object.
170 auto eventMessage = mergeHistograms.
toMessage();
// Compress the message buffer into m_outputBuffer. size goes in as the
// destination capacity (its initial value is set outside the visible lines)
// and is overwritten with the return value; per the LZ4 API that is the number
// of bytes written, or 0 on failure — hence the > 0 check below.
178 size = LZ4_compress_default(eventMessage->buffer(), &
m_outputBuffer[0], eventMessage->size(), size);
179 B2ASSERT(
"Compression failed", size > 0);
// Serialise eventMetaData to JSON and wrap the resulting string's data and
// length in a zmq::message_t.
185 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
186 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
// Tail of a call whose beginning is not visible; the JSON message is moved
// into it.
189 std::move(additionalEventMessage));
// NOTE(review): fragment of a constructor initializer list (class name not
// visible): m_output is constructed from outputAddress, the literal false and
// parent. The meaning of the false flag is not visible here — confirm against
// the type of m_output.
194 m_output(outputAddress, false, parent)
// NOTE(review): fragment — the function name and its leading parameter(s) are
// outside the visible lines (presumably mergeAndSend, given the calls below).
// storedMessages and mergeHistograms are used below but declared elsewhere.
204 const std::optional<unsigned int>& experiment,
205 const std::optional<unsigned int>& run,
EMessageTypes messageType)
// Last-event message: run mergeAndSend with c_eventMessage as the message
// type. The remainder of this branch is not visible.
207 if (messageType == EMessageTypes::c_lastEventMessage) {
209 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
212 }
// Terminate message: same call with c_eventMessage; the rest of the branch is
// not visible.
else if (messageType == EMessageTypes::c_terminateMessage) {
214 mergeAndSend(storedMessages, experiment, run, EMessageTypes::c_eventMessage);
// The code from here on only expects event messages.
218 B2ASSERT(
"This should be an event message!", messageType == EMessageTypes::c_eventMessage);
// Empty-input guard; the body of this branch is not visible here.
221 if (storedMessages.empty()) {
// Experiment and run must both hold a value at this point.
225 B2ASSERT(
"Experiment and run must be set at this stage", experiment and run);
// Report the number of stored messages taking part in this merge through
// m_output, once via log() and once via average().
231 m_output.
log(
"last_merged_histograms",
static_cast<long>(storedMessages.size()));
232 m_output.
average(
"average_merged_histograms",
static_cast<double>(storedMessages.size()));
// Add the mapped value of every stored entry to mergeHistograms via operator+=.
235 for (
const auto& keyValue : storedMessages) {
236 const auto& histogram = keyValue.second;
237 mergeHistograms += histogram;
// Turn the merged histograms into a message object.
240 auto eventMessage = mergeHistograms.
toMessage();
// Wrap the message buffer and its size in a zmq::message_t; no compression
// step is visible in this variant.
244 zmq::message_t message(eventMessage->buffer(), eventMessage->size());