Belle II Software development
ZMQHistogramConnection.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <framework/pcore/zmq/connections/ZMQConnection.h>
11#include <daq/hbasf2/utils/HistogramMapping.h>
12
13#include <framework/pcore/zmq/connections/ZMQNullConnection.h>
14#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
15#include <framework/pcore/zmq/connections/ZMQRawConnection.h>
16
17#include <framework/pcore/zmq/messages/ZMQIdMessage.h>
18
19#include <framework/pcore/EvtMessage.h>
20
21#include <string>
22#include <memory>
23#include <map>
24#include <optional>
25
26namespace Belle2 {
47 public:
49 ZMQHistoServerToFileOutput(const std::string& dqmFileName,
50 const std::string& rootFileName);
51
53 void mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages, const std::optional<unsigned int>& experiment,
54 const std::optional<unsigned int>& run,
55 EMessageTypes messageType);
57 void clear();
58
60 std::vector<zmq::socket_t*> getSockets() const final;
62 void handleIncomingData();
64 std::string getEndPoint() const { return "file://" + m_rootFileName; }
65 private:
67 std::string m_dqmMemFileName;
69 std::string m_rootFileName;
70 };
71
81 public:
83 ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
84
86 void mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages, const std::optional<unsigned int>& experiment,
87 const std::optional<unsigned int>& run,
88 EMessageTypes messageType);
90 void clear() {}
91
93 std::string getMonitoringJSON() const final { return m_output.getMonitoringJSON(); }
95 void handleIncomingData() { m_output.handleIncomingData(); }
97 bool isReady() const override { return m_output.isReady(); }
99 std::vector<zmq::socket_t*> getSockets() const final { return m_output.getSockets(); }
101 std::string getEndPoint() const { return m_output.getEndPoint(); }
102 private:
105
107 std::vector<char> m_outputBuffer;
109 unsigned int m_maximalCompressedSize = 100'000'000;
110 };
111
120 public:
122 ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
123
125 void mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages, const std::optional<unsigned int>& experiment,
126 const std::optional<unsigned int>& run,
127 EMessageTypes messageType);
129 void clear() {}
130
132 std::string getMonitoringJSON() const final { return m_output.getMonitoringJSON(); }
134 void handleIncomingData() { m_output.handleIncomingData(); }
136 bool isReady() const override { return m_output.isReady(); }
138 std::vector<zmq::socket_t*> getSockets() const final { return m_output.getSockets(); }
140 std::string getEndPoint() const { return m_output.getEndPoint(); }
141 private:
144 };
145
146}
Output part of a confirmed connection.
Base class for every connection with virtual functions to be implemented:
std::string m_rootFileName
Output file name (possible with placeholders)
std::string getEndPoint() const
Return the connection string.
void handleIncomingData()
There should be never incoming data, so raise an exception if called anyways.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
ZMQHistoServerToFileOutput(const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string getEndPoint() const
Return the connection string.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
void handleIncomingData()
The input data handling is passed from the ZMQRawOutput.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
bool isReady() const override
The is ready is passed from the ZMQRawOutput.
std::string getMonitoringJSON() const final
The monitoring JSON is just passed from the ZMQRawOutput.
void clear()
Nothing to do on clear.
std::vector< zmq::socket_t * > getSockets() const final
The sockets are passed from ZMQRawOutput.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::string getEndPoint() const
Return the connection string.
std::vector< char > m_outputBuffer
Buffer used for compression.
void handleIncomingData()
The input data handling is passed from the ZMQConfirmedOutput.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
bool isReady() const override
The is ready is passed from the ZMQConfirmedOutput.
std::string getMonitoringJSON() const final
The monitoring JSON is just passed from the ZMQConfirmedOutput.
void clear()
Nothing to do on clear.
std::vector< zmq::socket_t * > getSockets() const final
The sockets are passed from ZMQConfirmedOutput.
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.