Belle II Software  release-06-01-15
ZMQHistogramConnection.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <framework/pcore/zmq/connections/ZMQConnection.h>
11 #include <daq/hbasf2/utils/HistogramMapping.h>
12 
13 #include <framework/pcore/zmq/connections/ZMQNullConnection.h>
14 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
15 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
16 
17 #include <framework/pcore/zmq/messages/ZMQIdMessage.h>
18 
19 #include <framework/pcore/EvtMessage.h>
20 #include <daq/dqm/DqmMemFile.h>
21 #include <daq/rfarm/manager/SharedMem.h>
22 
23 #include <string>
24 #include <memory>
25 #include <map>
26 #include <optional>
27 
28 namespace Belle2 {
53  public:
55  ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize,
56  const std::string& dqmFileName,
57  const std::string& rootFileName);
58 
60  void mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages, const std::optional<unsigned int>& experiment,
61  const std::optional<unsigned int>& run,
62  EMessageTypes messageType);
64  void clear();
65 
67  std::vector<zmq::socket_t*> getSockets() const final;
69  void handleIncomingData();
71  std::string getEndPoint() const { return "file://" + m_rootFileName; }
72  private:
76  std::string m_dqmMemFileName;
78  std::string m_rootFileName;
79  };
80 
90  public:
92  ZMQHistoServerToZMQOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
93 
95  void mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages, const std::optional<unsigned int>& experiment,
96  const std::optional<unsigned int>& run,
97  EMessageTypes messageType);
99  void clear() {}
100 
102  std::string getMonitoringJSON() const final { return m_output.getMonitoringJSON(); }
106  bool isReady() const { return m_output.isReady(); }
108  std::vector<zmq::socket_t*> getSockets() const final { return m_output.getSockets(); }
110  std::string getEndPoint() const { return m_output.getEndPoint(); }
111  private:
114 
116  std::vector<char> m_outputBuffer;
118  unsigned int m_maximalCompressedSize = 100'000'000;
119  };
120 
129  public:
131  ZMQHistoServerToRawOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
132 
134  void mergeAndSend(const std::map<std::string, HistogramMapping>& storedMessages, const std::optional<unsigned int>& experiment,
135  const std::optional<unsigned int>& run,
136  EMessageTypes messageType);
138  void clear() {}
139 
141  std::string getMonitoringJSON() const final { return m_output.getMonitoringJSON(); }
145  bool isReady() const { return m_output.isReady(); }
147  std::vector<zmq::socket_t*> getSockets() const final { return m_output.getSockets(); }
149  std::string getEndPoint() const { return m_output.getEndPoint(); }
150  private:
153  };
155 }
Output part of a confirmed connection.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
std::string getEndPoint() const
Return the connection string for this socket.
std::vector< zmq::socket_t * > getSockets() const final
The socket used for polling is just the stored socket.
Base class for every connection with virtual functions to be implemented:
Definition: ZMQConnection.h:30
virtual bool isReady() const
Return true if this connection is able to send messages right now. Can be overloaded in derived class...
Output histograms into a ROOT file and shared memory after merging.
std::string m_rootFileName
Output file name (possibly with placeholders)
std::string getEndPoint() const
Return the connection string.
void handleIncomingData()
There should never be any incoming data, so raise an exception if called anyway.
ZMQHistoServerToFileOutput(unsigned int maximalUncompressedBufferSize, const std::string &dqmFileName, const std::string &rootFileName)
Create a new connection initializing the DQMMemFile.
std::string m_dqmMemFileName
Name of the shared memory.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the given histograms into a single set of histograms and store them to file/shm.
void clear()
Clear the shared memory.
std::vector< zmq::socket_t * > getSockets() const final
No sockets to poll on, so return an empty list.
SharedMem * m_sharedMemory
The SHM file. Please note that we do not call its destructor on purpose.
Same as ZMQHistoServerToZMQOutput, but sends the histograms uncompressed to a raw output.
bool isReady() const
The isReady state is passed on from the ZMQRawOutput.
std::string getEndPoint() const
Return the connection string.
ZMQHistoServerToRawOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new raw output with the given address.
void handleIncomingData()
The input data handling is passed from the ZMQRawOutput.
ZMQRawOutput m_output
The output connection used for sending the histograms.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are not sent.
std::vector< zmq::socket_t * > getSockets() const final
The sockets are passed from ZMQRawOutput.
std::string getMonitoringJSON() const final
The monitoring JSON is just passed from the ZMQRawOutput.
void clear()
Nothing to do on clear.
Send the histograms as compressed byte stream via a ZMQConfirmedOutput connection after merging with ...
bool isReady() const
The isReady state is passed on from the ZMQConfirmedOutput.
ZMQConfirmedOutput m_output
The output connection used for sending the histograms.
std::string getEndPoint() const
Return the connection string.
std::vector< char > m_outputBuffer
Buffer used for compression.
void handleIncomingData()
The input data handling is passed from the ZMQConfirmedOutput.
ZMQHistoServerToZMQOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Initialize the ZMQConfirmedOutput with the given address.
void mergeAndSend(const std::map< std::string, HistogramMapping > &storedMessages, const std::optional< unsigned int > &experiment, const std::optional< unsigned int > &run, EMessageTypes messageType)
Merge the histograms and send them via the connection. Stop/Terminate messages are sent after that.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
std::vector< zmq::socket_t * > getSockets() const final
The sockets are passed from ZMQConfirmedOutput.
std::string getMonitoringJSON() const final
The monitoring JSON is just passed from the ZMQConfirmedOutput.
void clear()
Nothing to do on clear.
virtual std::string getMonitoringJSON() const
Convert the stored monitoring values to a JSON string ready for sending out via a message.
Definition: ZMQLogger.cc:15
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
void handleIncomingData()
Handle incoming data: a socket (dis)connect.
bool isReady() const final
If no socket is connected, this connection is not ready.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.