Belle II Software development
ZMQHistoServerToFileOutput Class Reference

Output histograms into a ROOT file and shared memory after merging.

#include <ZMQHistogramConnection.h>

Inheritance diagram for ZMQHistoServerToFileOutput:
ZMQConnection ZMQLogger

Public Types

using ReactorFunction = std::function< void(void)>
 Typedef of a function which will be called if a connection has a message.
 

Public Member Functions

 ZMQHistoServerToFileOutput (unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
 Create a new connection initializing the DQMMemFile.
 
void mergeAndSend (const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
 Merge the given histograms into a single set of histograms and store them to file/shm.
 
void clear ()
 Clear the shared memory.
 
std::vector< zmq::socket_t * > getSockets () const final
 No sockets to poll on, so return an empty list.
 
void handleIncomingData ()
 There should never be incoming data, so raise an exception if called anyway.
 
std::string getEndPoint () const
 Return the connection string.
 
virtual bool isReady () const
 Return true if this connection is able to send messages right now. Can be overridden in derived classes.
 
virtual std::string getMonitoringJSON () const
 Convert the stored monitoring values to a JSON string ready for sending out via a message.
 
template<class AClass >
void log (const std::string &key, const AClass &value)
 Store a value under a certain key. Different types of values can be stored, namely long, double or string. Mixtures are not allowed for a given key.
 
void increment (const std::string &key)
 Increment the value with the given key (only numerical values). If not present, set to 1.
 
void decrement (const std::string &key)
 Decrement the value with the given key (only numerical values). If not present, set to -1.
 
template<size_t MAX_SIZE = 100>
void average (const std::string &key, double value)
 Instead of storing the double value directly under the given key, store the average of the last MAX_SIZE values.
 
template<size_t AVERAGE_SIZE = 2000>
void timeit (const std::string &key)
 Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time AVERAGE_SIZE was reached under <key>_last_measurement)
 
void logTime (const std::string &key)
 Store the current time as a string under the given key.
 

Static Public Member Functions

static bool poll (const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
 Poll on the given connections and call the attached function if a message comes in.
 
static bool hasMessage (const ZMQConnection *connection)
 Check if the given connection has an incoming message (right now, no waiting).
 

Private Attributes

DqmSharedMem * m_sharedMemory = nullptr
 The SHM file. Please note that we do not call its destructor on purpose.
 
std::string m_dqmMemFileName
 Name of the shared memory.
 
std::string m_rootFileName
 Output file name (possibly with placeholders).
 
std::map< std::string, std::variant< long, double, std::string > > m_monitoring
 Internal storage of all stored values.
 
std::unordered_map< std::string, std::tuple< std::vector< double >, size_t > > m_averages
 Internal storage of the previous values when calculating averages.
 
std::unordered_map< std::string, std::tuple< unsigned long, std::chrono::system_clock::time_point > > m_timeCounters
 Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.
 

Detailed Description

Output histograms into a ROOT file and shared memory after merging.

This "connection" class needs to be wrapped with a ZMQHistogramOutput before using it as a real output connection (which adds the common functionalities).

The ROOT file and the shared memory are written using ROOT's own serialization. The shared memory is filled with the content of a temporary TMemFile. On clear, the shared memory is cleared (ROOT files are left as they are).

Nothing special happens on stop or terminate other than that a merge is performed.

The given file path for the ROOT file can include the placeholders "{experiment_number}" and "{run_number}", which are replaced with the zero-padded experiment and run numbers. If no ROOT output file name or no shared memory file name for the DQMMemFile is given, the corresponding feature is not used.
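The placeholder substitution can be sketched with the standard library alone. This is only an illustration: the class itself uses boost::replace_all and boost::format, and the helper names replaceAll, zeroPadded and expandFileName are hypothetical. The padding widths (5 digits for the run, 4 for the experiment) follow the format strings in mergeAndSend().

```cpp
#include <cstdio>
#include <string>

// Replace every occurrence of `from` in `text` with `to` (hypothetical helper).
std::string replaceAll(std::string text, const std::string& from, const std::string& to)
{
  if (from.empty()) {
    return text;
  }
  std::size_t pos = 0;
  while ((pos = text.find(from, pos)) != std::string::npos) {
    text.replace(pos, from.size(), to);
    pos += to.size();  // continue after the inserted text
  }
  return text;
}

// Format `value` with leading zeros up to at least `width` digits (hypothetical helper).
std::string zeroPadded(unsigned int value, int width)
{
  char buffer[32];
  std::snprintf(buffer, sizeof(buffer), "%0*u", width, value);
  return buffer;
}

// Expand both placeholders in a file name pattern (hypothetical helper).
std::string expandFileName(const std::string& pattern, unsigned int experiment, unsigned int run)
{
  const auto withRun = replaceAll(pattern, "{run_number}", zeroPadded(run, 5));
  return replaceAll(withRun, "{experiment_number}", zeroPadded(experiment, 4));
}
```

With an assumed pattern "dqm_e{experiment_number}r{run_number}.root", experiment 12 and run 345, this sketch yields "dqm_e0012r00345.root".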

Please note that we do not call the destructor of the shared memory on purpose as this leads to problems.

Definition at line 51 of file ZMQHistogramConnection.h.

Member Typedef Documentation

◆ ReactorFunction

using ReactorFunction = std::function<void(void)>
inherited

Typedef of a function which will be called if a connection has a message.

Definition at line 34 of file ZMQConnection.h.

Constructor & Destructor Documentation

◆ ZMQHistoServerToFileOutput()

ZMQHistoServerToFileOutput ( unsigned int  maximalUncompressedBufferSize,
const std::string &  dqmFileName,
const std::string &  rootFileName 
)

Create a new connection initializing the DQMMemFile.

Definition at line 26 of file ZMQHistogramConnection.cc.

    ZMQHistoServerToFileOutput::ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize,
                                                           const std::string& dqmFileName,
                                                           const std::string& rootFileName) :
      m_dqmMemFileName(dqmFileName), m_rootFileName(rootFileName)
    {
      // We do not free this on purpose!
      if (not m_dqmMemFileName.empty()) {
        m_sharedMemory = new DqmSharedMem(m_dqmMemFileName.c_str(), maximalUncompressedBufferSize);
      }

      // Initialize all monitoring keys so that they are always present
      log("histogram_merges", 0l);
      log("last_merged_histograms", 0l);
      log("average_merged_histograms", 0l);
      log("last_merge", "");
      log("last_written_file_name", "");
      log("memory_file_size", 0l);
    }

Member Function Documentation

◆ clear()

void clear ( )

Clear the shared memory.

Definition at line 103 of file ZMQHistogramConnection.cc.

    void ZMQHistoServerToFileOutput::clear()
    {
      // Clear the shared memory by writing an empty ROOT file into it
      if (m_sharedMemory) {
        TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
        memFile.Close();
        m_sharedMemory->lock();
        B2ASSERT("Writing to shared memory failed!",
                 memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
        m_sharedMemory->unlock();
      }
    }

◆ decrement()

void decrement ( const std::string &  key)
inherited

Decrement the value with the given key (only numerical values). If not present, set to -1.

Definition at line 37 of file ZMQLogger.cc.

    void ZMQLogger::decrement(const std::string& key)
    {
      std::visit(Decrementor{}, m_monitoring[key]);
    }

◆ getEndPoint()

std::string getEndPoint ( ) const
inline

Return the connection string.

Definition at line 70 of file ZMQHistogramConnection.h.

    std::string getEndPoint() const { return "file://" + m_rootFileName; }

◆ getMonitoringJSON()

std::string getMonitoringJSON ( ) const
virtual inherited

Convert the stored monitoring values to a JSON string ready for sending out via a message.

Reimplemented in ZMQHistoServerToZMQOutput, ZMQHistoServerToRawOutput, ZMQROIOutput, and ZMQDataAndROIOutput.

Definition at line 15 of file ZMQLogger.cc.

    std::string ZMQLogger::getMonitoringJSON() const
    {
      std::stringstream buffer;
      buffer << "{";
      bool first = true;
      for (const auto& keyValue : m_monitoring) {
        // Separate the entries with a comma, but do not emit a leading one
        if (not first) {
          buffer << ", ";
        }
        first = false;
        buffer << "\"" << keyValue.first << "\": ";
        buffer << std::visit(toJSON{}, keyValue.second);
      }
      buffer << "}" << std::endl;
      return buffer.str();
    }
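The serialization pattern above (one std::visit per stored variant) can be tried out in isolation. The following is a minimal sketch, not the ZMQLogger implementation: the toJSON functor is not shown on this page, so ToJSONSketch is a hypothetical stand-in that does no string escaping, and monitoringToJSON is a hypothetical free function that omits the trailing newline.

```cpp
#include <map>
#include <sstream>
#include <string>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Hypothetical visitor: one overload per alternative of the variant.
struct ToJSONSketch {
  std::string operator()(long value) const { return std::to_string(value); }
  std::string operator()(double value) const
  {
    std::ostringstream stream;
    stream << value;
    return stream.str();
  }
  // Assumption: values contain no characters that need JSON escaping.
  std::string operator()(const std::string& value) const { return "\"" + value + "\""; }
};

// Serialize all key/value pairs into a single JSON object string.
std::string monitoringToJSON(const std::map<std::string, MonitoringValue>& monitoring)
{
  std::ostringstream buffer;
  buffer << "{";
  bool first = true;
  for (const auto& [key, value] : monitoring) {
    if (not first) {
      buffer << ", ";
    }
    first = false;
    buffer << "\"" << key << "\": " << std::visit(ToJSONSketch{}, value);
  }
  buffer << "}";
  return buffer.str();
}
```

Because std::map iterates in key order, the keys appear alphabetically in the output regardless of insertion order.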

◆ getSockets()

std::vector< zmq::socket_t * > getSockets ( ) const
final virtual

No sockets to poll on, so return an empty list.

Implements ZMQConnection.

Definition at line 98 of file ZMQHistogramConnection.cc.

    std::vector<zmq::socket_t*> ZMQHistoServerToFileOutput::getSockets() const
    {
      return {};
    }

◆ handleIncomingData()

void handleIncomingData ( )

There should never be incoming data, so raise an exception if called anyway.

Definition at line 93 of file ZMQHistogramConnection.cc.

    void ZMQHistoServerToFileOutput::handleIncomingData()
    {
      B2FATAL("There should be no data coming here!");
    }

◆ hasMessage()

bool hasMessage ( const ZMQConnection *  connection)
static inherited

Check if the given connection has an incoming message (right now, no waiting).

Definition at line 20 of file ZMQConnection.cc.

    bool ZMQConnection::hasMessage(const ZMQConnection* connection)
    {
      // Just poll with 0 timeout and no reaction function. Hacky trick to reduce code duplication
      const auto emptyFunction = []() {};
      return ZMQConnection::poll({{connection, emptyFunction}}, 0);
    }

◆ increment()

void increment ( const std::string &  key)
inherited

Increment the value with the given key (only numerical values). If not present, set to 1.

Definition at line 32 of file ZMQLogger.cc.

    void ZMQLogger::increment(const std::string& key)
    {
      std::visit(Incrementor{}, m_monitoring[key]);
    }
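The "if not present, set to 1" behaviour follows from how std::map::operator[] and std::variant interact: a missing key is value-initialized, and a default-constructed std::variant holds its first alternative, here long(0). The sketch below illustrates this with a hypothetical IncrementorSketch; the real Incrementor is not shown on this page, and ignoring string values is an assumption of this example.

```cpp
#include <map>
#include <string>
#include <variant>

using MonitoringValue = std::variant<long, double, std::string>;

// Hypothetical visitor: increments numerical alternatives in place.
struct IncrementorSketch {
  void operator()(long& value) const { value += 1; }
  void operator()(double& value) const { value += 1.0; }
  // Assumption of this sketch: string values are left untouched.
  void operator()(std::string&) const {}
};

// Increment the value stored under `key`, creating it if necessary.
void incrementKey(std::map<std::string, MonitoringValue>& monitoring, const std::string& key)
{
  // operator[] inserts a default-constructed variant (long 0) for a missing key,
  // so the first increment leaves the value at 1.
  std::visit(IncrementorSketch{}, monitoring[key]);
}
```

A decrementing visitor would work the same way and leave a previously missing key at -1.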

◆ isReady()

bool isReady ( ) const
virtual inherited

Return true if this connection is able to send messages right now. Can be overridden in derived classes.

Reimplemented in ZMQROIOutput, ZMQDataAndROIOutput, ZMQLoadBalancedOutput, ZMQRawOutput, ZMQHistoServerToZMQOutput, and ZMQHistoServerToRawOutput.

Definition at line 15 of file ZMQConnection.cc.

    bool ZMQConnection::isReady() const
    {
      return true;
    }

◆ logTime()

void logTime ( const std::string &  key)
inherited

Store the current time as a string under the given key.

Definition at line 42 of file ZMQLogger.cc.

    void ZMQLogger::logTime(const std::string& key)
    {
      auto current = std::chrono::system_clock::now();
      auto displayTime = std::chrono::system_clock::to_time_t(current);
      log(key, std::ctime(&displayTime));
    }
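One detail worth knowing when consuming the stored value: std::ctime returns a string that ends with a newline character. The sketch below is not part of ZMQLogger; timeStringWithoutNewline is a hypothetical helper showing how a consumer could produce the same time format without the trailing newline.

```cpp
#include <chrono>
#include <ctime>
#include <string>

// Format a time point like std::ctime does, but without the trailing '\n'.
std::string timeStringWithoutNewline(std::chrono::system_clock::time_point timePoint)
{
  const std::time_t displayTime = std::chrono::system_clock::to_time_t(timePoint);
  std::string text = std::ctime(&displayTime);
  if (not text.empty() and text.back() == '\n') {
    text.pop_back();
  }
  return text;
}
```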

◆ mergeAndSend()

void mergeAndSend ( const std::map< std::string, HistogramMapping > &  storedMessages,
const std::optional< unsigned int > &  experiment,
const std::optional< unsigned int > &  run,
EMessageTypes  messageType 
)

Merge the given histograms into a single set of histograms and store them to file/shm.

Definition at line 45 of file ZMQHistogramConnection.cc.

    void ZMQHistoServerToFileOutput::mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages,
                                                  const std::optional<unsigned int>& experiment,
                                                  const std::optional<unsigned int>& run,
                                                  EMessageTypes messageType)
    {
      if (storedMessages.empty()) {
        return;
      }

      B2ASSERT("Experiment and run must be set at this stage", experiment and run);

      increment("histogram_merges");

      // We do not care if this is the run end, or run start or anything. We just write it out.
      HistogramMapping mergeHistograms;

      TMemFile memFile(m_dqmMemFileName.c_str(), "RECREATE");
      memFile.cd();

      log("last_merged_histograms", static_cast<long>(storedMessages.size()));
      average("average_merged_histograms", static_cast<double>(storedMessages.size()));

      logTime("last_merge");
      for (const auto& keyValue : storedMessages) {
        const auto& histogram = keyValue.second;
        mergeHistograms += histogram;
      }

      memFile.Write();

      average("memory_file_size", memFile.GetSize());

      // Copy the serialized content into the shared memory while holding its lock
      if (m_sharedMemory) {
        m_sharedMemory->lock();
        B2ASSERT("Writing to shared memory failed!",
                 memFile.CopyTo(m_sharedMemory->ptr(), memFile.GetSize()) == memFile.GetSize());
        m_sharedMemory->unlock();
      }

      // Also write the memory content out to a regular ROOT file
      auto outputFileName = boost::replace_all_copy(m_rootFileName, "{run_number}", (boost::format("%05d") % *run).str());
      boost::replace_all(outputFileName, "{experiment_number}", (boost::format("%04d") % *experiment).str());
      memFile.Cp(outputFileName.c_str(), false);

      log("last_written_file_name", outputFileName);

      mergeHistograms.clear();
    }

◆ poll()

bool poll ( const std::map< const ZMQConnection *, ReactorFunction > &  connectionList,
int  timeout 
)
static inherited

Poll on the given connections and call the attached function if a message comes in.

If no message has been received after timeout milliseconds, return anyway. If timeout is 0, do not wait. If timeout is -1, wait indefinitely.

Returns true if a message was received on any socket, false otherwise. Note: if the system call is interrupted (e.g. because a signal was received), the function may return false before the timeout has elapsed.

Definition at line 27 of file ZMQConnection.cc.

    bool ZMQConnection::poll(const std::map<const ZMQConnection*, ReactorFunction>& connectionList, int timeout)
    {
      std::vector<const ReactorFunction*> socketMapping;
      std::vector<zmq::pollitem_t> pollItems;

      // zmq needs a special format for its polling, so create it here.
      for (const auto& [connection, function] : connectionList) {
        auto sockets = connection->getSockets();
        for (zmq::socket_t* socket : sockets) {
          zmq::pollitem_t pollItem;
          pollItem.socket = static_cast<void*>(*socket);
          pollItem.events = ZMQ_POLLIN;
          pollItem.revents = 0;
          pollItems.push_back(std::move(pollItem));

          // but keep reference to the original function, so we can call the correct one later
          socketMapping.push_back(&function);
        }
      }

      if (pollItems.empty()) {
        return false;
      }

      try {
        zmq::poll(&pollItems[0], pollItems.size(), timeout);

        bool anySocket = false;
        unsigned int counter = 0;
        for (const auto& pollItem : pollItems) {
          if (pollItem.revents & ZMQ_POLLIN) {
            anySocket = true;
            const auto* functionPtr = socketMapping.at(counter);
            const auto function = *functionPtr;
            function();
          }
          counter++;
        }

        return anySocket;
      } catch (zmq::error_t& error) {
        if (error.num() == EINTR) {
          // Could happen if there was an interrupt; return false so the caller knows the timeout may not have elapsed yet
          return false;
        } else {
          // cannot handle, rethrow exception
          throw;
        }
      }
    }
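The bookkeeping in the listing (one poll item per socket, plus a parallel vector that remembers which function belongs to which item) can be illustrated without ZeroMQ. In this sketch, PollItemSketch is a hypothetical stand-in for zmq::pollitem_t that carries only a readiness flag, and dispatchReadable is a hypothetical helper covering only the dispatch loop that runs after the poll call has returned.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

using ReactorFunction = std::function<void(void)>;

// Hypothetical stand-in for zmq::pollitem_t: only the readiness flag matters here.
struct PollItemSketch {
  bool readable = false;
};

// Call the handler of every readable item; handlers[i] belongs to items[i].
// Returns true if at least one handler was called.
bool dispatchReadable(const std::vector<PollItemSketch>& items,
                      const std::vector<const ReactorFunction*>& handlers)
{
  bool anySocket = false;
  for (std::size_t i = 0; i < items.size(); ++i) {
    if (items[i].readable) {
      anySocket = true;
      (*handlers.at(i))();
    }
  }
  return anySocket;
}
```

Since a connection may expose several sockets, the same handler pointer can appear more than once in the parallel vector.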

Member Data Documentation

◆ m_averages

std::unordered_map<std::string, std::tuple<std::vector<double>, size_t> > m_averages
private inherited

Internal storage of the previous values when calculating averages.

Definition at line 60 of file ZMQLogger.h.
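The pair of a value vector and an index stored per key suggests a ring buffer over the last MAX_SIZE values. The implementation of average() is not shown on this page, so the following is only a sketch of one way such a sliding average could work; the class name SlidingAverageSketch and its add() method are hypothetical.

```cpp
#include <cstddef>
#include <numeric>
#include <vector>

// Keeps the last MAX_SIZE values and reports their mean.
template <std::size_t MAX_SIZE = 100>
class SlidingAverageSketch {
public:
  // Store a new value and return the average of the retained values.
  double add(double value)
  {
    if (m_values.size() < MAX_SIZE) {
      m_values.push_back(value);
    } else {
      m_values[m_next] = value;  // overwrite the oldest entry
    }
    m_next = (m_next + 1) % MAX_SIZE;
    return std::accumulate(m_values.begin(), m_values.end(), 0.0) / m_values.size();
  }

private:
  std::vector<double> m_values;  // at most MAX_SIZE retained values
  std::size_t m_next = 0;        // position of the next value to write
};
```

With MAX_SIZE set to 3, adding 1, 2, 3 and then 4 drops the 1, so the last reported average is 3.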

◆ m_dqmMemFileName

std::string m_dqmMemFileName
private

Name of the shared memory.

Definition at line 75 of file ZMQHistogramConnection.h.

◆ m_monitoring

std::map<std::string, std::variant<long, double, std::string> > m_monitoring
private inherited

Internal storage of all stored values.

Definition at line 58 of file ZMQLogger.h.

◆ m_rootFileName

std::string m_rootFileName
private

Output file name (possibly with placeholders).

Definition at line 77 of file ZMQHistogramConnection.h.

◆ m_sharedMemory

DqmSharedMem* m_sharedMemory = nullptr
private

The SHM file. Please note that we do not call its destructor on purpose.

Definition at line 73 of file ZMQHistogramConnection.h.

◆ m_timeCounters

std::unordered_map<std::string, std::tuple<unsigned long, std::chrono::system_clock::time_point> > m_timeCounters
private inherited

Internal storage of how often the timeit function was called for a given key and when it last reached AVERAGE_SIZE.

Definition at line 62 of file ZMQLogger.h.
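The stored pair (a call counter and a time point) is enough to compute a call rate every AVERAGE_SIZE calls. The implementation of timeit() is not shown on this page; the sketch below is one possible reading of it. The class RateCounterSketch and its tick() method are hypothetical, and the current time is passed in explicitly so that the behaviour can be checked deterministically.

```cpp
#include <chrono>
#include <cstddef>
#include <optional>

// Counts calls and reports a rate once every AVERAGE_SIZE calls.
template <std::size_t AVERAGE_SIZE = 2000>
class RateCounterSketch {
public:
  using TimePoint = std::chrono::system_clock::time_point;

  explicit RateCounterSketch(TimePoint start) : m_lastMeasurement(start) {}

  // Register one call at time `now`. Returns the rate in calls per second
  // when AVERAGE_SIZE calls have accumulated, otherwise no value.
  std::optional<double> tick(TimePoint now)
  {
    if (++m_calls < AVERAGE_SIZE) {
      return std::nullopt;
    }
    const std::chrono::duration<double> elapsed = now - m_lastMeasurement;
    m_calls = 0;
    m_lastMeasurement = now;
    if (elapsed.count() <= 0.0) {
      return std::nullopt;  // no meaningful rate without elapsed time
    }
    return AVERAGE_SIZE / elapsed.count();
  }

private:
  unsigned long m_calls = 0;    // calls since the last measurement
  TimePoint m_lastMeasurement;  // time of the last measurement
};
```

With AVERAGE_SIZE set to 4 and the fourth call arriving two seconds after the start, the reported rate is 2 calls per second.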


The documentation for this class was generated from the following files: