Belle II Software development
HLTZMQ2DsDirect.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/modules/HLTZMQ2DsDirect.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10
11#include <framework/logging/Logger.h>
12
13using namespace std;
14using namespace Belle2;
15
16REG_MODULE(HLTZMQ2DsDirect);
17
19{
21 ""
22 );
24
25 addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
26 addParam("bufferSize", m_param_bufferSize,
27 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
28}
29
30void HLTZMQ2DsDirectModule::readEvent()
31{
32 try {
33 bool tryAgain = false;
34 const auto reactToInput = [this, &tryAgain]() {
35 auto eventMessage = m_input->handleIncomingData();
36
37 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
38 // We do not care about those messages, so just continue
39 tryAgain = true;
40 return;
41 } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
42 B2DEBUG(10, "Received termination request");
43 tryAgain = false;
44 return;
45 }
46
47 B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
48 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
49 B2DEBUG(10, "received event message... write it to data store");
50
51 m_streamHelper.read(std::move(eventMessage));
52 tryAgain = false;
53 };
54
55 do {
56 ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
57 } while (tryAgain);
58 } catch (zmq::error_t& error) {
59 if (error.num() == EINTR) {
60 // Well, that is probably ok. It will be handled by the framework, just go out here.
61 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
62 return;
63 }
64 // This is an unexpected error: better report it.
65 B2ERROR("ZMQ Error while calling the event: " << error.num());
66 }
67}
68
70{
72
73 m_parent = std::make_unique<ZMQParent>();
74 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
75
76 readEvent();
77}
78
80{
81 // We do not need to process this event again
82 if (m_firstEvent) {
83 m_firstEvent = false;
84 return;
85 }
86
87 readEvent();
88}
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
bool m_firstEvent
Are we still in the first real event?
std::unique_ptr< ZMQLoadBalancedInput > m_input
Load balanced connection to the previous ZMQ application.
void initialize() override
Receive the first event and initialize the data store with it.
HLTZMQ2DsDirectModule()
Register the module parameters.
void event() override
If not in the first event, receive an event and store in the DS.
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
unsigned int m_param_bufferSize
Module parameter: how many events should be kept in flight. Has an impact on the stopping time as wel...
HLTStreamHelper m_streamHelper
Utility class for deserialization.
std::string m_param_input
Module parameter: ZMQ address of the input ZMQ application.
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.