Belle II Software  release-05-01-25
ZMQConfirmedConnection.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/pcore/zmq/utils/ZMQParent.h>
13 #include <framework/pcore/zmq/connections/ZMQConnection.h>
14 
15 #include <framework/pcore/zmq/messages/ZMQIdMessage.h>
16 #include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
17 
18 #include <set>
19 #include <string>
20 #include <memory>
21 
22 
23 namespace Belle2 {
/// Input side of a confirmed ZMQ connection; derives from ZMQConnectionOverSocket
/// (a specialized connection over a ZMQ socket).
/// NOTE(review): presumably the counterpart of ZMQConfirmedOutput - confirm against ZMQConfirmedConnection.cc.
57  class ZMQConfirmedInput : public ZMQConnectionOverSocket {
58  public:
/// Create a new confirmed input by binding to the address.
60  ZMQConfirmedInput(const std::string& inputAddress, const std::shared_ptr<ZMQParent>& parent);
61 
/// Block until a message can be received from one of the inputs.
68  std::unique_ptr<ZMQIdMessage> handleIncomingData();
69 
/// Reset the counters for all received stop and terminate messages. Should be called on run start.
71  void clear();
72 
/// Manually overwrite the stop message counter and set it to have all stop messages received.
74  std::unique_ptr<ZMQIdMessage> overwriteStopMessage();
75 
76  private:
/// The set of input identities which have already sent a stop message.
78  std::set<std::string> m_receivedStopMessages;
/// Have we received all stop messages?
80  bool m_allStopMessages = false;
/// The set of input identities which have already sent a terminate message.
82  std::set<std::string> m_receivedTerminateMessages;
/// Have we received all terminate messages?
84  bool m_allTerminateMessages = false;
/// The set of all registered inputs.
86  std::set<std::string> m_registeredWorkersInput;
87  };
88 
109  public:
/// Create a new confirmed output by connecting to the address.
111  ZMQConfirmedOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
112 
/// Send the message to the output (a message without an ID as there is only a single output).
/// @param message the message to send; ownership is passed in via the unique_ptr.
/// @param requireConfirmation defaults to true.
///        NOTE(review): presumably controls whether a confirmation is awaited - confirm in ZMQConfirmedConnection.cc.
/// @param maximalWaitTime defaults to 10000.
///        NOTE(review): unit is not visible in this header (presumably milliseconds) - confirm.
119  void handleEvent(std::unique_ptr<ZMQNoIdMessage> message, bool requireConfirmation = true, int maximalWaitTime = 10000);
120 
/// Blocks until it can receive the (hopefully confirmation) message from the output.
126  void handleIncomingData();
127 
128  private:
/// On how many confirmation messages are we still waiting?
130  unsigned int m_waitingForConfirmation = 0;
133  };
135 }
Belle2::ZMQConfirmedOutput::ZMQConfirmedOutput
ZMQConfirmedOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed output by connecting to the address.
Definition: ZMQConfirmedConnection.cc:209
Belle2::ZMQConfirmedInput::m_receivedStopMessages
std::set< std::string > m_receivedStopMessages
The set of input identities which have already sent a stop message.
Definition: ZMQConfirmedConnection.h:86
Belle2::ZMQConfirmedOutput
Output part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:116
Belle2::ZMQConfirmedOutput::handleEvent
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without an ID as there is only a single output).
Definition: ZMQConfirmedConnection.cc:230
Belle2::ZMQConfirmedInput::m_registeredWorkersInput
std::set< std::string > m_registeredWorkersInput
The set of all registered inputs.
Definition: ZMQConfirmedConnection.h:94
Belle2::ZMQConfirmedOutput::m_waitingForConfirmation
unsigned int m_waitingForConfirmation
On how many confirmation messages are we still waiting?
Definition: ZMQConfirmedConnection.h:138
Belle2::ZMQConfirmedInput::m_allTerminateMessages
bool m_allTerminateMessages
Have we received all terminate messages?
Definition: ZMQConfirmedConnection.h:92
Belle2::ZMQConfirmedInput::m_receivedTerminateMessages
std::set< std::string > m_receivedTerminateMessages
The set of input identities which have already sent a terminate message.
Definition: ZMQConfirmedConnection.h:90
Belle2::ZMQConfirmedInput::overwriteStopMessage
std::unique_ptr< ZMQIdMessage > overwriteStopMessage()
Manually overwrite the stop message counter and set it to have all stop messages received.
Definition: ZMQConfirmedConnection.cc:189
Belle2::ZMQConfirmedInput::m_allStopMessages
bool m_allStopMessages
Have we received all stop messages?
Definition: ZMQConfirmedConnection.h:88
Belle2::ZMQConfirmedInput::ZMQConfirmedInput
ZMQConfirmedInput(const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed input by binding to the address.
Definition: ZMQConfirmedConnection.cc:19
Belle2::ZMQConfirmedInput::handleIncomingData
std::unique_ptr< ZMQIdMessage > handleIncomingData()
Block until a message can be received from one of the inputs.
Definition: ZMQConfirmedConnection.cc:50
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQConfirmedInput::clear
void clear()
Reset the counters for all received stop and terminate messages. Should be called on run start.
Definition: ZMQConfirmedConnection.cc:173
Belle2::ZMQConfirmedOutput::m_timespanWaitingForConfirmation
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
Definition: ZMQConfirmedConnection.h:140
Belle2::ZMQConnectionOverSocket
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:72
Belle2::ZMQConfirmedOutput::handleIncomingData
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
Definition: ZMQConfirmedConnection.cc:265