Belle II Software development
ZMQDistributor.h
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#pragma once

#include <daq/hbasf2/apps/ZMQApp.h>
#include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
#include <framework/pcore/zmq/connections/ZMQRawConnection.h>
#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>

#include <boost/program_options.hpp>

namespace po = boost::program_options;

namespace Belle2 {
  class ZMQDistributor : public ZMQStandardApp<ZMQRawInput, ZMQLoadBalancedOutput> {
  protected:
    void initialize() final;
    void addOptions(po::options_description& desc) final;
    void handleExternalSignal(EMessageTypes type) final;
    void handleInput() final;
    void handleTimeout() final;

  private:
    std::string m_inputAddress;
    std::string m_outputAddress;
    unsigned int m_maximalBufferSize = 80'000'000;
    bool m_expressRecoMode = false;
    unsigned int m_stopWaitingTime = 2;
  };

  class ZMQInputAdapter : public ZMQStandardApp<ZMQRawInput, ZMQConfirmedOutput> {
  protected:
    void initialize() final;
    void addOptions(po::options_description& desc) final;
    void handleExternalSignal(EMessageTypes type) final;
    void handleInput() final;
    void handleTimeout() final;

  private:
    std::string m_inputAddress;
    std::string m_outputAddress;
    unsigned int m_maximalBufferSize = 80'000'000;
    bool m_expressRecoMode = false;
    unsigned int m_stopWaitingTime = 2;
  };
}
Standard distributor app: receive data via a raw connection (e.g.
Members of ZMQDistributor:
  void initialize() final
      Initialize the two connections using the command-line arguments.
  void addOptions(po::options_description &desc) final
      Add the parameters to the command-line arguments.
  void handleExternalSignal(EMessageTypes type) final
      Handle stop, start and terminate messages as described above.
  void handleInput() final
      Pass the message from the input connection to the output connection (data messages only).
  void handleTimeout() final
      When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
  std::string m_inputAddress
      Parameter: input address.
  std::string m_outputAddress
      Parameter: output address.
  unsigned int m_maximalBufferSize
      Parameter: buffer size for storing input messages.
  bool m_expressRecoMode
      Parameter: if set to true, do not wait for a ready worker but dismiss the incoming event.
  unsigned int m_stopWaitingTime
      Parameter: how long to wait once no more events arrive.
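The effect of the express-reco parameter can be illustrated with a small toy model. This is only a sketch of the policy described above and not the basf2 implementation: it uses no ZMQ sockets, and every name in it (ToyDistributor, Outcome, workerReady, pending, sent) is hypothetical. It also assumes that, outside express-reco mode, an event that finds no ready worker is simply kept until one announces itself.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical outcome of offering one event to the toy distributor.
enum class Outcome { Sent, Dismissed, Waiting };

// Toy stand-in for the forwarding policy; not part of basf2.
struct ToyDistributor {
  bool expressRecoMode = false;                   // mirrors m_expressRecoMode
  std::deque<int> readyWorkers;                   // workers that announced readiness
  std::deque<std::string> pending;                // events waiting for a worker
  std::vector<std::pair<int, std::string>> sent;  // (worker, event) records

  // Offer one incoming event to the distributor.
  Outcome handleInput(const std::string& event) {
    if (!readyWorkers.empty()) {
      // A worker is ready: hand the event over immediately.
      sent.emplace_back(readyWorkers.front(), event);
      readyWorkers.pop_front();
      return Outcome::Sent;
    }
    if (expressRecoMode) {
      // Express-reco mode: do not wait, dismiss the event.
      return Outcome::Dismissed;
    }
    // Assumed default behaviour: keep the event until a worker is ready.
    pending.push_back(event);
    return Outcome::Waiting;
  }

  // A worker announces that it can take one event.
  void workerReady(int worker) {
    // Invariant: events only wait while no worker is ready, and vice versa.
    assert(pending.empty() || readyWorkers.empty());
    if (pending.empty()) {
      readyWorkers.push_back(worker);
      return;
    }
    sent.emplace_back(worker, pending.front());
    pending.pop_front();
  }
};
```

With expressRecoMode set to true, an event arriving while readyWorkers is empty is reported as Outcome::Dismissed and never reaches a worker. With the default false, it is reported as Outcome::Waiting and delivered by the next workerReady call.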
Members of ZMQInputAdapter:
  void initialize() final
      Initialize the two connections using the command-line arguments.
  void addOptions(po::options_description &desc) final
      Add the parameters to the command-line arguments.
  void handleExternalSignal(EMessageTypes type) final
      Send a stop message on stop, or clear the counters on start, from the monitoring connection.
  void handleInput() final
      Pass the message from the input connection to the output connection (if there is a message).
  void handleTimeout() final
      When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
  std::string m_inputAddress
      Parameter: input address.
  std::string m_outputAddress
      Parameter: output address.
  unsigned int m_maximalBufferSize
      Parameter: buffer size for storing input messages.
  bool m_expressRecoMode
      Parameter: choose how the input events are formatted.
  unsigned int m_stopWaitingTime
      Parameter: how long to wait once no more events arrive.
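The interplay between a stop signal, late events and the stop waiting time can be sketched as a small state machine. This is an illustration under stated assumptions, not the basf2 code: the handleTimeout description above is truncated, so the sketch assumes that each late event restarts the waiting period and that the time unit is seconds. ToyStopTimer and its members are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```cpp
// Toy model of "wait for stragglers after a stop, then forward the stop".
// Hypothetical names throughout; not part of basf2.
struct ToyStopTimer {
  unsigned int stopWaitingTime = 2;  // mirrors m_stopWaitingTime (assumed seconds)
  bool waiting = false;              // true between the stop signal and the forwarded stop
  unsigned int deadline = 0;         // time at which the stop message is due

  // A stop signal arrived: start waiting for the remaining events.
  void onStopSignal(unsigned int now) {
    waiting = true;
    deadline = now + stopWaitingTime;
  }

  // An event arrived: if a stop is pending, restart the waiting period
  // (assumption, see the lead-in).
  void onEvent(unsigned int now) {
    if (waiting) {
      deadline = now + stopWaitingTime;
    }
  }

  // Called periodically; returns true exactly once, when the stop message
  // should be sent on to the output.
  bool poll(unsigned int now) {
    if (!waiting || now < deadline) {
      return false;
    }
    waiting = false;
    return true;
  }
};
```

For example, with the default waiting time of 2, a stop at t = 10 followed by a late event at t = 11 moves the deadline to t = 13, so poll returns true at t = 13 and false on every later call.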
Generic base class for all standalone ZMQ applications.
Definition: ZMQApp.h:46
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.