Belle II Software development
PathUtils.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/PathUtils.h>
9#include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
10#include <framework/core/ModuleManager.h>
11#include <framework/core/Environment.h>
12#include <set>
13
14using namespace Belle2;
15
16std::tuple<PathPtr, PathPtr, PathPtr> PathUtils::splitPath(const PathPtr& path)
17{
18 //modules that can be parallelized, but should not go into a parallel section by themselves
19 std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
20
21 PathPtr inputPath(new Path);
22 PathPtr mainPath(new Path);
23 PathPtr outputPath(new Path);
24
25 int stage = 0; //0: in, 1: event/main, 2: out
26 for (const ModulePtr& module : path->getModules()) {
27 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified) and
28 !module->hasProperties(Module::c_Input) and
29 !module->hasProperties(Module::c_Output) and
30 !module->hasProperties(Module::c_HistogramManager) ;
31 //entire conditional path must also be compatible
32 if (hasParallelFlag and module->hasCondition()) {
33 for (const auto& conditionPath : module->getAllConditionPaths()) {
35 hasParallelFlag = false;
36 }
37 }
38 }
39 //if modules have parallal flag -> stage = 1 , event/main
40 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
41 stage++;
42
43 if (stage == 2) {
44 bool path_is_useful = false;
45 for (const auto& parallelModule : mainPath->getModules()) {
46 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
47 path_is_useful = true;
48 break;
49 }
50 }
51 if (not path_is_useful) {
52 //merge mainPath back into input path
53 inputPath->addPath(mainPath);
54 mainPath.reset(new Path);
55 //and search for further parallel sections
56 stage = 0;
57 }
58 }
59 }
60
61 if (stage == 0) {
62 inputPath->addModule(module);
63 if (module->hasProperties(Module::c_HistogramManager)) {
64 // Initialize histogram manager if found in the path
65
66 //add histoman to other paths
67 mainPath->addModule(module);
68 outputPath->addModule(module);
69 }
70 } else if (stage == 1) {
71 mainPath->addModule(module);
72 } else if (stage == 2) {
73 outputPath->addModule(module);
74 }
75 }
76
77 bool createAllPaths = false; //usually we might not need e.g. an output path
78 for (const ModulePtr& module : path->getModules()) {
79 if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
80 createAllPaths = true; //ensure there are all kinds of processes
81 }
82 }
83
84 // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
85 if (mainPath->isEmpty() and not createAllPaths) {
86 mainPath.reset();
87 }
88 if (inputPath->isEmpty() and not createAllPaths) {
89 inputPath.reset();
90 }
91 if (outputPath->isEmpty() and not createAllPaths) {
92 outputPath.reset();
93 }
94
95 return {inputPath, mainPath, outputPath};
96}
97
99{
100 ModulePtr histoManagerModule;
101
102
103 for (const ModulePtr& module : inputPath->getModules()) {
104 if (module->hasProperties(Module::c_HistogramManager)) {
105 // Initialize histogram manager if found in the path
106 histoManagerModule = module;
107
108 }
109 }
110
111 return histoManagerModule;
112}
114{
115 B2ASSERT("The main part is empty. This is a bug in the framework.",
116 mainPath and not mainPath->isEmpty());
117
118 ModuleManager& moduleManager = ModuleManager::Instance();
119
120 const auto& environment = Environment::Instance();
121
122 const auto& socketAddress = environment.getZMQSocketAddress();
123 const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
124 const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
125 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
126 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
127
128 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
129 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
130 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
131 bool useEventBackup = environment.getZMQUseEventBackup();
132
133 if (inputPath) {
134 // Add TXInput after input path
135 ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
136 zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
137 zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
138 zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
139 zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
140 zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
141 zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
142 appendModule(inputPath, zmqTxInputModule);
143
144 // Add RXWorker before main path
145 ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
146 zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
147 zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
148 zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
149 zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
150 zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
151 prependModule(mainPath, zmqRxWorkerModule);
152
153 }
154
155 if (outputPath) {
156 // Add TXWorker after main path
157 ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
158 zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
159 zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
160 zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(pubSocketAddress);
161 appendModule(mainPath, zmqTxWorkerModule);
162
163 // Add RXOutput before output path
164 ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
165 zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
166 zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
167 zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
168 zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
169 prependModule(outputPath, zmqRxOutputModule);
170
171 }
172
173 if (inputPath) {
174 B2INFO("Input Path " << inputPath->getPathString());
175 }
176 if (mainPath) {
177 B2INFO("Main Path " << mainPath->getPathString());
178 }
179 if (outputPath) {
180 B2INFO("Output Path " << outputPath->getPathString());
181 }
182
183 Path mergedPath;
184 if (inputPath) {
185 mergedPath.addPath(inputPath);
186 }
187 mergedPath.addPath(mainPath);
188 if (outputPath) {
189 mergedPath.addPath(outputPath);
190 }
191
192 B2INFO("ModuleList " << mergedPath.getPathString());
193
194 return mergedPath.buildModulePathList();
195}
196
198{
199 ModulePtrList tmpModuleList;
200 for (const ModulePtr& m : modules) {
201 if (m->hasProperties(Module::c_TerminateInAllProcesses))
202 tmpModuleList.push_back(m);
203 }
204 return tmpModuleList;
205}
206
208{
209 for (const ModulePtr& m : prependModules) {
210 if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
211 modules->push_front(m);
212 }
213 }
214}
215
216void PathUtils::appendModule(PathPtr& path, const ModulePtr& module)
217{
218 path->addModule(module);
219}
220
222{
223 PathPtr newPath(new Path());
224 newPath->addModule(module);
225 newPath->addPath(path);
226 path.swap(newPath);
227}
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
The ModuleManager Class.
Definition: ModuleManager.h:53
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_Output
This module is an output module (writes data).
Definition: Module.h:79
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:83
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
Definition: PathUtils.cc:197
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:113
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given part into the input, main and output path (in this order) by looking onto the paralle...
Definition: PathUtils.cc:16
static ModulePtr getHistogramManager(PathPtr &inputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:98
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:207
static void prependModule(PathPtr &path, const ModulePtr &module)
Append given modules to the path.
Definition: PathUtils.cc:221
static void appendModule(PathPtr &path, const ModulePtr &module)
Prepend given modules to the path.
Definition: PathUtils.cc:216
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
std::string getPathString() const override
return a string of the form [module a -> module b -> [another path]]
Definition: Path.cc:206
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, ba adding a suffix.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
@ c_sub
Multicast publish socket.
@ c_pub
Output socket.
@ c_output
Input socket.
Abstract base class for different kinds of events.