Belle II Software  release-08-01-10
AsyncWrapper.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/AsyncWrapper.h>
10 #include <framework/core/Path.h>
11 
12 #include <framework/core/EventProcessor.h>
13 #include <framework/core/ModuleManager.h>
14 #include <framework/pcore/GlobalProcHandler.h>
15 #include <framework/pcore/RingBuffer.h>
16 #include <framework/pcore/RxModule.h>
17 #include <framework/pcore/TxModule.h>
18 #include <framework/datastore/StoreObjPtr.h>
19 #include <framework/dataobjects/EventMetaData.h>
20 
21 #include <sys/wait.h>
22 
23 #include <cstdlib>
24 
25 using namespace Belle2;
26 
// Definition of the static flag reported by AsyncWrapper::isAsync(). It starts out false;
// the initialize() body further down sets it to true in the branch that runs the wrapped module.
27 bool AsyncWrapper::s_isAsync = false;
29 namespace {
30  static std::vector<RingBuffer*> rbList;
31  void cleanupIPC()
32  {
33  if (!AsyncWrapper::isAsync()) {
34  for (RingBuffer* rb : rbList)
35  delete rb;
36  rbList.clear();
37  }
38  }
39 }
40 
// Body of AsyncWrapper::numAvailableEvents().
// NOTE(review): the signature line (listing line 41) is missing from this extract; the
// reference index at the end of the listing gives it as `static int numAvailableEvents()`.
42 {
// Querying the buffer is only meaningful on the asynchronous (receiving) side;
// anywhere else an error is logged and a fixed value is returned.
43  if (!s_isAsync) {
44  B2ERROR("AsyncWrapper::numAvailableEvents() used in synchronous thread??");
// NOTE(review): `true` converts to 1 if the return type is int as indexed -- confirm
// that reporting "one event available" is the intended fallback here.
45  return true;
46  }
47 
// Number of entries currently held in the ring buffer of this asynchronous process.
48  return s_currentRingBuffer->numq();
49 }
50 
51 AsyncWrapper::AsyncWrapper(const std::string& moduleType): Module(),
52  m_wrappedModule(ModuleManager::Instance().registerModule(moduleType)),
53  m_ringBuffer(nullptr), m_rx(nullptr), m_tx(nullptr)
54 {
55  setParamList(m_wrappedModule->getParamList()); //inherit parameters from wrapped module
56 
57  addParam("discardOldEvents", m_discardOldEvents,
58  "Discard old events when buffer is full. If false, the main process will wait until there is enough space in the buffer. (i.e. synchronous operation)",
59  true);
60 }
61 
// Defaulted destructor: it performs no explicit cleanup of m_tx, m_rx or m_ringBuffer itself.
62 AsyncWrapper::~AsyncWrapper() = default;
63 
// Body of AsyncWrapper::initialize() (indexed below as defined at AsyncWrapper.cc:64).
// NOTE(review): this extract omits several listing lines (64, 68, 73, 75, 79, 83), among
// them the signature and the condition opening the branch that is closed by `} else {`
// further down. Consult the original file before changing anything here.
65 {
66  B2INFO("Initializing AsyncWrapper...");
67 
// Create the ring buffer and remember it in rbList so that cleanupIPC() can delete it at exit.
69  const int bufferSizeInts = 8000000; //~32M, within Ubuntu's shmmax limit
70  m_ringBuffer = new RingBuffer(bufferSizeInts);
71  rbList.push_back(m_ringBuffer);
// Rx and Tx modules are both constructed on the same ring buffer.
72  m_rx = new RxModule(m_ringBuffer);
74  m_tx = new TxModule(m_ringBuffer);
// Inserts block only when old events must NOT be discarded.
76  m_tx->setBlockingInsert(!m_discardOldEvents); //actually decouple this process
77 
78  //fork out one extra process
// NOTE(review): the statement that creates the process and opens this branch is missing
// from the extract (listing line 79).
80  //forked thread:
81  //allow access to async parts
82  s_isAsync = true;
84 
// Private path for this branch: the receiving module first, then the wrapped module.
85  PathPtr path(new Path);
86  path->addModule(ModulePtr(m_rx));
87  path->addModule(m_wrappedModule);
88 
89  //LogSystem::Instance().resetMessageCounter(); //for testing parallel processing
90 
// Run the event loop over that path, then exit the process; this branch never returns.
91  EventProcessor eventProc;
92  eventProc.process(path);
93  B2INFO("Asynchronous process done!");
94  exit(0);
95  } else {
// Other branch: make sure the ring buffers are released when the process exits.
96  atexit(cleanupIPC);
97 
98  //main thread: chain tx and return
99  m_tx->initialize();
100  }
101 }
102 
// Body of AsyncWrapper::event().
// NOTE(review): the signature (listing line 103) and listing line 105 are missing from
// this extract; line 105 presumably opens the block closed at listing line 112 -- confirm
// against the original file.
104 {
// Non-blocking check on child processes: with WNOHANG, waitpid() returns 0 only while
// children exist and none has changed state. Any other result (a child was reaped, or
// an error such as "no children") marks the event metadata as end-of-data.
106  if (waitpid(-1, nullptr, WNOHANG) != 0) {
107  StoreObjPtr<EventMetaData> eventMetaData;
108  eventMetaData->setEndOfData();
109  }
110 
// Hand the current event to the transmitting module.
111  m_tx->event();
112  }
113 }
114 
// Body of AsyncWrapper::terminate().
// NOTE(review): the signature (listing line 115) and listing lines 117 and 122 are missing
// from this extract; line 117 presumably opens the block closed at listing line 130, and
// line 122 presumably performs the wait announced by the log message -- confirm against
// the original file.
116 {
118  m_tx->terminate();
119 
// Per the RingBuffer index entry: causes termination of reading processes (if they use isDead()).
120  m_ringBuffer->kill();
121  B2INFO("Waiting for asynchronous process...");
123  B2INFO("Done, cleaning up...");
// Release the helper modules and the buffer created in initialize().
124  delete m_tx;
125  delete m_rx;
126  delete m_ringBuffer;
// Null the matching rbList slot so that cleanupIPC() does not delete this buffer again.
// NOTE(review): m_ringBuffer is compared after it has been deleted, and the three members
// are left dangling; consider clearing the slot before the delete and resetting the
// members to nullptr.
127  for (RingBuffer*& rb : rbList)
128  if (rb == m_ringBuffer)
129  rb = nullptr;
130  }
131 }
TxModule * m_tx
transmitting module.
Definition: AsyncWrapper.h:74
void initialize() override
Call this from initialize().
Definition: AsyncWrapper.cc:64
void event() override
Call this from event().
void terminate() override
Call this from terminate().
ModulePtr m_wrappedModule
The wrapped module.
Definition: AsyncWrapper.h:65
AsyncWrapper(const std::string &moduleType)
Wrap a module of given type.
Definition: AsyncWrapper.cc:51
static bool isAsync()
returns true if the current process is on the receiving (async) side of an AsyncWrapper.
Definition: AsyncWrapper.h:57
RxModule * m_rx
receiving module.
Definition: AsyncWrapper.h:71
static int numAvailableEvents()
Return the number of events available in the RingBuffer.
Definition: AsyncWrapper.cc:41
RingBuffer * m_ringBuffer
shared memory buffer
Definition: AsyncWrapper.h:68
static RingBuffer * s_currentRingBuffer
if s_isAsync is true, this contains the corresponding RingBuffer, see numAvailableEvents().
Definition: AsyncWrapper.h:83
static bool s_isAsync
true if the current process is on the receiving (async) side of an AsyncWrapper.
Definition: AsyncWrapper.h:80
bool m_discardOldEvents
Discard old events when buffer is full.
Definition: AsyncWrapper.h:77
provides the core event processing loop.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler have finished.
The ModuleManager Class.
Definition: ModuleManager.h:53
Base class for Modules.
Definition: Module.h:72
void setParamList(const ModuleParamList &params)
Replace existing parameter list.
Definition: Module.h:501
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:39
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:426
int numq() const
Returns number of entries/buffers in the RingBuffer.
Definition: RingBuffer.cc:368
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:389
Module to decode data store contents from RingBuffer.
Definition: RxModule.h:25
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Definition: RxModule.h:44
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
Module for encoding data store contents into a RingBuffer.
Definition: TxModule.h:25
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:40
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:72
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:111
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Definition: TxModule.h:53
void setBlockingInsert(bool block)
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:50
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
Abstract base class for different kinds of events.