Belle II Software  release-05-01-25
ZMQHistogramServer.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/hbasf2/apps/ZMQHistogramServer.h>
11 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
12 #include <daq/hbasf2/connections/ZMQHistogramConnection.h>
13 
14 using namespace Belle2;
15 
// Body of ZMQHistogramToFileServer::initialize() (the symbol index of this listing places its
// definition at ZMQHistogramServer.cc:16): initialize the two connections using the command
// line arguments.
// NOTE(review): the signature (listing line 16) and listing lines 18, 19 and 21 are missing
// from this extract, so the body below is incomplete as shown - restore it from the original file.
17 {
// Replaces the object owned by m_output; the argument of reset( is on the missing listing line 21.
20  m_output.reset(
22 }
23 
// Add the parameters of the histogram-to-file server to the cmd line arguments.
// desc: options description the options of this application are appended to.
24 void ZMQHistogramToFileServer::addOptions(po::options_description& desc)
25 {
// Set the member before it is used as the default value of the "timeout" option below.
26  m_timeout = 30;
// NOTE(review): listing line 27 is missing from this extract - presumably the call to the
// base class addOptions; confirm against the original file.
28  desc.add_options()
// "input" and "rootFileName" are marked required(); the other options fall back to the
// current value of the member they are bound to.
29  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
30  "where to read the events from")
31  ("sharedMemoryName",
32  boost::program_options::value<std::string>(&m_sharedMemoryName)->default_value(m_sharedMemoryName),
33  "name of the shared memory")
34  ("rootFileName", boost::program_options::value<std::string>(&m_rootFileName)->required(),
35  "name of the ROOT file - can include {run_number} or {experiment_number}")
36  ("timeout", boost::program_options::value<unsigned int>(&m_timeout)->default_value(m_timeout),
37  "how many seconds to wait between histogram dumps")
38  ("maximalUncompressedBufferSize",
39  boost::program_options::value<unsigned int>(&m_maximalUncompressedBufferSize)->default_value(
// NOTE(review): listing line 40 (the argument and closing parenthesis of default_value) is
// missing from this extract; confirm against the original file.
41  "size of the uncompress buffer");
42 }
43 
45 {
46  if (type == EMessageTypes::c_newRunMessage) {
47  m_input->clear();
48  m_output->clear();
49  return;
50  } else if (type == EMessageTypes::c_lastEventMessage) {
51  auto message = m_input->overwriteStopMessage();
52  if (message) {
53  m_output->handleEvent(std::move(message));
54  }
55  return;
56  }
57 }
58 
60 {
61  auto message = m_input->handleIncomingData();
62  if (message) {
63  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
64  m_terminate = true;
65  }
66 
67  m_output->handleEvent(std::move(message));
68  }
69 }
70 
72 {
73  m_output->mergeAndSend();
74 }
75 
// Body of ZMQHistogramToZMQServer::initialize() (the symbol index of this listing places its
// definition at ZMQHistogramServer.cc:76): initialize the two connections using the command
// line arguments.
// NOTE(review): the signature (listing line 76) and the whole body (listing lines 78-80) are
// missing from this extract - restore them from the original file.
77 {
81 }
82 
// Add the parameters of the histogram-to-ZMQ server to the cmd line arguments.
// desc: options description the options of this application are appended to.
83 void ZMQHistogramToZMQServer::addOptions(po::options_description& desc)
84 {
// NOTE(review): listing line 85 is missing from this extract - presumably the call to the
// base class addOptions; confirm against the original file.
86  desc.add_options()
// "input" and "output" are marked required(); "timeout" defaults to 30.
87  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
88  "where to read the events from")
89  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
90  "where to send the events to")
91  ("timeout", boost::program_options::value<unsigned int>(&m_timeout)->default_value(30),
92  "how many seconds to wait between histogram dumps")
93  ("maximalUncompressedBufferSize",
94  boost::program_options::value<unsigned int>(&m_maximalUncompressedBufferSize)->default_value(
// NOTE(review): listing line 95 (the argument and closing parenthesis of default_value) is
// missing from this extract; confirm against the original file.
96  "size of the uncompress buffer");
97 }
98 
100 {
101  if (type == EMessageTypes::c_newRunMessage) {
102  m_input->clear();
103  m_output->clear();
104  return;
105  } else if (type == EMessageTypes::c_lastEventMessage) {
106  auto message = m_input->overwriteStopMessage();
107  if (message) {
108  m_output->handleEvent(std::move(message));
109  }
110  return;
111  }
112 }
113 
115 {
116  auto message = m_input->handleIncomingData();
117  if (message) {
118  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
119  m_terminate = true;
120  }
121 
122  m_output->handleEvent(std::move(message));
123  }
124 }
125 
127 {
128  m_output->mergeAndSend();
129 }
130 
// Body of ZMQHistogramToRawServer::initialize() (the symbol index of this listing places its
// definition at ZMQHistogramServer.cc:131): initialize the two connections using the command
// line arguments.
// NOTE(review): the signature (listing line 131) and the whole body (listing lines 133-135)
// are missing from this extract - restore them from the original file.
132 {
136 }
137 
// Add the parameters of the histogram-to-raw server to the cmd line arguments.
// desc: options description the options of this application are appended to.
138 void ZMQHistogramToRawServer::addOptions(po::options_description& desc)
139 {
// NOTE(review): listing line 140 is missing from this extract - presumably the call to the
// base class addOptions; confirm against the original file.
141  desc.add_options()
// "input" and "output" are marked required(); "timeout" defaults to 30.
142  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
143  "where to read the events from")
144  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
145  "where to send the events to")
146  ("timeout", boost::program_options::value<unsigned int>(&m_timeout)->default_value(30),
147  "how many seconds to wait between histogram dumps")
148  ("maximalUncompressedBufferSize",
149  boost::program_options::value<unsigned int>(&m_maximalUncompressedBufferSize)->default_value(
// NOTE(review): listing line 150 (the argument and closing parenthesis of default_value) is
// missing from this extract; confirm against the original file.
151  "size of the uncompress buffer");
152 }
153 
155 {
156  if (type == EMessageTypes::c_newRunMessage) {
157  m_input->clear();
158  m_output->clear();
159  return;
160  } else if (type == EMessageTypes::c_lastEventMessage) {
161  auto message = m_input->overwriteStopMessage();
162  if (message) {
163  m_output->handleEvent(std::move(message));
164  }
165  return;
166  }
167 }
168 
170 {
171  auto message = m_input->handleIncomingData();
172  if (message) {
173  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
174  m_terminate = true;
175  }
176 
177  m_output->handleEvent(std::move(message));
178  }
179 }
180 
182 {
183  m_output->mergeAndSend();
184 }
Belle2::ZMQHistogramToRawServer::m_maximalUncompressedBufferSize
unsigned int m_maximalUncompressedBufferSize
Parameter: size of the temporary internal compression buffer.
Definition: ZMQHistogramServer.h:124
Belle2::ZMQHistogramToFileServer::m_sharedMemoryName
std::string m_sharedMemoryName
Parameter: name of the shared memory (or empty)
Definition: ZMQHistogramServer.h:57
Belle2::ZMQHistogramToFileServer::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQHistogramServer.h:55
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQHistoServerToFile >::m_output
std::unique_ptr< ZMQHistoServerToFile > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:79
Belle2::ZMQHistogramToFileServer::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Merge on stop (even if not all clients have stopped) or clear the counters on start from the monitoring...
Definition: ZMQHistogramServer.cc:44
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQHistogramToFileServer::m_rootFileName
std::string m_rootFileName
Parameter: name of the root file - can include {run_number} or {experiment_number}.
Definition: ZMQHistogramServer.h:59
Belle2::ZMQHistogramToZMQServer::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQHistogramServer.cc:76
Belle2::ZMQHistogramToFileServer::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQHistogramServer.cc:16
Belle2::ZMQConfirmedInput
Input part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:65
Belle2::ZMQHistogramToRawServer::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQHistogramServer.h:122
Belle2::ZMQHistogramToFileServer::handleTimeout
void handleTimeout() final
Call the mergeAndSend function on timeout.
Definition: ZMQHistogramServer.cc:71
Belle2::ZMQHistogramToRawServer::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQHistogramServer.cc:131
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQHistoServerToFile >::m_terminate
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:83
Belle2::ZMQHistogramToFileServer::handleInput
void handleInput() final
Pass the message from the input connection to the histogram storage.
Definition: ZMQHistogramServer.cc:59
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQHistoServerToFile >::m_parent
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:75
Belle2::ZMQHistogramToRawServer::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Merge on stop (even if not all clients have stopped) or clear the counters on start from the monitoring...
Definition: ZMQHistogramServer.cc:154
Belle2::ZMQHistogramToZMQServer::handleTimeout
void handleTimeout() final
Call the mergeAndSend function on timeout.
Definition: ZMQHistogramServer.cc:126
Belle2::ZMQStandardApp::addOptions
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base function...
Definition: ZMQApp.details.h:134
Belle2::ZMQHistogramToZMQServer::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Merge on stop (even if not all clients have stopped) or clear the counters on start from the monitoring...
Definition: ZMQHistogramServer.cc:99
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQHistogramToRawServer::handleTimeout
void handleTimeout() final
Call the mergeAndSend function on timeout.
Definition: ZMQHistogramServer.cc:181
Belle2::ZMQHistogramToRawServer::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQHistogramServer.h:120
Belle2::ZMQHistogramToFileServer::m_maximalUncompressedBufferSize
unsigned int m_maximalUncompressedBufferSize
Parameter: size of the temporary internal compression buffer.
Definition: ZMQHistogramServer.h:61
Belle2::ZMQHistogramToRawServer::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQHistogramServer.cc:138
Belle2::ZMQHistogramToZMQServer::m_maximalUncompressedBufferSize
unsigned int m_maximalUncompressedBufferSize
Parameter: size of the temporary internal compression buffer.
Definition: ZMQHistogramServer.h:93
Belle2::ZMQHistogramToZMQServer::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQHistogramServer.cc:83
Belle2::ZMQHistogramToZMQServer::handleInput
void handleInput() final
Pass the message from the input connection to the histogram storage.
Definition: ZMQHistogramServer.cc:114
Belle2::ZMQHistogramToZMQServer::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQHistogramServer.h:89
Belle2::ZMQHistogramToRawServer::handleInput
void handleInput() final
Pass the message from the input connection to the histogram storage.
Definition: ZMQHistogramServer.cc:169
Belle2::ZMQHistogramToFileServer::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQHistogramServer.cc:24
Belle2::ZMQStandardApp::initialize
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
Definition: ZMQApp.details.h:127
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQHistoServerToFile >::m_timeout
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:85
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQHistoServerToFile >::m_input
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:77
Belle2::ZMQHistogramOutput
Add the common functionality to the histogram output classes.
Definition: ZMQHistogramOutput.h:66
Belle2::ZMQHistogramToZMQServer::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQHistogramServer.h:91