Belle II Software development
ZMQConfirmedConnection.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <framework/pcore/zmq/utils/ZMQParent.h>
11#include <framework/pcore/zmq/connections/ZMQConnection.h>
12
13#include <framework/pcore/zmq/messages/ZMQIdMessage.h>
14#include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
15
16#include <set>
17#include <string>
18#include <memory>
19
20
21namespace Belle2 {
56 public:
/// Construct the input part of a confirmed connection from an address string and a shared ZMQParent.
/// NOTE(review): how inputAddress is used (bind vs. connect) is not visible in this header -- confirm in the implementation.
58 ZMQConfirmedInput(const std::string& inputAddress, const std::shared_ptr<ZMQParent>& parent);
59
/// Block until a message can be received from one of the inputs.
/// @return the received message, carrying an identity (ZMQIdMessage).
66 std::unique_ptr<ZMQIdMessage> handleIncomingData();
67
/// Reset the counters for all received stop and terminate messages. Should be called on run start.
69 void clear();
70
/// Manually overwrite the stop message counter and set it to have all stop messages received.
/// NOTE(review): the content of the returned message is not visible in this header -- confirm in the implementation.
72 std::unique_ptr<ZMQIdMessage> overwriteStopMessage();
73
74 private:
/// The set of input identities which have already sent a stop message.
76 std::set<std::string> m_receivedStopMessages;
/// Have we received all stop messages?
78 bool m_allStopMessages = false;
/// The set of input identities which have already sent a terminate message.
80 std::set<std::string> m_receivedTerminateMessages;
/// The set of all registered inputs.
84 std::set<std::string> m_registeredWorkersInput;
85 };
86
107 public:
/// Construct the output part of a confirmed connection from an address string and a shared ZMQParent.
/// NOTE(review): how outputAddress is used (bind vs. connect) is not visible in this header -- confirm in the implementation.
109 ZMQConfirmedOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
110
/// Send the message to the output (a message without an ID as there is only a single output).
/// @param message the message to send; ownership is transferred to this call.
/// @param requireConfirmation defaults to true; presumably controls whether a confirmation is awaited -- TODO confirm.
/// @param maximalWaitTime defaults to 10000; the unit is not stated in this header -- TODO confirm (likely milliseconds).
117 void handleEvent(std::unique_ptr<ZMQNoIdMessage> message, bool requireConfirmation = true, int maximalWaitTime = 10000);
118
/// Blocks until it can receive the (hopefully confirmation) message from the output.
124 void handleIncomingData();
125
126 private:
/// Number of confirmation messages we are still waiting for.
128 unsigned int m_waitingForConfirmation = 0;
131 };
133}
Input part of a confirmed connection.
std::set< std::string > m_receivedStopMessages
The set of input identities which have already sent a stop message.
std::unique_ptr< ZMQIdMessage > handleIncomingData()
Block until a message can be received from one of the inputs.
std::set< std::string > m_receivedTerminateMessages
The set of input identities which have already sent a terminate message.
std::set< std::string > m_registeredWorkersInput
The set of all registered inputs.
bool m_allTerminateMessages
Have we received all terminate messages?
void clear()
Reset the counters for all received stop and terminate messages. Should be called on run start.
bool m_allStopMessages
Have we received all stop messages?
std::unique_ptr< ZMQIdMessage > overwriteStopMessage()
Manually overwrite the stop message counter and set it to have all stop messages received.
Output part of a confirmed connection.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without an ID, as there is only a single output).
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
How many confirmation messages are we still waiting for?
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
Abstract base class for different kinds of events.