Belle II Software  release-08-02-06
ZMQConfirmedConnection.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <framework/pcore/zmq/utils/ZMQParent.h>
11 #include <framework/pcore/zmq/connections/ZMQConnection.h>
12 
13 #include <framework/pcore/zmq/messages/ZMQIdMessage.h>
14 #include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
15 
16 #include <set>
17 #include <string>
18 #include <memory>
19 
20 
21 namespace Belle2 {
56  public:
/// Create a new confirmed input by binding to the given address.
/// @param inputAddress address the input is bound to.
/// @param parent shared ZMQParent instance handed to the connection.
58  ZMQConfirmedInput(const std::string& inputAddress, const std::shared_ptr<ZMQParent>& parent);
59 
/// Block until a message can be received from one of the inputs.
/// @return the message as an owning pointer to a ZMQIdMessage.
/// NOTE(review): whether the returned pointer can be null is not visible in this header - confirm in the implementation.
66  std::unique_ptr<ZMQIdMessage> handleIncomingData();
67 
/// Reset the bookkeeping for all received stop and terminate messages.
/// Should be called on run start.
69  void clear();
70 
/// Manually overwrite the stop message bookkeeping and mark all stop messages as received.
/// @return an owning pointer to a ZMQIdMessage.
/// NOTE(review): the content of the returned message is not visible in this header - confirm in the implementation.
72  std::unique_ptr<ZMQIdMessage> overwriteStopMessage();
73 
74  private:
/// The set of input identities which have already sent a stop message.
76  std::set<std::string> m_receivedStopMessages;
/// Have we received all stop messages?
78  bool m_allStopMessages = false;
/// The set of input identities which have already sent a terminate message.
80  std::set<std::string> m_receivedTerminateMessages;
/// Have we received all terminate messages?
82  bool m_allTerminateMessages = false;
/// The set of all registered inputs.
84  std::set<std::string> m_registeredWorkersInput;
// NOTE(review): the page's documentation also lists a `bool m_eventAfterAllStopMessages` member that is
// not visible in this listing - confirm against the real header.
/// Time stamp associated with an event arriving after all stop messages.
// NOTE(review): std::chrono is used here but <chrono> is not among the visible includes; confirm it
// arrives transitively or include it explicitly.
88  std::chrono::time_point<std::chrono::system_clock> m_whenEventAfterAllStopMessages;
89  };
90 
111  public:
/// Create a new confirmed output by connecting to the given address.
/// @param outputAddress address the output connects to.
/// @param parent shared ZMQParent instance handed to the connection.
113  ZMQConfirmedOutput(const std::string& outputAddress, const std::shared_ptr<ZMQParent>& parent);
114 
/// Send the message to the output (a message without an ID, as there is only a single output).
/// @param message the message to send; ownership is passed in.
/// @param requireConfirmation defaults to true. NOTE(review): presumably controls whether a confirmation
///        message is awaited for this event - confirm in the implementation.
/// @param maximalWaitTime defaults to 10000. NOTE(review): unit is not visible here (presumably
///        milliseconds) - confirm in the implementation.
121  void handleEvent(std::unique_ptr<ZMQNoIdMessage> message, bool requireConfirmation = true, int maximalWaitTime = 10000);
122 
/// Block until a message (expected to be a confirmation) can be received from the output.
128  void handleIncomingData();
129 
130  private:
/// Number of confirmation messages we are still waiting for.
// NOTE(review): the page's documentation also lists a `long m_timespanWaitingForConfirmation` member that
// is not visible in this listing - confirm against the real header.
132  unsigned int m_waitingForConfirmation = 0;
135  };
137 }
Input part of a confirmed connection.
std::set< std::string > m_receivedStopMessages
The set of input identities which have already sent a stop message.
bool m_eventAfterAllStopMessages
A flag to check whether events appear after all the stop messages.
std::unique_ptr< ZMQIdMessage > handleIncomingData()
Block until a message can be received from one of the inputs.
std::set< std::string > m_receivedTerminateMessages
The set of input identities which have already sent a terminate message.
std::set< std::string > m_registeredWorkersInput
The set of all registered inputs.
std::chrono::time_point< std::chrono::system_clock > m_whenEventAfterAllStopMessages
The time at which the eventAfterAllStopMessages flag was issued.
bool m_allTerminateMessages
Have we received all terminate messages?
ZMQConfirmedInput(const std::string &inputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed input by binding to the address.
void clear()
Reset the counters for all received stop and terminate messages. Should be called on run start.
bool m_allStopMessages
Have we received all stop messages?
std::unique_ptr< ZMQIdMessage > overwriteStopMessage()
Manually overwrite the stop message counter and set it to have all stop messages received.
Output part of a confirmed connection.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without an ID as there is only a single output).
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
On how many confirmation messages are we still waiting?
ZMQConfirmedOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed output by connecting to the address.
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:62
Abstract base class for different kinds of events.