Belle II Software development
ZMQTxInputModule.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <framework/core/Module.h>
11#include <framework/datastore/StoreObjPtr.h>
12#include <framework/dataobjects/EventMetaData.h>
13#include <framework/core/RandomGenerator.h>
14#include <framework/pcore/zmq/sockets/ZMQClient.h>
15#include <framework/pcore/zmq/utils/StreamHelper.h>
16
17#include <framework/pcore/zmq/processModules/ProcessedEventsBackupList.h>
18#include <deque>
19
20namespace Belle2 {
29 class ZMQTxInputModule : public Module {
30 public:
34 void event() override;
36 void terminate() override;
38 void initialize() override;
40 // void beginRun() override;
42 void endRun() override;
43
44 private:
46 std::deque<unsigned int> m_nextWorker;
48 std::vector<unsigned int> m_workers;
51
58
60 bool m_firstEvent = true;
61
63 std::string m_param_socketName;
72
77
82
85 };
87}
Base class for Modules.
Definition: Module.h:72
List-like structure for storing and retaining event backups.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
Helper class for data store serialization.
Definition: StreamHelper.h:23
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:22
Module connecting the input path with the worker path on the input side.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
unsigned int m_param_workerProcessTimeout
Maximal time a worker is allowed to spent in ms.
void initialize() override
Initialize the RandomSeedGenerator.
ZMQClient m_zmqClient
Our ZMQ client.
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
ZMQTxInputModule()
Constructor setting the moudle paramters.
void endRun() override
BeginRun processing.
void terminate() override
Terminate the client and tell the monitor, we are done. Tell the output to end if all backups are out...
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
StoreObjPtr< RandomGenerator > m_randomgenerator
The random generator in the data store.
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
bool m_param_useEventBackup
Flag to use the event backup or not.
Abstract base class for different kinds of events.