9 #include <framework/pcore/PathUtils.h>
10 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
11 #include <framework/core/ModuleManager.h>
12 #include <framework/core/Environment.h>
20 std::set<std::string> uselessParallelModules({
"HistoManager",
"Gearbox",
"Geometry"});
27 for (
const ModulePtr& module : path->getModules()) {
30 if (hasParallelFlag and module->hasCondition()) {
31 for (
const auto& conditionPath : module->getAllConditionPaths()) {
33 hasParallelFlag =
false;
38 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
42 bool path_is_useful =
false;
43 for (
const auto& parallelModule : mainPath->getModules()) {
44 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
45 path_is_useful =
true;
49 if (not path_is_useful) {
51 inputPath->addPath(mainPath);
52 mainPath.reset(
new Path);
60 inputPath->addModule(module);
61 }
else if (stage == 1) {
62 mainPath->addModule(module);
63 }
else if (stage == 2) {
64 outputPath->addModule(module);
68 bool createAllPaths =
false;
69 for (
const ModulePtr& module : path->getModules()) {
71 createAllPaths =
true;
76 if (mainPath->isEmpty() and not createAllPaths) {
79 if (inputPath->isEmpty() and not createAllPaths) {
82 if (outputPath->isEmpty() and not createAllPaths) {
86 return {inputPath, mainPath, outputPath};
92 for (
const ModulePtr& module : inputPath->getModules()) {
95 histoManagerModule = module;
98 mainPath->addModule(histoManagerModule);
99 outputPath->addModule(histoManagerModule);
103 return histoManagerModule;
107 B2ASSERT(
"The main part is empty. This is a bug in the framework.",
108 mainPath and not mainPath->isEmpty());
114 const auto& socketAddress = environment.getZMQSocketAddress();
120 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
121 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
122 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
123 bool useEventBackup = environment.getZMQUseEventBackup();
128 zmqTxInputModule->getParam<std::string>(
"socketName").setValue(inputSocketAddress);
129 zmqTxInputModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
130 zmqTxInputModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
131 zmqTxInputModule->getParam<
unsigned int>(
"workerProcessTimeout").setValue(workerTimeout);
132 zmqTxInputModule->getParam<
bool>(
"useEventBackup").setValue(useEventBackup);
133 zmqTxInputModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
138 zmqRxWorkerModule->getParam<std::string>(
"socketName").setValue(inputSocketAddress);
139 zmqRxWorkerModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
140 zmqRxWorkerModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
141 zmqRxWorkerModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
142 zmqRxWorkerModule->getParam<
unsigned int>(
"eventBufferSize").setValue(eventBufferSize);
149 zmqTxWorkerModule->getParam<std::string>(
"socketName").setValue(outputSocketAddress);
150 zmqTxWorkerModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
151 zmqTxWorkerModule->getParam<std::string>(
"xsubProxySocketName").setValue(pubSocketAddress);
156 zmqRxOutputModule->getParam<std::string>(
"socketName").setValue(outputSocketAddress);
157 zmqRxOutputModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
158 zmqRxOutputModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
159 zmqRxOutputModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
164 B2INFO(
"Input Path " << inputPath->getPathString());
167 B2INFO(
"Main Path " << mainPath->getPathString());
170 B2INFO(
"Output Path " << outputPath->getPathString());
179 mergedPath.
addPath(outputPath);
189 tmpModuleList.push_back(m);
191 return tmpModuleList;
196 for (
const ModulePtr& m : prependModules) {
197 if (std::find(modules->begin(), modules->end(), m) == modules->end()) {
198 modules->push_front(m);
205 path->addModule(module);
211 newPath->addModule(module);
212 newPath->addPath(path);