Belle II Software development
ZMQHistogramOutput< AConnectionClass > Class Template Reference

Add the common functionality to the histogram output classes. More...

#include <ZMQHistogramOutput.h>

Inheritance diagram for ZMQHistogramOutput< AConnectionClass >:

Public Member Functions

template<class... Args>
 ZMQHistogramOutput (unsigned int maximalUncompressedBufferSize, Args &&... args)
 Perfectly forward the given arguments to the base class initializer (and init the buffer size)
 
void handleEvent (std::unique_ptr< ZMQIdMessage > message)
 Handle a new message to be "sent" (what this means is up to the base class) as described above.
 
void mergeAndSend (EMessageTypes messageType=EMessageTypes::c_eventMessage)
 Forward a merge call to the base class handing over the stored messages.
 
void clear ()
 Forward a clear call to the base class and clear the stored messages. Should be called on run start.
 

Private Attributes

unsigned int m_maximalUncompressedBufferSize
 Paramter for the buffer size (needed during decompression)
 
std::map< std::string, HistogramMappingm_storedMessages
 The stored histograms for each sender identity.
 
std::vector< char > m_uncompressedBuffer
 The buffer used during decompression.
 
std::optional< int > m_storedExperiment = {}
 If already received: the experiment number of the data (on mismatch, everything is cleared)
 
std::optional< int > m_storedRun = {}
 If already received: the run number of the data (on mismatch, everything is cleared)
 

Detailed Description

template<class AConnectionClass>
class Belle2::ZMQHistogramOutput< AConnectionClass >

Add the common functionality to the histogram output classes.

This histogram output connection itself does not know how to merge or send histograms, everything is implemented in the template class AConnectionClass.

This class just adds the common code on top of those to prevent code duplication. The mergeAndSend calls and the clear call are directly passed on to the parent class. On a message, different things happen depending on the message type:

  • on stop or terminate messages one last mergeAndSend is called from the base class. The base class needs to handle any needed sending etc.
  • If a compressed data message is retrieved it is decompressed.
  • After that (or for uncompressed messages) the received histograms in the message are stored in the message map with the identity as key. In this way only ever the latest message for each sender is stored (and used when merging).

Data messages are supposed to have the run and experiment number stored as JSON-transformed EventMetaData in the additional messages. This sent event meta data is compared with the already received data. On mismatch, all data is cleared.

Definition at line 55 of file ZMQHistogramOutput.h.

Member Function Documentation

◆ clear()

void clear

Forward a clear call to the base class and clear the stored messages. Should be called on run start.

Definition at line 101 of file ZMQHistogramOutput.cc.

102{
103 AConnectionClass::increment("histogram_clears");
104 AConnectionClass::logTime("last_clear");
105
106 AConnectionClass::clear();
107
108 m_storedMessages.clear();
109 m_storedExperiment.reset();
110 m_storedRun.reset();
111}
std::optional< int > m_storedRun
If already received: the run number of the data (on mismatch, everything is cleared)
std::optional< int > m_storedExperiment
If already received: the experiment number of the data (on mismatch, everything is cleared)
std::map< std::string, HistogramMapping > m_storedMessages
The stored histograms for each sender identity.

◆ handleEvent()

void handleEvent ( std::unique_ptr< ZMQIdMessage message)

Handle a new message to be "sent" (what this means is up to the base class) as described above.

Definition at line 27 of file ZMQHistogramOutput.cc.

28{
29 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
30 // This run is over, so merge everything a final time and send it out
31 mergeAndSend(EMessageTypes::c_lastEventMessage);
32 clear();
33 return;
34 } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
35 // Everything is over, so send out a terminate message
36 mergeAndSend(EMessageTypes::c_terminateMessage);
37 clear();
38 return;
39 }
40
41 auto identity = message->getIdentity();
42 auto& dataMessage = message->getDataMessage();
43 auto& additionalMessage = message->getAdditionalDataMessage();
44
45 // Check if we get messages with the same event number
46 EventMetaData* eventMetaDataPtr = nullptr;
47 std::string additionalMessageAsString(additionalMessage.data<const char>(), additionalMessage.size());
48 TBufferJSON::FromJSON(eventMetaDataPtr, additionalMessageAsString.c_str());
49
50 if (m_storedExperiment and * m_storedExperiment != eventMetaDataPtr->getExperiment()) {
51 B2ERROR("Having received histograms with different experiment numbers! Not a good sign!");
52 AConnectionClass::increment("different_event_meta_data");
53 clear();
54 return;
55 }
56 if (m_storedRun and * m_storedRun != eventMetaDataPtr->getRun()) {
57 B2ERROR("Having received histograms with different run numbers! Not a good sign!");
58 AConnectionClass::increment("different_event_meta_data");
59 clear();
60 return;
61 }
62
63 m_storedExperiment = eventMetaDataPtr->getExperiment();
64 m_storedRun = eventMetaDataPtr->getRun();
65
66 if (message->isMessage(Belle2::EMessageTypes::c_compressedDataMessage)) {
67 int uncompressedSize = LZ4_decompress_safe(dataMessage.data<char>(), &m_uncompressedBuffer[0],
68 dataMessage.size(), m_maximalUncompressedBufferSize);
69 B2ASSERT("Decompression failed", uncompressedSize > 0);
70
71 std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(&m_uncompressedBuffer[0]));
72
73 AConnectionClass::average("uncompressed_size", msg->size());
74
75 B2DEBUG(10,
76 "After decompression, the size is " << uncompressedSize << " and the message itself says " << msg->size());
77 HistogramMapping histogram(std::move(msg));
78 if (not histogram.empty()) {
79 m_storedMessages[identity] = std::move(histogram);
80 }
81 } else if (message->isMessage(Belle2::EMessageTypes::c_rawDataMessage)) {
82 std::unique_ptr<Belle2::EvtMessage> msg(new Belle2::EvtMessage(dataMessage.data<char>()));
83 HistogramMapping histogram(std::move(msg));
84 if (not histogram.empty()) {
85 m_storedMessages[identity] = std::move(histogram);
86 }
87 } else {
88 B2FATAL("Unknown message type!");
89 }
90
91 AConnectionClass::log("stored_identities", static_cast<long>(m_storedMessages.size()));
92}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
int getRun() const
Run Getter.
int getExperiment() const
Experiment Getter.
Class to manage streamed object.
Definition: EvtMessage.h:59
Utility to store received histograms (hierarchical tree structures) from clients (as an event message...
std::vector< char > m_uncompressedBuffer
The buffer used during decompression.
unsigned int m_maximalUncompressedBufferSize
Paramter for the buffer size (needed during decompression)
void clear()
Forward a clear call to the base class and clear the stored messages. Should be called on run start.
void mergeAndSend(EMessageTypes messageType=EMessageTypes::c_eventMessage)
Forward a merge call to the base class handing over the stored messages.

◆ mergeAndSend()

void mergeAndSend ( EMessageTypes  messageType = EMessageTypes::c_eventMessage)

Forward a merge call to the base class handing over the stored messages.

Definition at line 95 of file ZMQHistogramOutput.cc.

96{
97 AConnectionClass::mergeAndSend(m_storedMessages, m_storedExperiment, m_storedRun, messageType);
98}

Member Data Documentation

◆ m_maximalUncompressedBufferSize

unsigned int m_maximalUncompressedBufferSize
private

Paramter for the buffer size (needed during decompression)

Definition at line 70 of file ZMQHistogramOutput.h.

◆ m_storedExperiment

std::optional<int> m_storedExperiment = {}
private

If already received: the experiment number of the data (on mismatch, everything is cleared)

Definition at line 78 of file ZMQHistogramOutput.h.

◆ m_storedMessages

std::map<std::string, HistogramMapping> m_storedMessages
private

The stored histograms for each sender identity.

Definition at line 73 of file ZMQHistogramOutput.h.

◆ m_storedRun

std::optional<int> m_storedRun = {}
private

If already received: the run number of the data (on mismatch, everything is cleared)

Definition at line 80 of file ZMQHistogramOutput.h.

◆ m_uncompressedBuffer

std::vector<char> m_uncompressedBuffer
private

The buffer used during decompression.

Definition at line 75 of file ZMQHistogramOutput.h.


The documentation for this class was generated from the following files: