Belle II Software  release-05-02-19
ZMQTxInputModule.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun, Anselm Baur *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/core/Module.h>
13 #include <framework/datastore/StoreObjPtr.h>
14 #include <framework/dataobjects/EventMetaData.h>
15 #include <framework/core/RandomGenerator.h>
16 #include <framework/pcore/zmq/sockets/ZMQClient.h>
17 #include <framework/pcore/zmq/utils/StreamHelper.h>
18 
19 #include <framework/pcore/zmq/processModules/ProcessedEventsBackupList.h>
20 #include <deque>
21 
22 namespace Belle2 {
31  class ZMQTxInputModule : public Module {
32  public:
36  void event() override;
38  void terminate() override;
40  void initialize() override;
41 
42  private:
44  std::deque<unsigned int> m_nextWorker;
46  std::vector<unsigned int> m_workers;
48  ProcessedEventsBackupList m_procEvtBackupList;
49 
51  unsigned int m_param_maximalWaitingTime;
53  unsigned int m_param_workerProcessTimeout;
56 
58  bool m_firstEvent = true;
59 
61  std::string m_param_socketName;
65  std::string m_param_xsubProxySocketName;
70 
75 
80 
83  };
85 }
Belle2::ZMQTxInputModule::m_param_xsubProxySocketName
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
Definition: ZMQTxInputModule.h:73
Belle2::ZMQTxInputModule::checkWorkerProcTimeout
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
Definition: ZMQTxInputModule.cc:151
Belle2::ZMQTxInputModule::event
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
Definition: ZMQTxInputModule.cc:39
Belle2::ZMQTxInputModule::m_workers
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
Definition: ZMQTxInputModule.h:54
Belle2::ZMQTxInputModule::initialize
void initialize() override
Initialize the RandomSeedGenerator.
Definition: ZMQTxInputModule.cc:33
Belle2::ZMQTxInputModule::m_eventMetaData
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
Definition: ZMQTxInputModule.h:85
Belle2::ZMQTxInputModule::m_nextWorker
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
Definition: ZMQTxInputModule.h:52
Belle2::ZMQTxInputModule::m_param_workerProcessTimeout
unsigned int m_param_workerProcessTimeout
Maximal time a worker is allowed to spent in ms.
Definition: ZMQTxInputModule.h:61
Belle2::ZMQTxInputModule::m_randomgenerator
StoreObjPtr< RandomGenerator > m_randomgenerator
The random generator in the data store.
Definition: ZMQTxInputModule.h:87
Belle2::ZMQTxInputModule::ZMQTxInputModule
ZMQTxInputModule()
Constructor setting the moudle paramters.
Definition: ZMQTxInputModule.cc:17
Belle2::StreamHelper
Helper class for data store serialization.
Definition: StreamHelper.h:33
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQTxInputModule::m_firstEvent
bool m_firstEvent
Set to false if the objects are initialized.
Definition: ZMQTxInputModule.h:66
Belle2::ZMQTxInputModule::m_procEvtBackupList
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
Definition: ZMQTxInputModule.h:56
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33
Belle2::ZMQTxInputModule::m_param_handleMergeable
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
Definition: ZMQTxInputModule.h:77
Belle2::ZMQTxInputModule::m_streamer
StreamHelper m_streamer
The data store streamer.
Definition: ZMQTxInputModule.h:82
Belle2::ZMQTxInputModule::m_param_maximalWaitingTime
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
Definition: ZMQTxInputModule.h:59
Belle2::ZMQTxInputModule::m_param_compressionLevel
int m_param_compressionLevel
Parameter: Compression level of the streamer.
Definition: ZMQTxInputModule.h:75
Belle2::ZMQTxInputModule::terminate
void terminate() override
Terminate the client and tell the monitor, we are done. Tell the output to end if all backups are out...
Definition: ZMQTxInputModule.cc:169
Belle2::ZMQTxInputModule::m_param_xpubProxySocketName
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
Definition: ZMQTxInputModule.h:71
Belle2::ZMQTxInputModule::m_zmqClient
ZMQClient m_zmqClient
Our ZMQ client.
Definition: ZMQTxInputModule.h:80
Belle2::ZMQTxInputModule::m_param_useEventBackup
bool m_param_useEventBackup
Flag to use the event backup or not.
Definition: ZMQTxInputModule.h:63
Belle2::ZMQClient
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
Definition: ZMQClient.h:32
Belle2::ZMQTxInputModule::m_param_socketName
std::string m_param_socketName
Parameter: name of the data socket.
Definition: ZMQTxInputModule.h:69