Belle II Software  release-08-01-10
ZMQHistogramServer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/apps/ZMQHistogramServer.h>
9 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
10 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
11 
12 using namespace Belle2;
13 
15 {
18  m_output.reset(
20 }
21 
22 void ZMQHistogramToFileServer::addOptions(po::options_description& desc)
23 {
24  m_timeout = 30;
26  desc.add_options()
27  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
28  "where to read the events from")
29  ("sharedMemoryName",
30  boost::program_options::value<std::string>(&m_sharedMemoryName)->default_value(m_sharedMemoryName),
31  "name of the shared memory")
32  ("rootFileName", boost::program_options::value<std::string>(&m_rootFileName)->required(),
33  "name of the ROOT file - can include {run_number} or {experiment_number}")
34  ("timeout", boost::program_options::value<unsigned int>(&m_timeout)->default_value(m_timeout),
35  "how many seconds to wait between histogram dumps")
36  ("maximalUncompressedBufferSize",
37  boost::program_options::value<unsigned int>(&m_maximalUncompressedBufferSize)->default_value(
39  "size of the uncompress buffer");
40 }
41 
43 {
44  if (type == EMessageTypes::c_newRunMessage) {
45  m_input->clear();
46  m_output->clear();
47  return;
48  } else if (type == EMessageTypes::c_lastEventMessage) {
49  auto message = m_input->overwriteStopMessage();
50  if (message) {
51  m_output->handleEvent(std::move(message));
52  }
53  return;
54  }
55 }
56 
58 {
59  auto message = m_input->handleIncomingData();
60  if (message) {
61  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
62  m_terminate = true;
63  }
64 
65  m_output->handleEvent(std::move(message));
66  }
67 }
68 
70 {
71  m_output->mergeAndSend();
72 }
73 
75 {
79 }
80 
81 void ZMQHistogramToZMQServer::addOptions(po::options_description& desc)
82 {
84  desc.add_options()
85  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
86  "where to read the events from")
87  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
88  "where to send the events to")
89  ("timeout", boost::program_options::value<unsigned int>(&m_timeout)->default_value(30),
90  "how many seconds to wait between histogram dumps")
91  ("maximalUncompressedBufferSize",
92  boost::program_options::value<unsigned int>(&m_maximalUncompressedBufferSize)->default_value(
94  "size of the uncompress buffer");
95 }
96 
98 {
99  if (type == EMessageTypes::c_newRunMessage) {
100  m_input->clear();
101  m_output->clear();
102  return;
103  } else if (type == EMessageTypes::c_lastEventMessage) {
104  auto message = m_input->overwriteStopMessage();
105  if (message) {
106  m_output->handleEvent(std::move(message));
107  }
108  return;
109  }
110 }
111 
113 {
114  auto message = m_input->handleIncomingData();
115  if (message) {
116  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
117  m_terminate = true;
118  }
119 
120  m_output->handleEvent(std::move(message));
121  }
122 }
123 
125 {
126  m_output->mergeAndSend();
127 }
128 
130 {
134 }
135 
136 void ZMQHistogramToRawServer::addOptions(po::options_description& desc)
137 {
139  desc.add_options()
140  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
141  "where to read the events from")
142  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
143  "where to send the events to")
144  ("timeout", boost::program_options::value<unsigned int>(&m_timeout)->default_value(30),
145  "how many seconds to wait between histogram dumps")
146  ("maximalUncompressedBufferSize",
147  boost::program_options::value<unsigned int>(&m_maximalUncompressedBufferSize)->default_value(
149  "size of the uncompress buffer");
150 }
151 
153 {
154  if (type == EMessageTypes::c_newRunMessage) {
155  m_input->clear();
156  m_output->clear();
157  return;
158  } else if (type == EMessageTypes::c_lastEventMessage) {
159  auto message = m_input->overwriteStopMessage();
160  if (message) {
161  m_output->handleEvent(std::move(message));
162  }
163  return;
164  }
165 }
166 
168 {
169  auto message = m_input->handleIncomingData();
170  if (message) {
171  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
172  m_terminate = true;
173  }
174 
175  m_output->handleEvent(std::move(message));
176  }
177 }
178 
180 {
181  m_output->mergeAndSend();
182 }
Input part of a confirmed connection.
Add the common functionality to the histogram output classes.
std::string m_rootFileName
Parameter: name of the root file - can include {run_number} or {experiment_number}.
std::string m_sharedMemoryName
Parameter: name of the shared memory (or empty)
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
unsigned int m_maximalUncompressedBufferSize
Parameter: size of the temporary internal compression buffer.
void handleExternalSignal(EMessageTypes type) final
Merge on stop (even if not all clients have stopped) or clear the counters on start from the monitori...
void handleTimeout() final
Call the mergeAndSend function on timeout.
void handleInput() final
Pass the message from the input connection to the histogram storage.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
unsigned int m_maximalUncompressedBufferSize
Parameter: size of the temporary internal compression buffer.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Merge on stop (even if not all clients have stopped) or clear the counters on start from the monitori...
void handleTimeout() final
Call the mergeAndSend function on timeout.
void handleInput() final
Pass the message from the input connection to the histogram storage.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
unsigned int m_maximalUncompressedBufferSize
Parameter: size of the temporary internal compression buffer.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Merge on stop (even if not all clients have stopped) or clear the counters on start from the monitori...
void handleTimeout() final
Call the mergeAndSend function on timeout.
void handleInput() final
Pass the message from the input connection to the histogram storage.
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65
std::unique_ptr< ZMQHistoServerToFile > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.