Belle II Software development
AsyncWrapper.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/AsyncWrapper.h>
10#include <framework/core/Path.h>
11
12#include <framework/core/EventProcessor.h>
13#include <framework/core/ModuleManager.h>
14#include <framework/pcore/GlobalProcHandler.h>
15#include <framework/pcore/RingBuffer.h>
16#include <framework/pcore/RxModule.h>
17#include <framework/pcore/TxModule.h>
18#include <framework/datastore/StoreObjPtr.h>
19#include <framework/dataobjects/EventMetaData.h>
20
21#include <sys/wait.h>
22
23#include <cstdlib>
24
25using namespace Belle2;
26
27bool AsyncWrapper::s_isAsync = false;
29namespace {
30 static std::vector<RingBuffer*> rbList;
31 void cleanupIPC()
32 {
33 if (!AsyncWrapper::isAsync()) {
34 for (RingBuffer* rb : rbList)
35 delete rb;
36 rbList.clear();
37 }
38 }
39}
40
42{
43 if (!s_isAsync) {
44 B2ERROR("AsyncWrapper::numAvailableEvents() used in synchronous thread??");
45 return true;
46 }
47
48 return s_currentRingBuffer->numq();
49}
50
51AsyncWrapper::AsyncWrapper(const std::string& moduleType): Module(),
52 m_wrappedModule(ModuleManager::Instance().registerModule(moduleType)),
53 m_ringBuffer(nullptr), m_rx(nullptr), m_tx(nullptr)
54{
55 setParamList(m_wrappedModule->getParamList()); //inherit parameters from wrapped module
56
57 addParam("discardOldEvents", m_discardOldEvents,
58 "Discard old events when buffer is full. If false, the main process will wait until there is enough space in the buffer. (i.e. synchronous operation)",
59 true);
60}
61
62AsyncWrapper::~AsyncWrapper() = default;
63
65{
66 B2INFO("Initializing AsyncWrapper...");
67
69 const int bufferSizeInts = 8000000; //~32M, within Ubuntu's shmmax limit
70 m_ringBuffer = new RingBuffer(bufferSizeInts);
71 rbList.push_back(m_ringBuffer);
76 m_tx->setBlockingInsert(!m_discardOldEvents); //actually decouple this process
77
78 //fork out one extra process
80 //forked thread:
81 //allow access to async parts
82 s_isAsync = true;
84
85 PathPtr path(new Path);
86 path->addModule(ModulePtr(m_rx));
87 path->addModule(m_wrappedModule);
88
89 //LogSystem::Instance().resetMessageCounter(); //for testing parallel processing
90
91 EventProcessor eventProc;
92 eventProc.process(path);
93 B2INFO("Asynchronous process done!");
94 exit(0);
95 } else {
96 atexit(cleanupIPC);
97
98 //main thread: chain tx and return
100 }
101}
102
104{
106 if (waitpid(-1, nullptr, WNOHANG) != 0) {
107 StoreObjPtr<EventMetaData> eventMetaData;
108 eventMetaData->setEndOfData();
109 }
110
111 m_tx->event();
112 }
113}
114
116{
118 m_tx->terminate();
119
121 B2INFO("Waiting for asynchronous process...");
123 B2INFO("Done, cleaning up...");
124 delete m_tx;
125 delete m_rx;
126 delete m_ringBuffer;
127 for (RingBuffer*& rb : rbList)
128 if (rb == m_ringBuffer)
129 rb = nullptr;
130 }
131}
TxModule * m_tx
transmitting module.
Definition: AsyncWrapper.h:74
void initialize() override
Call this from initialize().
Definition: AsyncWrapper.cc:64
void event() override
Call this from event().
void terminate() override
Call this from terminate().
ModulePtr m_wrappedModule
The wrapped module.
Definition: AsyncWrapper.h:65
AsyncWrapper(const std::string &moduleType)
Wrap am module of given type.
Definition: AsyncWrapper.cc:51
static bool isAsync()
returns true if the current process is on the receiving (async) side of an AsyncWrapper.
Definition: AsyncWrapper.h:57
RxModule * m_rx
receiving module.
Definition: AsyncWrapper.h:71
static int numAvailableEvents()
Retun number of events available in the RingBuffer.
Definition: AsyncWrapper.cc:41
RingBuffer * m_ringBuffer
shared memory buffer
Definition: AsyncWrapper.h:68
static RingBuffer * s_currentRingBuffer
if s_isAsync is true, this contains the corresponding RingBuffer, see numAvailableEvents().
Definition: AsyncWrapper.h:83
static bool s_isAsync
true if the current process is on the receiving (async) side of an AsyncWrapper.
Definition: AsyncWrapper.h:80
bool m_discardOldEvents
Discard old events when buffer is full.
Definition: AsyncWrapper.h:77
provides the core event processing loop.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler.
The ModuleManager Class.
Definition: ModuleManager.h:53
Base class for Modules.
Definition: Module.h:72
void setParamList(const ModuleParamList &params)
Replace existing parameter list.
Definition: Module.h:501
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
int numq() const
Returns number of entries/buffers in the RingBuffer.
Definition: RingBuffer.cc:368
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389
Module to decode data store contents from RingBuffer.
Definition: RxModule.h:25
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Definition: RxModule.h:44
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
Module for encoding data store contents into a RingBuffer.
Definition: TxModule.h:25
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:40
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:72
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:111
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Definition: TxModule.h:53
void setBlockingInsert(bool block)
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:50
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:43
Abstract base class for different kinds of events.