12 #include <daq/hbasf2/apps/ZMQApp.h>
20 template <
class AInputConnection,
class AOutputConnection>
23 po::options_description desc(description);
26 po::positional_options_description p;
31 po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
32 }
catch (std::exception& e) {
36 if (vm.count(
"help")) {
37 std::cout << desc << std::endl;
43 }
catch (std::exception& e) {
50 template <
class AInputConnection,
class AOutputConnection>
55 while (not terminated()) {
56 if (not m_output->isReady()) {
59 m_monitor->log(
"output_state",
"not_ready");
63 m_monitor->log(
"output_state",
"ready");
69 template <
class AInputConnection,
class AOutputConnection>
72 auto reactToOutput = [
this]() {
76 if (m_monitorHasPriority) {
82 auto reactToMonitor = [
this]() {
89 auto reactToInput = [
this]() {
93 if (m_monitorHasPriority) {
99 m_monitor->logTime(
"waiting_since");
102 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}, {m_input.get(), reactToInput}},
105 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}}, m_remainingTime);
108 if (checkTimer() and not terminated()) {
109 B2ASSERT(
"There is no timeout set, but we still call the timeout() function? A bug!", m_timeout != 0);
110 m_monitor->increment(
"timeouts");
118 template <
class AInputConnection,
class AOutputConnection>
121 m_parent.reset(
new ZMQParent);
122 m_monitor.reset(
new ZMQSimpleConnection(m_monitoringAddress, m_parent));
125 template <
class AInputConnection,
class AOutputConnection>
128 desc.add_options()(
"help,h",
"Print this help message")(
"monitor", po::value<std::string>(&m_monitoringAddress)->required(),
129 "where to listen for monitoring");
132 template <
class AInputConnection,
class AOutputConnection>
137 template <
class AInputConnection,
class AOutputConnection>
140 m_input->handleIncomingData();
143 template <
class AInputConnection,
class AOutputConnection>
146 m_output->handleIncomingData();
149 template <
class AInputConnection,
class AOutputConnection>
154 template <
class AInputConnection,
class AOutputConnection>
157 return not m_mainLoop.isRunning() or m_terminate;
160 template <
class AInputConnection,
class AOutputConnection>
164 return m_remainingTime == 0;
167 template <
class AInputConnection,
class AOutputConnection>
171 if (m_timeout == 0) {
172 m_remainingTime = -1;
176 auto currentTime = std::chrono::system_clock::now();
177 auto timeDifference = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - m_start);
178 m_remainingTime = m_timeout * 1000 - timeDifference.count();
179 if (m_remainingTime < 0) {
184 template <
class AInputConnection,
class AOutputConnection>
188 if (m_timeout == 0) {
189 m_remainingTime = -1;
193 m_start = std::chrono::system_clock::now();
194 m_remainingTime = m_timeout * 1000;
197 template <
class AInputConnection,
class AOutputConnection>
200 auto monitoringMessage = m_monitor->handleIncomingData();
202 if (monitoringMessage->isMessage(EMessageTypes::c_newRunMessage)) {
203 handleExternalSignal(EMessageTypes::c_newRunMessage);
205 }
else if (monitoringMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
206 handleExternalSignal(EMessageTypes::c_lastEventMessage);
208 }
else if (monitoringMessage->isMessage(EMessageTypes::c_terminateMessage)) {
209 handleExternalSignal(EMessageTypes::c_terminateMessage);
213 std::stringstream buffer;
214 fillMonitoringJSON(buffer);
217 EMessageTypes::c_confirmMessage, buffer.str());
218 m_monitor->handleEvent(std::move(message));
221 template <
class AInputConnection,
class AOutputConnection>
224 buffer <<
"{" << std::endl;
225 buffer <<
"\"monitor\": " << m_monitor->getMonitoringJSON() <<
"," << std::endl;
226 buffer <<
"\"input\": " << m_input->getMonitoringJSON() <<
"," << std::endl;
227 buffer <<
"\"output\": " << m_output->getMonitoringJSON() << std::endl;
228 buffer <<
"}" << std::endl;