 |
Belle II Software
release-05-01-25
|
11 #include <framework/pcore/AsyncWrapper.h>
12 #include <framework/core/Path.h>
14 #include <framework/core/EventProcessor.h>
15 #include <framework/core/ModuleManager.h>
16 #include <framework/pcore/GlobalProcHandler.h>
17 #include <framework/pcore/RingBuffer.h>
18 #include <framework/pcore/RxModule.h>
19 #include <framework/pcore/TxModule.h>
20 #include <framework/datastore/StoreObjPtr.h>
21 #include <framework/dataobjects/EventMetaData.h>
32 static std::vector<RingBuffer*> rbList;
46 B2ERROR(
"AsyncWrapper::numAvailableEvents() used in synchronous thread??");
54 m_wrappedModule(
ModuleManager::Instance().registerModule(moduleType)),
55 m_ringBuffer(nullptr), m_rx(nullptr), m_tx(nullptr)
60 "Discard old events when buffer is full. If false, the main process will wait until there is enough space in the buffer. (i.e. synchronous operation)",
64 AsyncWrapper::~AsyncWrapper() =
default;
68 B2INFO(
"Initializing AsyncWrapper...");
71 const int bufferSizeInts = 8000000;
95 B2INFO(
"Asynchronous process done!");
108 if (waitpid(-1,
nullptr, WNOHANG) != 0) {
110 eventMetaData->setEndOfData();
123 B2INFO(
"Waiting for asynchronous process...");
125 B2INFO(
"Done, cleaning up...");
static int numAvailableEvents()
Retun number of events available in the RingBuffer.
int clear()
Clear the RingBuffer.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
virtual void event() override
This method is the core of the module.
virtual void initialize() override
Module functions to be called from main process.
static RingBuffer * s_currentRingBuffer
if s_isAsync is true, this contains the corresponding RingBuffer, see numAvailableEvents().
static bool s_isAsync
true if the current process is on the receiving (async) side of an AsyncWrapper.
Module to decode data store contents from RingBuffer.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
Module for encoding data store contents into a RingBuffer.
virtual void terminate() override
This method is called at the end of the event processing.
void setBlockingInsert(bool block)
Whether to block until we can insert data into the ring buffer in event().
void terminate() override
Call this from terminate().
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
int numq() const
Returns number of entries/buffers in the RingBuffer.
TxModule * m_tx
transmitting module.
Class to manage a Ring Buffer placed in an IPC shared memory.
static bool isAsync()
returns true if the current process is on the receiving (async) side of an AsyncWrapper.
void event() override
Call this from event().
bool m_discardOldEvents
Discard old events when buffer is full.
Abstract base class for different kinds of events.
Type-safe access to single objects in the data store.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
void kill()
Cause termination of reading processes (if they use isDead()).
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
RxModule * m_rx
receiving module.
void addParam(const std::string &name, T ¶mVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Implements a path consisting of Module and/or Path objects.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler.
void initialize() override
Call this from initialize().
AsyncWrapper(const std::string &moduleType)
Wrap am module of given type.
ModulePtr m_wrappedModule
The wrapped module.
RingBuffer * m_ringBuffer
shared memory buffer
provides the core event processing loop.
void setParamList(const ModuleParamList ¶ms)
Replace existing parameter list.