Belle II Software  release-08-01-10
PathUtils.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/PathUtils.h>
9 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
10 #include <framework/core/ModuleManager.h>
11 #include <framework/core/Environment.h>
12 #include <set>
13 
14 using namespace Belle2;
15 
16 std::tuple<PathPtr, PathPtr, PathPtr> PathUtils::splitPath(const PathPtr& path)
17 {
18  //modules that can be parallelized, but should not go into a parallel section by themselves
19  std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
20 
21  PathPtr inputPath(new Path);
22  PathPtr mainPath(new Path);
23  PathPtr outputPath(new Path);
24 
25  int stage = 0; //0: in, 1: event/main, 2: out
26  for (const ModulePtr& module : path->getModules()) {
27  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified) and
28  !module->hasProperties(Module::c_Input) and
29  !module->hasProperties(Module::c_Output) and
30  !module->hasProperties(Module::c_HistogramManager) ;
31  //entire conditional path must also be compatible
32  if (hasParallelFlag and module->hasCondition()) {
33  for (const auto& conditionPath : module->getAllConditionPaths()) {
35  hasParallelFlag = false;
36  }
37  }
38  }
39  //if modules have parallal flag -> stage = 1 , event/main
40  if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
41  stage++;
42 
43  if (stage == 2) {
44  bool path_is_useful = false;
45  for (const auto& parallelModule : mainPath->getModules()) {
46  if (uselessParallelModules.count(parallelModule->getType()) == 0) {
47  path_is_useful = true;
48  break;
49  }
50  }
51  if (not path_is_useful) {
52  //merge mainPath back into input path
53  inputPath->addPath(mainPath);
54  mainPath.reset(new Path);
55  //and search for further parallel sections
56  stage = 0;
57  }
58  }
59  }
60 
61  if (stage == 0) {
62  inputPath->addModule(module);
63  if (module->hasProperties(Module::c_HistogramManager)) {
64  // Initialize histogram manager if found in the path
65 
66  //add histoman to other paths
67  mainPath->addModule(module);
68  outputPath->addModule(module);
69  }
70  } else if (stage == 1) {
71  mainPath->addModule(module);
72  } else if (stage == 2) {
73  outputPath->addModule(module);
74  }
75  }
76 
77  bool createAllPaths = false; //usually we might not need e.g. an output path
78  for (const ModulePtr& module : path->getModules()) {
79  if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
80  createAllPaths = true; //ensure there are all kinds of processes
81  }
82  }
83 
84  // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
85  if (mainPath->isEmpty() and not createAllPaths) {
86  mainPath.reset();
87  }
88  if (inputPath->isEmpty() and not createAllPaths) {
89  inputPath.reset();
90  }
91  if (outputPath->isEmpty() and not createAllPaths) {
92  outputPath.reset();
93  }
94 
95  return {inputPath, mainPath, outputPath};
96 }
97 
98 ModulePtr PathUtils::getHistogramManager(PathPtr& inputPath, PathPtr& mainPath, PathPtr& outputPath)
99 {
100  ModulePtr histoManagerModule;
101  for (const ModulePtr& module : inputPath->getModules()) {
102  if (module->hasProperties(Module::c_HistogramManager)) {
103  // Initialize histogram manager if found in the path
104  histoManagerModule = module;
105 
106  //add histoman to other paths
107  // mainPath->addModule(histoManagerModule);
108  // outputPath->addModule(histoManagerModule);
109  }
110  }
111 
112  return histoManagerModule;
113 }
114 ModulePtrList PathUtils::preparePaths(PathPtr& inputPath, PathPtr& mainPath, PathPtr& outputPath)
115 {
116  B2ASSERT("The main part is empty. This is a bug in the framework.",
117  mainPath and not mainPath->isEmpty());
118 
119  ModuleManager& moduleManager = ModuleManager::Instance();
120 
121  const auto& environment = Environment::Instance();
122 
123  const auto& socketAddress = environment.getZMQSocketAddress();
124  const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
125  const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
126  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
127  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
128 
129  unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
130  unsigned int eventBufferSize = environment.getZMQEventBufferSize();
131  unsigned int workerTimeout = environment.getZMQWorkerTimeout();
132  bool useEventBackup = environment.getZMQUseEventBackup();
133 
134  if (inputPath) {
135  // Add TXInput after input path
136  ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
137  zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
138  zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
139  zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
140  zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
141  zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
142  zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
143  appendModule(inputPath, zmqTxInputModule);
144 
145  // Add RXWorker before main path
146  ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
147  zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
148  zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
149  zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
150  zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
151  zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
152  prependModule(mainPath, zmqRxWorkerModule);
153 
154  }
155 
156  if (outputPath) {
157  // Add TXWorker after main path
158  ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
159  zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
160  zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
161  zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(pubSocketAddress);
162  appendModule(mainPath, zmqTxWorkerModule);
163 
164  // Add RXOutput before output path
165  ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
166  zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
167  zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
168  zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
169  zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
170  prependModule(outputPath, zmqRxOutputModule);
171 
172  }
173 
174  if (inputPath) {
175  B2INFO("Input Path " << inputPath->getPathString());
176  }
177  if (mainPath) {
178  B2INFO("Main Path " << mainPath->getPathString());
179  }
180  if (outputPath) {
181  B2INFO("Output Path " << outputPath->getPathString());
182  }
183 
184  Path mergedPath;
185  if (inputPath) {
186  mergedPath.addPath(inputPath);
187  }
188  mergedPath.addPath(mainPath);
189  if (outputPath) {
190  mergedPath.addPath(outputPath);
191  }
192 
193  B2INFO("ModuleList " << mergedPath.getPathString());
194 
195  return mergedPath.buildModulePathList();
196 }
197 
199 {
200  ModulePtrList tmpModuleList;
201  for (const ModulePtr& m : modules) {
202  if (m->hasProperties(Module::c_TerminateInAllProcesses))
203  tmpModuleList.push_back(m);
204  }
205  return tmpModuleList;
206 }
207 
209 {
210  for (const ModulePtr& m : prependModules) {
211  if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
212  modules->push_front(m);
213  }
214  }
215 }
216 
217 void PathUtils::appendModule(PathPtr& path, const ModulePtr& module)
218 {
219  path->addModule(module);
220 }
221 
222 void PathUtils::prependModule(PathPtr& path, const ModulePtr& module)
223 {
224  PathPtr newPath(new Path());
225  newPath->addModule(module);
226  newPath->addPath(path);
227  path.swap(newPath);
228 }
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
The ModuleManager Class.
Definition: ModuleManager.h:53
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_Output
This module is an output module (writes data).
Definition: Module.h:79
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes.
Definition: Module.h:83
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the c_TerminateInAllProcesses module flag set.
Definition: PathUtils.cc:198
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:114
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given path into the input, main and output path (in this order) by looking at the paralle...
Definition: PathUtils.cc:16
static ModulePtr getHistogramManager(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:98
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:208
static void prependModule(PathPtr &path, const ModulePtr &module)
Prepend the given module to the path.
Definition: PathUtils.cc:222
static void appendModule(PathPtr &path, const ModulePtr &module)
Append the given module to the path.
Definition: PathUtils.cc:217
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
std::string getPathString() const override
return a string of the form [module a -> module b -> [another path]]
Definition: Path.cc:206
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
@ c_sub
Multicast subscribe socket.
@ c_pub
Multicast publish socket.
@ c_output
Output socket.
Abstract base class for different kinds of events.