Belle II Software  release-08-01-10
ZMQConfirmedConnection.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <framework/pcore/zmq/utils/ZMQParent.h>
11 #include <framework/pcore/zmq/connections/ZMQConnection.h>
12 
13 #include <framework/pcore/zmq/messages/ZMQIdMessage.h>
14 #include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
15 
16 #include <set>
17 #include <string>
18 #include <memory>
19 
20 
21 namespace Belle2 {
56  public:
/// Create a new confirmed input by binding to the given address.
/// @param inputAddress address this input binds to
/// @param parent shared ZMQParent instance handed to the connection (its exact role is not visible in this header)
58  ZMQConfirmedInput(const std::string& inputAddress, const std::shared_ptr<ZMQParent>& parent);
59 
/// Block until a message can be received from one of the inputs.
/// @return an owning pointer to a ZMQIdMessage; which message is returned (and whether it may be
///         empty) is decided by the implementation, which is not visible in this header
66  std::unique_ptr<ZMQIdMessage> handleIncomingData();
67 
/// Reset the counters for all received stop and terminate messages. Should be called on run start.
69  void clear();
70 
/// Manually overwrite the stop message counter and set it to have all stop messages received.
/// @return an owning pointer to a ZMQIdMessage (NOTE(review): presumably the stop message to
///         propagate downstream — confirm against the implementation)
72  std::unique_ptr<ZMQIdMessage> overwriteStopMessage();
73 
74  private:
/// The set of input identities which have already sent a stop message.
76  std::set<std::string> m_receivedStopMessages;
/// Have we received all stop messages?
78  bool m_allStopMessages = false;
/// The set of input identities which have already sent a terminate message.
80  std::set<std::string> m_receivedTerminateMessages;
/// Have we received all terminate messages?
82  bool m_allTerminateMessages = false;
/// The set of all registered inputs.
84  std::set<std::string> m_registeredWorkersInput;
85  };
86 
107  public:
/// Create a new confirmed output by connecting to the given address.
/// @param outputAddress address this output connects to
/// @param parent shared ZMQParent instance handed to the connection (its exact role is not visible in this header)
109  ZMQConfirmedOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
110 
/// Send the message to the output (a message without an ID, as there is only a single output).
/// @param message the message to send; ownership is transferred to this call
/// @param requireConfirmation defaults to true (NOTE(review): presumably controls whether a
///        confirmation from the output is expected for this message — confirm against the implementation)
/// @param maximalWaitTime defaults to 10000 (NOTE(review): unit is not stated in this header,
///        presumably milliseconds — TODO confirm)
117  void handleEvent(std::unique_ptr<ZMQNoIdMessage> message, bool requireConfirmation = true, int maximalWaitTime = 10000);
118 
/// Block until a message (expected to be a confirmation) can be received from the output.
124  void handleIncomingData();
125 
126  private:
/// Number of confirmation messages we are still waiting for.
128  unsigned int m_waitingForConfirmation = 0;
131  };
133 }
Input part of a confirmed connection.
std::set< std::string > m_receivedStopMessages
The set of input identities which have already sent a stop message.
std::unique_ptr< ZMQIdMessage > handleIncomingData()
Block until a message can be received from one of the inputs.
std::set< std::string > m_receivedTerminateMessages
The set of input identities which have already sent a terminate message.
std::set< std::string > m_registeredWorkersInput
The set of all registered inputs.
bool m_allTerminateMessages
Have we received all terminate messages?
ZMQConfirmedInput(const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed input by binding to the address.
void clear()
Reset the counters for all received stop and terminate messages. Should be called on run start.
bool m_allStopMessages
Have we received all stop messages?
std::unique_ptr< ZMQIdMessage > overwriteStopMessage()
Manually overwrite the stop message counter and set it to have all stop messages received.
Output part of a confirmed connection.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without an ID, as there is only a single output).
long m_timespanWaitingForConfirmation
Internal monitoring of how long we have been waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
How many confirmation messages are we still waiting for?
ZMQConfirmedOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed output by connecting to the address.
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:62
Abstract base class for different kinds of events.