Belle II Software development
PathUtils Class Reference

Helper utils for path arithmetics needed in the pEventProcessor. More...

#include <PathUtils.h>

Static Public Member Functions

static std::tuple< PathPtr, PathPtr, PathPtr > splitPath (const PathPtr &path)
 Split the given path into the input, main and output path (in this order) by looking at the parallel-processing certification of the modules.
 
static ModulePtrList preparePaths (PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
 Adds internal zmq modules to the paths.
 
static ModulePtr getHistogramManager (PathPtr &inputPath)
 Find the histogram manager in the paths and return it.
 
static ModulePtrList getTerminateGloballyModules (const ModulePtrList &modules)
 Return only modules which have the c_TerminateInAllProcesses module flag set.
 
static void prependModulesIfNotPresent (ModulePtrList *modules, const ModulePtrList &prependModules)
 Prepend given 'prependModules' to 'modules', if they're not already present.
 
static void appendModule (PathPtr &path, const ModulePtr &module)
 Append the given module to the path.
 
static void prependModule (PathPtr &path, const ModulePtr &module)
 Prepend the given module to the path.
 

Detailed Description

Helper utils for path arithmetics needed in the pEventProcessor.

Definition at line 19 of file PathUtils.h.

Member Function Documentation

◆ appendModule()

void appendModule ( PathPtr path,
const ModulePtr module 
)
static

Append the given module to the path.

Definition at line 216 of file PathUtils.cc.

217{
218 path->addModule(module);
219}

◆ getHistogramManager()

ModulePtr getHistogramManager ( PathPtr inputPath)
static

Find the histogram manager in the paths and return it.

Definition at line 98 of file PathUtils.cc.

99{
100 ModulePtr histoManagerModule;
101
102
103 for (const ModulePtr& module : inputPath->getModules()) {
104 if (module->hasProperties(Module::c_HistogramManager)) {
105 // Initialize histogram manager if found in the path
106 histoManagerModule = module;
107
108 }
109 }
110
111 return histoManagerModule;
112}
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a shared pointer (std::shared_ptr).
Definition: Module.h:43

◆ getTerminateGloballyModules()

ModulePtrList getTerminateGloballyModules ( const ModulePtrList modules)
static

Return only modules which have the c_TerminateInAllProcesses module flag set.

Definition at line 197 of file PathUtils.cc.

198{
199 ModulePtrList tmpModuleList;
200 for (const ModulePtr& m : modules) {
201 if (m->hasProperties(Module::c_TerminateInAllProcesses))
202 tmpModuleList.push_back(m);
203 }
204 return tmpModuleList;
205}
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes.
Definition: Module.h:83
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584

◆ preparePaths()

ModulePtrList preparePaths ( PathPtr inputPath,
PathPtr mainPath,
PathPtr outputPath 
)
static

Adds internal zmq modules to the paths.

Definition at line 113 of file PathUtils.cc.

114{
115 B2ASSERT("The main part is empty. This is a bug in the framework.",
116 mainPath and not mainPath->isEmpty());
117
118 ModuleManager& moduleManager = ModuleManager::Instance();
119
120 const auto& environment = Environment::Instance();
121
122 const auto& socketAddress = environment.getZMQSocketAddress();
123 const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
124 const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
125 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
126 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
127
128 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
129 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
130 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
131 bool useEventBackup = environment.getZMQUseEventBackup();
132
133 if (inputPath) {
134 // Add TXInput after input path
135 ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
136 zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
137 zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
138 zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
139 zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
140 zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
141 zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
142 appendModule(inputPath, zmqTxInputModule);
143
144 // Add RXWorker before main path
145 ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
146 zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
147 zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
148 zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
149 zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
150 zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
151 prependModule(mainPath, zmqRxWorkerModule);
152
153 }
154
155 if (outputPath) {
156 // Add TXWorker after main path
157 ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
158 zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
159 zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
160 zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
161 appendModule(mainPath, zmqTxWorkerModule);
162
163 // Add RXOutput before output path
164 ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
165 zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
166 zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
167 zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
168 zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
169 prependModule(outputPath, zmqRxOutputModule);
170
171 }
172
173 if (inputPath) {
174 B2INFO("Input Path " << inputPath->getPathString());
175 }
176 if (mainPath) {
177 B2INFO("Main Path " << mainPath->getPathString());
178 }
179 if (outputPath) {
180 B2INFO("Output Path " << outputPath->getPathString());
181 }
182
183 Path mergedPath;
184 if (inputPath) {
185 mergedPath.addPath(inputPath);
186 }
187 mergedPath.addPath(mainPath);
188 if (outputPath) {
189 mergedPath.addPath(outputPath);
190 }
191
192 B2INFO("ModuleList " << mergedPath.getPathString());
193
194 return mergedPath.buildModulePathList();
195}
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
The ModuleManager Class.
Definition: ModuleManager.h:53
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
static void prependModule(PathPtr &path, const ModulePtr &module)
Prepend the given module to the path.
Definition: PathUtils.cc:221
static void appendModule(PathPtr &path, const ModulePtr &module)
Append the given module to the path.
Definition: PathUtils.cc:216
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
std::string getPathString() const override
return a string of the form [module a -> module b -> [another path]]
Definition: Path.cc:206
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
@ c_sub
Multicast subscribe socket.
@ c_pub
Multicast publish socket.
@ c_output
Output socket.

◆ prependModule()

void prependModule ( PathPtr path,
const ModulePtr module 
)
static

Prepend the given module to the path.

Definition at line 221 of file PathUtils.cc.

222{
223 PathPtr newPath(new Path());
224 newPath->addModule(module);
225 newPath->addPath(path);
226 path.swap(newPath);
227}
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a shared pointer (std::shared_ptr).
Definition: Path.h:35

◆ prependModulesIfNotPresent()

void prependModulesIfNotPresent ( ModulePtrList modules,
const ModulePtrList prependModules 
)
static

Prepend given 'prependModules' to 'modules', if they're not already present.

Definition at line 207 of file PathUtils.cc.

208{
209 for (const ModulePtr& m : prependModules) {
210 if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
211 modules->push_front(m);
212 }
213 }
214}

◆ splitPath()

std::tuple< PathPtr, PathPtr, PathPtr > splitPath ( const PathPtr path)
static

Split the given path into the input, main and output path (in this order) by looking at the parallel-processing certification of the modules.

Definition at line 16 of file PathUtils.cc.

17{
18 //modules that can be parallelized, but should not go into a parallel section by themselves
19 std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
20
21 PathPtr inputPath(new Path);
22 PathPtr mainPath(new Path);
23 PathPtr outputPath(new Path);
24
25 int stage = 0; //0: in, 1: event/main, 2: out
26 for (const ModulePtr& module : path->getModules()) {
27 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified) and
28 !module->hasProperties(Module::c_Input) and
29 !module->hasProperties(Module::c_Output) and
30 !module->hasProperties(Module::c_HistogramManager) ;
31 //entire conditional path must also be compatible
32 if (hasParallelFlag and module->hasCondition()) {
33 for (const auto& conditionPath : module->getAllConditionPaths()) {
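34 if (!ModuleManager::allModulesHaveFlag(conditionPath->getModules(), Module::c_ParallelProcessingCertified)) {
35 hasParallelFlag = false;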
36 }
37 }
38 }
39 //if modules have parallel flag -> stage = 1 , event/main
40 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
41 stage++;
42
43 if (stage == 2) {
44 bool path_is_useful = false;
45 for (const auto& parallelModule : mainPath->getModules()) {
46 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
47 path_is_useful = true;
48 break;
49 }
50 }
51 if (not path_is_useful) {
52 //merge mainPath back into input path
53 inputPath->addPath(mainPath);
54 mainPath.reset(new Path);
55 //and search for further parallel sections
56 stage = 0;
57 }
58 }
59 }
60
61 if (stage == 0) {
62 inputPath->addModule(module);
63 if (module->hasProperties(Module::c_HistogramManager)) {
64 // Initialize histogram manager if found in the path
65
66 //add histoman to other paths
67 mainPath->addModule(module);
68 outputPath->addModule(module);
69 }
70 } else if (stage == 1) {
71 mainPath->addModule(module);
72 } else if (stage == 2) {
73 outputPath->addModule(module);
74 }
75 }
76
77 bool createAllPaths = false; //usually we might not need e.g. an output path
78 for (const ModulePtr& module : path->getModules()) {
79 if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
80 createAllPaths = true; //ensure there are all kinds of processes
81 }
82 }
83
84 // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
85 if (mainPath->isEmpty() and not createAllPaths) {
86 mainPath.reset();
87 }
88 if (inputPath->isEmpty() and not createAllPaths) {
89 inputPath.reset();
90 }
91 if (outputPath->isEmpty() and not createAllPaths) {
92 outputPath.reset();
93 }
94
95 return {inputPath, mainPath, outputPath};
96}
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_Output
This module is an output module (writes data).
Definition: Module.h:79

The documentation for this class was generated from the following files: