Belle II Software  release-05-02-19
ZMQLoadBalancedConnection.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/pcore/zmq/connections/ZMQConnection.h>
13 #include <framework/pcore/zmq/utils/ZMQParent.h>
14 
15 #include <framework/pcore/zmq/messages/ZMQIdMessage.h>
16 #include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
17 
18 #include <zmq.hpp>
19 
20 #include <string>
21 #include <memory>
22 #include <set>
23 #include <deque>
24 
25 namespace Belle2 {
42  class ZMQLoadBalancedInput : public ZMQConnectionOverSocket {
43  public:
45  ZMQLoadBalancedInput(const std::string& inputAddress, unsigned int bufferSize,
46  const std::shared_ptr<ZMQParent>& parent);
47 
49  std::unique_ptr<ZMQNoIdMessage> handleIncomingData();
50  };
51 
74  public:
76  ZMQLoadBalancedOutput(const std::string& outputAddress, bool lax, const std::shared_ptr<ZMQParent>& parent);
77 
90  void handleEvent(std::unique_ptr<ZMQNoIdMessage> message);
91 
93  void handleIncomingData();
95  void clear();
97  bool isReady() const final;
98 
99  protected:
101  std::deque<std::string> m_readyWorkers;
103  std::set<std::string> m_allWorkers;
105  bool m_sentStopMessages = false;
107  bool m_sentTerminateMessages = false;
109  bool m_lax = false;
110  };
112 }
Belle2::ZMQLoadBalancedOutput::m_sentStopMessages
bool m_sentStopMessages
Did we already sent a stop message?
Definition: ZMQLoadBalancedConnection.h:113
Belle2::ZMQLoadBalancedOutput::handleEvent
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
Definition: ZMQLoadBalancedConnection.cc:83
Belle2::ZMQLoadBalancedOutput::isReady
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
Definition: ZMQLoadBalancedConnection.cc:180
Belle2::ZMQLoadBalancedInput::handleIncomingData
std::unique_ptr< ZMQNoIdMessage > handleIncomingData()
Answer event messages with a ready message and pass on every received message.
Definition: ZMQLoadBalancedConnection.cc:37
Belle2::ZMQLoadBalancedOutput::m_lax
bool m_lax
Parameter to enable lax mode.
Definition: ZMQLoadBalancedConnection.h:117
Belle2::ZMQLoadBalancedInput::ZMQLoadBalancedInput
ZMQLoadBalancedInput(const std::string &inputAddress, unsigned int bufferSize, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced input connecting to the given address. Send bufferSize ready messages.
Definition: ZMQLoadBalancedConnection.cc:17
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQLoadBalancedOutput::handleIncomingData
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
Definition: ZMQLoadBalancedConnection.cc:142
Belle2::ZMQLoadBalancedOutput::m_sentTerminateMessages
bool m_sentTerminateMessages
Did we already sent a terminate message?
Definition: ZMQLoadBalancedConnection.h:115
Belle2::ZMQLoadBalancedOutput::m_readyWorkers
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
Definition: ZMQLoadBalancedConnection.h:109
Belle2::ZMQLoadBalancedOutput::clear
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
Definition: ZMQLoadBalancedConnection.cc:171
Belle2::ZMQLoadBalancedOutput
Output part of a load-balanced connection.
Definition: ZMQLoadBalancedConnection.h:81
Belle2::ZMQConnectionOverSocket
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:72
Belle2::ZMQLoadBalancedOutput::ZMQLoadBalancedOutput
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
Definition: ZMQLoadBalancedConnection.cc:58
Belle2::ZMQLoadBalancedOutput::m_allWorkers
std::set< std::string > m_allWorkers
All ever registered inputs.
Definition: ZMQLoadBalancedConnection.h:111