Belle II Software  release-08-01-10
ZMQLoadBalancedConnection.h
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #pragma once
9 
10 #include <framework/pcore/zmq/connections/ZMQConnection.h>
11 #include <framework/pcore/zmq/utils/ZMQParent.h>
12 
13 #include <framework/pcore/zmq/messages/ZMQIdMessage.h>
14 #include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
15 
16 #include <zmq.hpp>
17 
18 #include <string>
19 #include <memory>
20 #include <set>
21 #include <deque>
22 
23 namespace Belle2 {
41  public:
43  ZMQLoadBalancedInput(const std::string& inputAddress, unsigned int bufferSize,
44  const std::shared_ptr<ZMQParent>& parent);
45 
47  std::unique_ptr<ZMQNoIdMessage> handleIncomingData();
48  };
49 
72  public:
74  ZMQLoadBalancedOutput(const std::string& outputAddress, bool lax, const std::shared_ptr<ZMQParent>& parent);
75 
88  void handleEvent(std::unique_ptr<ZMQNoIdMessage> message);
89 
91  void handleIncomingData();
93  void clear();
95  bool isReady() const final;
96 
97  protected:
99  std::deque<std::string> m_readyWorkers;
101  std::set<std::string> m_allWorkers;
103  bool m_sentStopMessages = false;
107  bool m_lax = false;
108  };
110 }
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:62
Input part of a load-balanced connection.
ZMQLoadBalancedInput(const std::string &inputAddress, unsigned int bufferSize, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced input connecting to the given address. Send bufferSize ready messages.
std::unique_ptr< ZMQNoIdMessage > handleIncomingData()
Answer event messages with a ready message and pass on every received message.
Output part of a load-balanced connection.
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_sentTerminateMessages
Did we already sent a terminate message?
ZMQLoadBalancedOutput(const std::string &outputAddress, bool lax, const std::shared_ptr< ZMQParent > &parent)
Create a new load-balanced output and bind to the given address.
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
bool m_sentStopMessages
Did we already sent a stop message?
Abstract base class for different kinds of events.