Belle II Software development
ZMQLoadBalancedConnection.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <framework/pcore/zmq/connections/ZMQConnection.h>
11#include <framework/pcore/zmq/utils/ZMQParent.h>
12
13#include <framework/pcore/zmq/messages/ZMQIdMessage.h>
14#include <framework/pcore/zmq/messages/ZMQNoIdMessage.h>
15
16#include <zmq.hpp>
17
18#include <string>
19#include <memory>
20#include <set>
21#include <deque>
22
23namespace Belle2 {
41 public:
43 ZMQLoadBalancedInput(const std::string& inputAddress, unsigned int bufferSize,
44 const std::shared_ptr<ZMQParent>& parent);
45
47 std::unique_ptr<ZMQNoIdMessage> handleIncomingData();
48 };
49
72 public:
74 ZMQLoadBalancedOutput(const std::string& outputAddress, bool lax, const std::shared_ptr<ZMQParent>& parent);
75
88 void handleEvent(std::unique_ptr<ZMQNoIdMessage> message);
89
91 void handleIncomingData();
93 void clear();
95 bool isReady() const final;
96
97 protected:
99 std::deque<std::string> m_readyWorkers;
101 std::set<std::string> m_allWorkers;
103 bool m_sentStopMessages = false;
107 bool m_lax = false;
108 };
110}
Specialized connection over a ZMQ socket.
Definition: ZMQConnection.h:63
Input part of a load-balanced connection.
std::unique_ptr< ZMQNoIdMessage > handleIncomingData()
Answer event messages with a ready message and pass on every received message.
Output part of a load-balanced connection.
std::set< std::string > m_allWorkers
All ever registered inputs.
bool m_sentTerminateMessages
Did we already sent a terminate message?
void handleIncomingData()
Block until a ready message from an input is received and add it to the ready queue.
bool m_lax
Parameter to enable lax mode.
std::deque< std::string > m_readyWorkers
List of identities of ready inputs in LIFO order.
bool isReady() const final
If lax mode is disabled, the output is ready if at least a single input is ready. Else always.
void clear()
Clear the counter for sent stop and terminate messages. Should be called on run start.
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message)
Send the given message (without identity) to the next input in the ready list.
bool m_sentStopMessages
Did we already sent a stop message?
Abstract base class for different kinds of events.